diff options
Diffstat (limited to 'src/main/kotlin/dev/dnpm')
4 files changed, 81 insertions, 16 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 2290d44..2eea92e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -72,10 +72,21 @@ data class RestTargetProperties( } } -@ConfigurationProperties(KafkaTargetProperties.NAME) -data class KafkaTargetProperties( - val topic: String = "etl-processor", - val responseTopic: String = "${topic}_response", +@ConfigurationProperties(KafkaProperties.NAME) +data class KafkaProperties( + val inputTopic: String?, + val outputTopic: String = "etl-processor", + @get:DeprecatedConfigurationProperty( + reason = "Deprecated", + replacement = "outputTopic" + ) + val topic: String = outputTopic, + val outputResponseTopic: String = "${outputTopic}_response", + @get:DeprecatedConfigurationProperty( + reason = "Deprecated", + replacement = "outputResponseTopic" + ) + val responseTopic: String = outputResponseTopic, val groupId: String = "${topic}_group", val servers: String = "" ) { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 68b86b2..0977527 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.input.KafkaInputListener import dev.dnpm.etl.processor.monitoring.ConnectionCheckService import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService import dev.dnpm.etl.processor.output.KafkaMtbFileSender @@ -42,9 +43,9 @@ import reactor.core.publisher.Sinks @Configuration @EnableConfigurationProperties( - value = [KafkaTargetProperties::class] + value = [KafkaProperties::class] ) -@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +@ConditionalOnProperty(value = ["app.kafka.servers"]) @ConditionalOnMissingBean(MtbFileSender::class) @Order(-5) class AppKafkaConfiguration { @@ -54,21 +55,21 @@ class AppKafkaConfiguration { @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate<String, String>, - kafkaTargetProperties: KafkaTargetProperties, + kafkaProperties: KafkaProperties, retryTemplate: RetryTemplate, objectMapper: ObjectMapper ): MtbFileSender { logger.info("Selected 'KafkaMtbFileSender'") - return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper) + return KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper) } @Bean - fun kafkaListenerContainer( + fun kafkaResponseListenerContainer( consumerFactory: ConsumerFactory<String, String>, - kafkaTargetProperties: KafkaTargetProperties, + kafkaProperties: KafkaProperties, kafkaResponseProcessor: KafkaResponseProcessor ): KafkaMessageListenerContainer<String, String> { - val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic) + val containerProperties = ContainerProperties(kafkaProperties.responseTopic) containerProperties.messageListener = kafkaResponseProcessor return KafkaMessageListenerContainer(consumerFactory, containerProperties) } @@ -82,6 +83,26 @@ class AppKafkaConfiguration { } @Bean + @ConditionalOnProperty(value = ["app.kafka.input-topic"]) + fun kafkaInputListenerContainer( + consumerFactory: ConsumerFactory<String, String>, + kafkaProperties: KafkaProperties, + kafkaInputListener: KafkaInputListener + ): KafkaMessageListenerContainer<String, String> { + val containerProperties = ContainerProperties(kafkaProperties.inputTopic) + containerProperties.messageListener = kafkaInputListener + return KafkaMessageListenerContainer(consumerFactory, containerProperties) + } + + @Bean + @ConditionalOnProperty(value = ["app.kafka.input-topic"]) + fun kafkaInputListener( + applicationEventPublisher: ApplicationEventPublisher, + ): KafkaInputListener { + return KafkaInputListener(applicationEventPublisher) + } + + @Bean fun connectionCheckService(consumerFactory: ConsumerFactory<String, String>, configsUpdateProducer: Sinks.Many<Boolean>): ConnectionCheckService { return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer) } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt new file mode 100644 index 0000000..ee6b56e --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt @@ -0,0 +1,33 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package dev.dnpm.etl.processor.input + +import de.ukw.ccc.bwhc.dto.MtbFile +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.context.ApplicationEventPublisher +import org.springframework.kafka.listener.MessageListener + +class KafkaInputListener( + private val applicationEventPublisher: ApplicationEventPublisher +) : MessageListener<String, MtbFile> { + override fun onMessage(data: ConsumerRecord<String, MtbFile>) { + TODO("Not yet implemented") + } +}
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 01c7d43..09edd31 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -22,7 +22,7 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.config.KafkaProperties import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -30,7 +30,7 @@ import org.springframework.retry.support.RetryTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate<String, String>, - private val kafkaTargetProperties: KafkaTargetProperties, + private val kafkaProperties: KafkaProperties, private val retryTemplate: RetryTemplate, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -41,7 +41,7 @@ class KafkaMtbFileSender( return try { return retryTemplate.execute<MtbFileSender.Response, Exception> { val result = kafkaTemplate.send( - kafkaTargetProperties.topic, + kafkaProperties.topic, key(request), objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) @@ -71,7 +71,7 @@ class KafkaMtbFileSender( return try { return retryTemplate.execute<MtbFileSender.Response, Exception> { val result = kafkaTemplate.send( - kafkaTargetProperties.topic, + kafkaProperties.topic, key(request), objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) ) @@ -90,7 +90,7 @@ class KafkaMtbFileSender( } override fun endpoint(): String { - return "${this.kafkaTargetProperties.servers} (${this.kafkaTargetProperties.topic}/${this.kafkaTargetProperties.responseTopic})" + return "${this.kafkaProperties.servers} (${this.kafkaProperties.topic}/${this.kafkaProperties.responseTopic})" } private fun key(request: MtbFileSender.MtbFileRequest): String { |
