summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2024-02-29 08:51:28 +0100
committerPaul-Christian Volkmer2024-02-29 08:57:00 +0100
commit46ddaf10f7a5f0f3da121ef219442109a209db0b (patch)
treee8bab6455f7aa41bb168dd9689fc5df923782929 /src/main
parent25f286f73bb649558850d69bf063130ede3cfc08 (diff)
parent408b121f269b49da1b2dbfa4c3d7190b6df6b010 (diff)
Merge pull request #45 from CCC-MF/issue_34
Verwendung einer applikationsweiten Retry-Konfiguration
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java72
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt22
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt27
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt35
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt33
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt10
6 files changed, 120 insertions, 79 deletions
diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
index 62e702f..3d367bc 100644
--- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
+++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
@@ -22,21 +22,6 @@ package dev.dnpm.etl.processor.pseudonym;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import dev.dnpm.etl.processor.config.GPasConfigProperties;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.security.cert.CertificateFactory;
-import java.security.cert.X509Certificate;
-import java.util.Base64;
-import java.util.HashMap;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
@@ -54,35 +39,39 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
+import org.springframework.http.*;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
-import org.springframework.retry.RetryCallback;
-import org.springframework.retry.RetryContext;
-import org.springframework.retry.RetryListener;
-import org.springframework.retry.RetryPolicy;
-import org.springframework.retry.backoff.ExponentialBackOffPolicy;
-import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
-import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Base64;
+
public class GpasPseudonymGenerator implements Generator {
private final static FhirContext r4Context = FhirContext.forR4();
private final String gPasUrl;
private final String psnTargetDomain;
private final HttpHeaders httpHeader;
- private final RetryTemplate retryTemplate = defaultTemplate();
+ private final RetryTemplate retryTemplate;
private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
private SSLContext customSslContext;
private RestTemplate restTemplate;
- public GpasPseudonymGenerator(GPasConfigProperties gpasCfg) {
+ public GpasPseudonymGenerator(GPasConfigProperties gpasCfg, RetryTemplate retryTemplate) {
+ this.retryTemplate = retryTemplate;
this.gPasUrl = gpasCfg.getUri();
this.psnTargetDomain = gpasCfg.getTarget();
@@ -202,31 +191,6 @@ public class GpasPseudonymGenerator implements Generator {
return headers;
}
- protected RetryTemplate defaultTemplate() {
- RetryTemplate retryTemplate = new RetryTemplate();
- ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
- backOffPolicy.setInitialInterval(1000);
- backOffPolicy.setMultiplier(1.25);
- retryTemplate.setBackOffPolicy(backOffPolicy);
- HashMap<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
- retryableExceptions.put(RestClientException.class, true);
- retryableExceptions.put(ConnectException.class, true);
- RetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions);
- retryTemplate.setRetryPolicy(retryPolicy);
-
- retryTemplate.registerListener(new RetryListener() {
- @Override
- public <T, E extends Throwable> void onError(RetryContext context,
- RetryCallback<T, E> callback, Throwable throwable) {
- log.warn("HTTP Error occurred: {}. Retrying {}", throwable.getMessage(),
- context.getRetryCount());
- RetryListener.super.onError(context, callback, throwable);
- }
- });
-
- return retryTemplate;
- }
-
/**
* Read SSL root certificate and return SSLContext
*
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
index 2290d44..430648e 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
@@ -30,7 +30,8 @@ data class AppConfigProperties(
replacement = "app.pseudonymize.generator"
)
var pseudonymizer: PseudonymGenerator = PseudonymGenerator.BUILDIN,
- var transformations: List<TransformationProperties> = listOf()
+ var transformations: List<TransformationProperties> = listOf(),
+ var maxRetryAttempts: Int = 3
) {
companion object {
const val NAME = "app"
@@ -72,10 +73,21 @@ data class RestTargetProperties(
}
}
-@ConfigurationProperties(KafkaTargetProperties.NAME)
-data class KafkaTargetProperties(
- val topic: String = "etl-processor",
- val responseTopic: String = "${topic}_response",
+@ConfigurationProperties(KafkaProperties.NAME)
+data class KafkaProperties(
+ val inputTopic: String?,
+ val outputTopic: String = "etl-processor",
+ @get:DeprecatedConfigurationProperty(
+ reason = "Deprecated",
+ replacement = "outputTopic"
+ )
+ val topic: String = outputTopic,
+ val outputResponseTopic: String = "${outputTopic}_response",
+ @get:DeprecatedConfigurationProperty(
+ reason = "Deprecated",
+ replacement = "outputResponseTopic"
+ )
+ val responseTopic: String = outputResponseTopic,
val groupId: String = "${topic}_group",
val servers: String = ""
) {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
index 92965a6..8fb9e19 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
@@ -35,13 +35,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
+import org.springframework.retry.RetryCallback
+import org.springframework.retry.RetryContext
+import org.springframework.retry.RetryListener
import org.springframework.retry.policy.SimpleRetryPolicy
import org.springframework.retry.support.RetryTemplate
import org.springframework.retry.support.RetryTemplateBuilder
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.security.crypto.password.PasswordEncoder
import org.springframework.security.provisioning.InMemoryUserDetailsManager
-import org.springframework.security.provisioning.UserDetailsManager
import reactor.core.publisher.Sinks
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@@ -62,8 +64,8 @@ class AppConfiguration {
@ConditionalOnProperty(value = ["app.pseudonymize.generator"], havingValue = "GPAS")
@Bean
- fun gpasPseudonymGenerator(configProperties: GPasConfigProperties): Generator {
- return GpasPseudonymGenerator(configProperties)
+ fun gpasPseudonymGenerator(configProperties: GPasConfigProperties, retryTemplate: RetryTemplate): Generator {
+ return GpasPseudonymGenerator(configProperties, retryTemplate)
}
@ConditionalOnProperty(value = ["app.pseudonymize.generator"], havingValue = "BUILDIN", matchIfMissing = true)
@@ -75,8 +77,8 @@ class AppConfiguration {
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "GPAS")
@ConditionalOnMissingBean
@Bean
- fun gpasPseudonymGeneratorOnDeprecatedProperty(configProperties: GPasConfigProperties): Generator {
- return GpasPseudonymGenerator(configProperties)
+ fun gpasPseudonymGeneratorOnDeprecatedProperty(configProperties: GPasConfigProperties, retryTemplate: RetryTemplate): Generator {
+ return GpasPseudonymGenerator(configProperties, retryTemplate)
}
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "BUILDIN")
@@ -111,11 +113,20 @@ class AppConfiguration {
}
@Bean
- fun retryTemplate(): RetryTemplate {
+ fun retryTemplate(configProperties: AppConfigProperties): RetryTemplate {
return RetryTemplateBuilder()
.notRetryOn(IllegalArgumentException::class.java)
- .fixedBackoff(5.seconds.toJavaDuration())
- .customPolicy(SimpleRetryPolicy(3))
+ .exponentialBackoff(2.seconds.toJavaDuration(), 1.25, 5.seconds.toJavaDuration())
+ .customPolicy(SimpleRetryPolicy(configProperties.maxRetryAttempts))
+ .withListener(object : RetryListener {
+ override fun <T : Any, E : Throwable> onError(
+ context: RetryContext,
+ callback: RetryCallback<T, E>,
+ throwable: Throwable
+ ) {
+ logger.warn("Error occured: {}. Retrying {}", throwable.message, context.retryCount)
+ }
+ })
.build()
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index 68b86b2..0977527 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -20,6 +20,7 @@
package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.input.KafkaInputListener
import dev.dnpm.etl.processor.monitoring.ConnectionCheckService
import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
@@ -42,9 +43,9 @@ import reactor.core.publisher.Sinks
@Configuration
@EnableConfigurationProperties(
- value = [KafkaTargetProperties::class]
+ value = [KafkaProperties::class]
)
-@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
+@ConditionalOnProperty(value = ["app.kafka.servers"])
@ConditionalOnMissingBean(MtbFileSender::class)
@Order(-5)
class AppKafkaConfiguration {
@@ -54,21 +55,21 @@ class AppKafkaConfiguration {
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
- kafkaTargetProperties: KafkaTargetProperties,
+ kafkaProperties: KafkaProperties,
retryTemplate: RetryTemplate,
objectMapper: ObjectMapper
): MtbFileSender {
logger.info("Selected 'KafkaMtbFileSender'")
- return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
+ return KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper)
}
@Bean
- fun kafkaListenerContainer(
+ fun kafkaResponseListenerContainer(
consumerFactory: ConsumerFactory<String, String>,
- kafkaTargetProperties: KafkaTargetProperties,
+ kafkaProperties: KafkaProperties,
kafkaResponseProcessor: KafkaResponseProcessor
): KafkaMessageListenerContainer<String, String> {
- val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic)
+ val containerProperties = ContainerProperties(kafkaProperties.responseTopic)
containerProperties.messageListener = kafkaResponseProcessor
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}
@@ -82,6 +83,26 @@ class AppKafkaConfiguration {
}
@Bean
+ @ConditionalOnProperty(value = ["app.kafka.input-topic"])
+ fun kafkaInputListenerContainer(
+ consumerFactory: ConsumerFactory<String, String>,
+ kafkaProperties: KafkaProperties,
+ kafkaInputListener: KafkaInputListener
+ ): KafkaMessageListenerContainer<String, String> {
+ val containerProperties = ContainerProperties(kafkaProperties.inputTopic)
+ containerProperties.messageListener = kafkaInputListener
+ return KafkaMessageListenerContainer(consumerFactory, containerProperties)
+ }
+
+ @Bean
+ @ConditionalOnProperty(value = ["app.kafka.input-topic"])
+ fun kafkaInputListener(
+ applicationEventPublisher: ApplicationEventPublisher,
+ ): KafkaInputListener {
+ return KafkaInputListener(applicationEventPublisher)
+ }
+
+ @Bean
fun connectionCheckService(consumerFactory: ConsumerFactory<String, String>, configsUpdateProducer: Sinks.Many<Boolean>): ConnectionCheckService {
return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer)
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
new file mode 100644
index 0000000..ee6b56e
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -0,0 +1,33 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.input
+
+import de.ukw.ccc.bwhc.dto.MtbFile
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.springframework.context.ApplicationEventPublisher
+import org.springframework.kafka.listener.MessageListener
+
+class KafkaInputListener(
+ private val applicationEventPublisher: ApplicationEventPublisher
+) : MessageListener<String, MtbFile> {
+ override fun onMessage(data: ConsumerRecord<String, MtbFile>) {
+ TODO("Not yet implemented")
+ }
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 01c7d43..09edd31 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -22,7 +22,7 @@ package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.config.KafkaTargetProperties
+import dev.dnpm.etl.processor.config.KafkaProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
@@ -30,7 +30,7 @@ import org.springframework.retry.support.RetryTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
- private val kafkaTargetProperties: KafkaTargetProperties,
+ private val kafkaProperties: KafkaProperties,
private val retryTemplate: RetryTemplate,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@@ -41,7 +41,7 @@ class KafkaMtbFileSender(
return try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
val result = kafkaTemplate.send(
- kafkaTargetProperties.topic,
+ kafkaProperties.topic,
key(request),
objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
@@ -71,7 +71,7 @@ class KafkaMtbFileSender(
return try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
val result = kafkaTemplate.send(
- kafkaTargetProperties.topic,
+ kafkaProperties.topic,
key(request),
objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
)
@@ -90,7 +90,7 @@ class KafkaMtbFileSender(
}
override fun endpoint(): String {
- return "${this.kafkaTargetProperties.servers} (${this.kafkaTargetProperties.topic}/${this.kafkaTargetProperties.responseTopic})"
+ return "${this.kafkaProperties.servers} (${this.kafkaProperties.topic}/${this.kafkaProperties.responseTopic})"
}
private fun key(request: MtbFileSender.MtbFileRequest): String {