summaryrefslogtreecommitdiff
path: root/src/main/kotlin/dev
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/dev')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt22
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt27
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt35
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt33
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt10
5 files changed, 102 insertions, 25 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
index 2290d44..430648e 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
@@ -30,7 +30,8 @@ data class AppConfigProperties(
replacement = "app.pseudonymize.generator"
)
var pseudonymizer: PseudonymGenerator = PseudonymGenerator.BUILDIN,
- var transformations: List<TransformationProperties> = listOf()
+ var transformations: List<TransformationProperties> = listOf(),
+ var maxRetryAttempts: Int = 3
) {
companion object {
const val NAME = "app"
@@ -72,10 +73,21 @@ data class RestTargetProperties(
}
}
-@ConfigurationProperties(KafkaTargetProperties.NAME)
-data class KafkaTargetProperties(
- val topic: String = "etl-processor",
- val responseTopic: String = "${topic}_response",
+@ConfigurationProperties(KafkaProperties.NAME)
+data class KafkaProperties(
+ val inputTopic: String?,
+ val outputTopic: String = "etl-processor",
+ @get:DeprecatedConfigurationProperty(
+ reason = "Deprecated",
+ replacement = "outputTopic"
+ )
+ val topic: String = outputTopic,
+ val outputResponseTopic: String = "${outputTopic}_response",
+ @get:DeprecatedConfigurationProperty(
+ reason = "Deprecated",
+ replacement = "outputResponseTopic"
+ )
+ val responseTopic: String = outputResponseTopic,
val groupId: String = "${topic}_group",
val servers: String = ""
) {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
index 92965a6..8fb9e19 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
@@ -35,13 +35,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
+import org.springframework.retry.RetryCallback
+import org.springframework.retry.RetryContext
+import org.springframework.retry.RetryListener
import org.springframework.retry.policy.SimpleRetryPolicy
import org.springframework.retry.support.RetryTemplate
import org.springframework.retry.support.RetryTemplateBuilder
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.security.crypto.password.PasswordEncoder
import org.springframework.security.provisioning.InMemoryUserDetailsManager
-import org.springframework.security.provisioning.UserDetailsManager
import reactor.core.publisher.Sinks
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@@ -62,8 +64,8 @@ class AppConfiguration {
@ConditionalOnProperty(value = ["app.pseudonymize.generator"], havingValue = "GPAS")
@Bean
- fun gpasPseudonymGenerator(configProperties: GPasConfigProperties): Generator {
- return GpasPseudonymGenerator(configProperties)
+ fun gpasPseudonymGenerator(configProperties: GPasConfigProperties, retryTemplate: RetryTemplate): Generator {
+ return GpasPseudonymGenerator(configProperties, retryTemplate)
}
@ConditionalOnProperty(value = ["app.pseudonymize.generator"], havingValue = "BUILDIN", matchIfMissing = true)
@@ -75,8 +77,8 @@ class AppConfiguration {
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "GPAS")
@ConditionalOnMissingBean
@Bean
- fun gpasPseudonymGeneratorOnDeprecatedProperty(configProperties: GPasConfigProperties): Generator {
- return GpasPseudonymGenerator(configProperties)
+ fun gpasPseudonymGeneratorOnDeprecatedProperty(configProperties: GPasConfigProperties, retryTemplate: RetryTemplate): Generator {
+ return GpasPseudonymGenerator(configProperties, retryTemplate)
}
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "BUILDIN")
@@ -111,11 +113,20 @@ class AppConfiguration {
}
@Bean
- fun retryTemplate(): RetryTemplate {
+ fun retryTemplate(configProperties: AppConfigProperties): RetryTemplate {
return RetryTemplateBuilder()
.notRetryOn(IllegalArgumentException::class.java)
- .fixedBackoff(5.seconds.toJavaDuration())
- .customPolicy(SimpleRetryPolicy(3))
+ .exponentialBackoff(2.seconds.toJavaDuration(), 1.25, 5.seconds.toJavaDuration())
+ .customPolicy(SimpleRetryPolicy(configProperties.maxRetryAttempts))
+ .withListener(object : RetryListener {
+ override fun <T : Any, E : Throwable> onError(
+ context: RetryContext,
+ callback: RetryCallback<T, E>,
+ throwable: Throwable
+ ) {
+ logger.warn("Error occured: {}. Retrying {}", throwable.message, context.retryCount)
+ }
+ })
.build()
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index 68b86b2..0977527 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -20,6 +20,7 @@
package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.input.KafkaInputListener
import dev.dnpm.etl.processor.monitoring.ConnectionCheckService
import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
@@ -42,9 +43,9 @@ import reactor.core.publisher.Sinks
@Configuration
@EnableConfigurationProperties(
- value = [KafkaTargetProperties::class]
+ value = [KafkaProperties::class]
)
-@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
+@ConditionalOnProperty(value = ["app.kafka.servers"])
@ConditionalOnMissingBean(MtbFileSender::class)
@Order(-5)
class AppKafkaConfiguration {
@@ -54,21 +55,21 @@ class AppKafkaConfiguration {
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
- kafkaTargetProperties: KafkaTargetProperties,
+ kafkaProperties: KafkaProperties,
retryTemplate: RetryTemplate,
objectMapper: ObjectMapper
): MtbFileSender {
logger.info("Selected 'KafkaMtbFileSender'")
- return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
+ return KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper)
}
@Bean
- fun kafkaListenerContainer(
+ fun kafkaResponseListenerContainer(
consumerFactory: ConsumerFactory<String, String>,
- kafkaTargetProperties: KafkaTargetProperties,
+ kafkaProperties: KafkaProperties,
kafkaResponseProcessor: KafkaResponseProcessor
): KafkaMessageListenerContainer<String, String> {
- val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic)
+ val containerProperties = ContainerProperties(kafkaProperties.responseTopic)
containerProperties.messageListener = kafkaResponseProcessor
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}
@@ -82,6 +83,26 @@ class AppKafkaConfiguration {
}
@Bean
+ @ConditionalOnProperty(value = ["app.kafka.input-topic"])
+ fun kafkaInputListenerContainer(
+ consumerFactory: ConsumerFactory<String, String>,
+ kafkaProperties: KafkaProperties,
+ kafkaInputListener: KafkaInputListener
+ ): KafkaMessageListenerContainer<String, String> {
+ val containerProperties = ContainerProperties(kafkaProperties.inputTopic)
+ containerProperties.messageListener = kafkaInputListener
+ return KafkaMessageListenerContainer(consumerFactory, containerProperties)
+ }
+
+ @Bean
+ @ConditionalOnProperty(value = ["app.kafka.input-topic"])
+ fun kafkaInputListener(
+ applicationEventPublisher: ApplicationEventPublisher,
+ ): KafkaInputListener {
+ return KafkaInputListener(applicationEventPublisher)
+ }
+
+ @Bean
fun connectionCheckService(consumerFactory: ConsumerFactory<String, String>, configsUpdateProducer: Sinks.Many<Boolean>): ConnectionCheckService {
return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer)
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
new file mode 100644
index 0000000..ee6b56e
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -0,0 +1,33 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.input
+
+import de.ukw.ccc.bwhc.dto.MtbFile
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.springframework.context.ApplicationEventPublisher
+import org.springframework.kafka.listener.MessageListener
+
+class KafkaInputListener(
+ private val applicationEventPublisher: ApplicationEventPublisher
+) : MessageListener<String, MtbFile> {
+ override fun onMessage(data: ConsumerRecord<String, MtbFile>) {
+ TODO("Not yet implemented")
+ }
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 01c7d43..09edd31 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -22,7 +22,7 @@ package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.config.KafkaTargetProperties
+import dev.dnpm.etl.processor.config.KafkaProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
@@ -30,7 +30,7 @@ import org.springframework.retry.support.RetryTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
- private val kafkaTargetProperties: KafkaTargetProperties,
+ private val kafkaProperties: KafkaProperties,
private val retryTemplate: RetryTemplate,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@@ -41,7 +41,7 @@ class KafkaMtbFileSender(
return try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
val result = kafkaTemplate.send(
- kafkaTargetProperties.topic,
+ kafkaProperties.topic,
key(request),
objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
@@ -71,7 +71,7 @@ class KafkaMtbFileSender(
return try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
val result = kafkaTemplate.send(
- kafkaTargetProperties.topic,
+ kafkaProperties.topic,
key(request),
objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
)
@@ -90,7 +90,7 @@ class KafkaMtbFileSender(
}
override fun endpoint(): String {
- return "${this.kafkaTargetProperties.servers} (${this.kafkaTargetProperties.topic}/${this.kafkaTargetProperties.responseTopic})"
+ return "${this.kafkaProperties.servers} (${this.kafkaProperties.topic}/${this.kafkaProperties.responseTopic})"
}
private fun key(request: MtbFileSender.MtbFileRequest): String {