summaryrefslogtreecommitdiff
path: root/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt5
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt15
2 files changed, 12 insertions, 8 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index 1ff5e58..3799762 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -98,9 +98,10 @@ class AppKafkaConfiguration {
@Bean
@ConditionalOnProperty(value = ["app.kafka.input-topic"])
fun kafkaInputListener(
- requestProcessor: RequestProcessor
+ requestProcessor: RequestProcessor,
+ objectMapper: ObjectMapper
): KafkaInputListener {
- return KafkaInputListener(requestProcessor)
+ return KafkaInputListener(requestProcessor, objectMapper)
}
@Bean
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
index f3c0ab4..63bf60a 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -19,6 +19,7 @@
package dev.dnpm.etl.processor.input
+import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.services.RequestProcessor
@@ -27,17 +28,19 @@ import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.MessageListener
class KafkaInputListener(
- private val requestProcessor: RequestProcessor
-) : MessageListener<String, MtbFile> {
+ private val requestProcessor: RequestProcessor,
+ private val objectMapper: ObjectMapper
+) : MessageListener<String, String> {
private val logger = LoggerFactory.getLogger(KafkaInputListener::class.java)
- override fun onMessage(data: ConsumerRecord<String, MtbFile>) {
- if (data.value().consent.status == Consent.Status.ACTIVE) {
+ override fun onMessage(data: ConsumerRecord<String, String>) {
+ val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java)
+ if (mtbFile.consent.status == Consent.Status.ACTIVE) {
logger.debug("Accepted MTB File for processing")
- requestProcessor.processMtbFile(data.value())
+ requestProcessor.processMtbFile(mtbFile)
} else {
logger.debug("Accepted MTB File and process deletion")
- requestProcessor.processDeletion(data.value().patient.id)
+ requestProcessor.processDeletion(mtbFile.patient.id)
}
}
} \ No newline at end of file