summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt19
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt35
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt33
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt10
4 files changed, 81 insertions, 16 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
index 2290d44..2eea92e 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
@@ -72,10 +72,21 @@ data class RestTargetProperties(
}
}
-@ConfigurationProperties(KafkaTargetProperties.NAME)
-data class KafkaTargetProperties(
- val topic: String = "etl-processor",
- val responseTopic: String = "${topic}_response",
+@ConfigurationProperties(KafkaProperties.NAME)
+data class KafkaProperties(
+ val inputTopic: String?,
+ val outputTopic: String = "etl-processor",
+ @get:DeprecatedConfigurationProperty(
+ reason = "Deprecated",
+ replacement = "outputTopic"
+ )
+ val topic: String = outputTopic,
+ val outputResponseTopic: String = "${outputTopic}_response",
+ @get:DeprecatedConfigurationProperty(
+ reason = "Deprecated",
+ replacement = "outputResponseTopic"
+ )
+ val responseTopic: String = outputResponseTopic,
val groupId: String = "${topic}_group",
val servers: String = ""
) {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index 68b86b2..0977527 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -20,6 +20,7 @@
package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.input.KafkaInputListener
import dev.dnpm.etl.processor.monitoring.ConnectionCheckService
import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
@@ -42,9 +43,9 @@ import reactor.core.publisher.Sinks
@Configuration
@EnableConfigurationProperties(
- value = [KafkaTargetProperties::class]
+ value = [KafkaProperties::class]
)
-@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
+@ConditionalOnProperty(value = ["app.kafka.servers"])
@ConditionalOnMissingBean(MtbFileSender::class)
@Order(-5)
class AppKafkaConfiguration {
@@ -54,21 +55,21 @@ class AppKafkaConfiguration {
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
- kafkaTargetProperties: KafkaTargetProperties,
+ kafkaProperties: KafkaProperties,
retryTemplate: RetryTemplate,
objectMapper: ObjectMapper
): MtbFileSender {
logger.info("Selected 'KafkaMtbFileSender'")
- return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
+ return KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper)
}
@Bean
- fun kafkaListenerContainer(
+ fun kafkaResponseListenerContainer(
consumerFactory: ConsumerFactory<String, String>,
- kafkaTargetProperties: KafkaTargetProperties,
+ kafkaProperties: KafkaProperties,
kafkaResponseProcessor: KafkaResponseProcessor
): KafkaMessageListenerContainer<String, String> {
- val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic)
+ val containerProperties = ContainerProperties(kafkaProperties.responseTopic)
containerProperties.messageListener = kafkaResponseProcessor
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}
@@ -82,6 +83,26 @@ class AppKafkaConfiguration {
}
@Bean
+ @ConditionalOnProperty(value = ["app.kafka.input-topic"])
+ fun kafkaInputListenerContainer(
+ consumerFactory: ConsumerFactory<String, String>,
+ kafkaProperties: KafkaProperties,
+ kafkaInputListener: KafkaInputListener
+ ): KafkaMessageListenerContainer<String, String> {
+ val containerProperties = ContainerProperties(kafkaProperties.inputTopic)
+ containerProperties.messageListener = kafkaInputListener
+ return KafkaMessageListenerContainer(consumerFactory, containerProperties)
+ }
+
+ @Bean
+ @ConditionalOnProperty(value = ["app.kafka.input-topic"])
+ fun kafkaInputListener(
+ applicationEventPublisher: ApplicationEventPublisher,
+ ): KafkaInputListener {
+ return KafkaInputListener(applicationEventPublisher)
+ }
+
+ @Bean
fun connectionCheckService(consumerFactory: ConsumerFactory<String, String>, configsUpdateProducer: Sinks.Many<Boolean>): ConnectionCheckService {
return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer)
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
new file mode 100644
index 0000000..ee6b56e
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -0,0 +1,33 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.input
+
+import de.ukw.ccc.bwhc.dto.MtbFile
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.springframework.context.ApplicationEventPublisher
+import org.springframework.kafka.listener.MessageListener
+
+class KafkaInputListener(
+ private val applicationEventPublisher: ApplicationEventPublisher
+) : MessageListener<String, MtbFile> {
+ override fun onMessage(data: ConsumerRecord<String, MtbFile>) {
+ TODO("Not yet implemented")
+ }
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 01c7d43..09edd31 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -22,7 +22,7 @@ package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.config.KafkaTargetProperties
+import dev.dnpm.etl.processor.config.KafkaProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
@@ -30,7 +30,7 @@ import org.springframework.retry.support.RetryTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
- private val kafkaTargetProperties: KafkaTargetProperties,
+ private val kafkaProperties: KafkaProperties,
private val retryTemplate: RetryTemplate,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@@ -41,7 +41,7 @@ class KafkaMtbFileSender(
return try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
val result = kafkaTemplate.send(
- kafkaTargetProperties.topic,
+ kafkaProperties.topic,
key(request),
objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
@@ -71,7 +71,7 @@ class KafkaMtbFileSender(
return try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
val result = kafkaTemplate.send(
- kafkaTargetProperties.topic,
+ kafkaProperties.topic,
key(request),
objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
)
@@ -90,7 +90,7 @@ class KafkaMtbFileSender(
}
override fun endpoint(): String {
- return "${this.kafkaTargetProperties.servers} (${this.kafkaTargetProperties.topic}/${this.kafkaTargetProperties.responseTopic})"
+ return "${this.kafkaProperties.servers} (${this.kafkaProperties.topic}/${this.kafkaProperties.responseTopic})"
}
private fun key(request: MtbFileSender.MtbFileRequest): String {