summaryrefslogtreecommitdiff
path: root/src/main/kotlin/dev
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2024-02-29 12:49:06 +0100
committerPaul-Christian Volkmer2024-02-29 12:49:06 +0100
commit952ad8c0cfc64cf9c5e02f1b4f7fc2466f9f2bb3 (patch)
tree2f92d63c3d98fa013885513ae081b93a221f0ded /src/main/kotlin/dev
parent3e45bf84940b1e94c642bf3a88e94514e5a83c41 (diff)
test: add test for incoming kafka message processing
Diffstat (limited to 'src/main/kotlin/dev')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt5
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt15
2 files changed, 12 insertions, 8 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index 1ff5e58..3799762 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -98,9 +98,10 @@ class AppKafkaConfiguration {
@Bean
@ConditionalOnProperty(value = ["app.kafka.input-topic"])
fun kafkaInputListener(
- requestProcessor: RequestProcessor
+ requestProcessor: RequestProcessor,
+ objectMapper: ObjectMapper
): KafkaInputListener {
- return KafkaInputListener(requestProcessor)
+ return KafkaInputListener(requestProcessor, objectMapper)
}
@Bean
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
index f3c0ab4..63bf60a 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -19,6 +19,7 @@
package dev.dnpm.etl.processor.input
+import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.services.RequestProcessor
@@ -27,17 +28,19 @@ import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.MessageListener
class KafkaInputListener(
- private val requestProcessor: RequestProcessor
-) : MessageListener<String, MtbFile> {
+ private val requestProcessor: RequestProcessor,
+ private val objectMapper: ObjectMapper
+) : MessageListener<String, String> {
private val logger = LoggerFactory.getLogger(KafkaInputListener::class.java)
- override fun onMessage(data: ConsumerRecord<String, MtbFile>) {
- if (data.value().consent.status == Consent.Status.ACTIVE) {
+ override fun onMessage(data: ConsumerRecord<String, String>) {
+ val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java)
+ if (mtbFile.consent.status == Consent.Status.ACTIVE) {
logger.debug("Accepted MTB File for processing")
- requestProcessor.processMtbFile(data.value())
+ requestProcessor.processMtbFile(mtbFile)
} else {
logger.debug("Accepted MTB File and process deletion")
- requestProcessor.processDeletion(data.value().patient.id)
+ requestProcessor.processDeletion(mtbFile.patient.id)
}
}
} \ No newline at end of file