summaryrefslogtreecommitdiff
path: root/src/main/kotlin/dev
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin/dev')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt1
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt4
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt16
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt70
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt4
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt87
6 files changed, 166 insertions, 16 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
index 0c4ab68..5d28c97 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
@@ -28,4 +28,3 @@ class EtlProcessorApplication
fun main(args: Array<String>) {
runApplication<EtlProcessorApplication>(*args)
}
-
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
index 64be70d..6502a1b 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
@@ -48,7 +48,7 @@ data class GPasConfigProperties(
val password: String?,
val sslCaLocation: String?,
-) {
+ ) {
companion object {
const val NAME = "app.pseudonymize.gpas"
}
@@ -66,6 +66,8 @@ data class RestTargetProperties(
@ConfigurationProperties(KafkaTargetProperties.NAME)
data class KafkaTargetProperties(
val topic: String = "etl-processor",
+ val responseTopic: String = "${topic}_response",
+ val groupId: String = "${topic}_group",
val servers: String = ""
) {
companion object {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
index c677f2b..cbba1f1 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
@@ -21,7 +21,6 @@ package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.ReportService
-import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
@@ -32,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.core.KafkaTemplate
import reactor.core.publisher.Sinks
@Configuration
@@ -60,7 +58,10 @@ class AppConfiguration {
}
@Bean
- fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService {
+ fun pseudonymizeService(
+ generator: Generator,
+ pseudonymizeConfigProperties: PseudonymizeConfigProperties
+ ): PseudonymizeService {
return PseudonymizeService(generator, pseudonymizeConfigProperties)
}
@@ -70,15 +71,6 @@ class AppConfiguration {
return RestMtbFileSender(restTargetProperties)
}
- @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
- @Bean
- fun kafkaMtbFileSender(
- kafkaTemplate: KafkaTemplate<String, String>,
- objectMapper: ObjectMapper
- ): MtbFileSender {
- return KafkaMtbFileSender(kafkaTemplate, objectMapper)
- }
-
@Bean
fun reportService(objectMapper: ObjectMapper): ReportService {
return ReportService(objectMapper)
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
new file mode 100644
index 0000000..f81d3fb
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -0,0 +1,70 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.config
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.output.KafkaMtbFileSender
+import dev.dnpm.etl.processor.output.MtbFileSender
+import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.listener.ContainerProperties
+import org.springframework.kafka.listener.KafkaMessageListenerContainer
+
+@Configuration
+@EnableConfigurationProperties(
+ value = [KafkaTargetProperties::class]
+)
+@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
+class AppKafkaConfiguration {
+
+ @Bean
+ fun kafkaMtbFileSender(
+ kafkaTemplate: KafkaTemplate<String, String>,
+ objectMapper: ObjectMapper
+ ): MtbFileSender {
+ return KafkaMtbFileSender(kafkaTemplate, objectMapper)
+ }
+
+ @Bean
+ fun kafkaListenerContainer(
+ consumerFactory: ConsumerFactory<String, String>,
+ kafkaTargetProperties: KafkaTargetProperties,
+ kafkaResponseProcessor: KafkaResponseProcessor
+ ): KafkaMessageListenerContainer<String, String> {
+ val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic);
+ containerProperties.messageListener = kafkaResponseProcessor
+ return KafkaMessageListenerContainer(consumerFactory, containerProperties)
+ }
+
+ @Bean
+ fun kafkaResponseProcessor(
+ requestRepository: RequestRepository,
+ objectMapper: ObjectMapper
+ ): KafkaResponseProcessor {
+ return KafkaResponseProcessor(requestRepository, objectMapper)
+ }
+
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
index ecd8219..c1d4d43 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
@@ -36,9 +36,9 @@ data class Request(
val patientId: String,
val pid: String,
val fingerprint: String,
- val status: RequestStatus,
val type: RequestType,
- val processedAt: Instant = Instant.now(),
+ var status: RequestStatus,
+ var processedAt: Instant = Instant.now(),
@Embedded.Nullable var report: Report? = null
)
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
new file mode 100644
index 0000000..f0c91cb
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
@@ -0,0 +1,87 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services.kafka
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.monitoring.Report
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.slf4j.LoggerFactory
+import org.springframework.kafka.listener.MessageListener
+import java.time.Instant
+
+class KafkaResponseProcessor(
+ private val requestRepository: RequestRepository,
+ private val objectMapper: ObjectMapper
+) : MessageListener<String, String> {
+
+ private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java)
+
+ override fun onMessage(data: ConsumerRecord<String, String>) {
+ try {
+ val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java)
+ requestRepository.findByUuidEquals(responseKey.requestId).ifPresent {
+ val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
+ when (responseBody.statusCode) {
+ 200 -> {
+ it.status = RequestStatus.SUCCESS
+ it.processedAt = Instant.ofEpochMilli(data.timestamp())
+ requestRepository.save(it)
+ }
+
+ 201 -> {
+ it.status = RequestStatus.WARNING
+ it.processedAt = Instant.ofEpochMilli(data.timestamp())
+ it.report = Report(
+ "Warnungen über mangelhafte Daten",
+ responseBody.statusBody
+ )
+ requestRepository.save(it)
+ }
+
+ 400, 422 -> {
+ it.status = RequestStatus.ERROR
+ it.processedAt = Instant.ofEpochMilli(data.timestamp())
+ it.report = Report(
+ "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
+ responseBody.statusBody
+ )
+ requestRepository.save(it)
+ }
+
+ else -> {
+ logger.error("Cannot process Kafka response: Unknown response code!")
+ }
+ }
+ }
+ } catch (e: Exception) {
+ logger.error("Cannot process Kafka response", e)
+ }
+ }
+
+ data class ResponseKey(val requestId: String)
+
+ data class ResponseBody(
+ @JsonProperty("status code") val statusCode: Int,
+ @JsonProperty("status_body") val statusBody: String
+ )
+} \ No newline at end of file