summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2024-03-08 15:44:39 +0100
committerGitHub2024-03-08 15:44:39 +0100
commita0c4d1863f8b894d1e2937d1e4b2c2130dce0349 (patch)
treecb51ac309cf2bb552d0b01cba99c7f38c08f48e0 /src/main
parentbed91439db9fa485372e1c00f9b1622550f9f94c (diff)
parentfc1901211d64bcdd7f71d3d6425692944d99327d (diff)
Merge pull request #57 from CCC-MF/issue_56
feat: use requestId from incoming Kafka Record Header
Diffstat (limited to 'src/main')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt19
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt9
2 files changed, 24 insertions, 4 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
index 63bf60a..de901ce 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -35,12 +35,27 @@ class KafkaInputListener(
override fun onMessage(data: ConsumerRecord<String, String>) {
val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java)
+ val firstRequestIdHeader = data.headers().headers("requestId")?.firstOrNull()
+ val requestId = if (null != firstRequestIdHeader) {
+ String(firstRequestIdHeader.value())
+ } else {
+ ""
+ }
+
if (mtbFile.consent.status == Consent.Status.ACTIVE) {
logger.debug("Accepted MTB File for processing")
- requestProcessor.processMtbFile(mtbFile)
+ if (requestId.isBlank()) {
+ requestProcessor.processMtbFile(mtbFile)
+ } else {
+ requestProcessor.processMtbFile(mtbFile, requestId)
+ }
} else {
logger.debug("Accepted MTB File and process deletion")
- requestProcessor.processDeletion(mtbFile.patient.id)
+ if (requestId.isBlank()) {
+ requestProcessor.processDeletion(mtbFile.patient.id)
+ } else {
+ requestProcessor.processDeletion(mtbFile.patient.id, requestId)
+ }
}
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
index d0b6341..66ff291 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
@@ -48,7 +48,10 @@ class RequestProcessor(
) {
fun processMtbFile(mtbFile: MtbFile) {
- val requestId = UUID.randomUUID().toString()
+ processMtbFile(mtbFile, UUID.randomUUID().toString())
+ }
+
+ fun processMtbFile(mtbFile: MtbFile, requestId: String) {
val pid = mtbFile.patient.id
mtbFile pseudonymizeWith pseudonymizeService
@@ -103,8 +106,10 @@ class RequestProcessor(
}
fun processDeletion(patientId: String) {
- val requestId = UUID.randomUUID().toString()
+ processDeletion(patientId, UUID.randomUUID().toString())
+ }
+ fun processDeletion(patientId: String, requestId: String) {
try {
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)