diff options
| author | Paul-Christian Volkmer | 2024-03-08 15:42:04 +0100 |
|---|---|---|
| committer | Paul-Christian Volkmer | 2024-03-08 15:42:04 +0100 |
| commit | fc1901211d64bcdd7f71d3d6425692944d99327d (patch) | |
| tree | cb51ac309cf2bb552d0b01cba99c7f38c08f48e0 /src/main/kotlin/dev | |
| parent | bed91439db9fa485372e1c00f9b1622550f9f94c (diff) | |
feat: use requestId from incoming Kafka Record Header
Diffstat (limited to 'src/main/kotlin/dev')
| -rw-r--r-- | src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt | 19 | ||||
| -rw-r--r-- | src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 9 |
2 files changed, 24 insertions, 4 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt index 63bf60a..de901ce 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt @@ -35,12 +35,27 @@ class KafkaInputListener( override fun onMessage(data: ConsumerRecord<String, String>) { val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java) + val firstRequestIdHeader = data.headers().headers("requestId")?.firstOrNull() + val requestId = if (null != firstRequestIdHeader) { + String(firstRequestIdHeader.value()) + } else { + "" + } + if (mtbFile.consent.status == Consent.Status.ACTIVE) { logger.debug("Accepted MTB File for processing") - requestProcessor.processMtbFile(mtbFile) + if (requestId.isBlank()) { + requestProcessor.processMtbFile(mtbFile) + } else { + requestProcessor.processMtbFile(mtbFile, requestId) + } } else { logger.debug("Accepted MTB File and process deletion") - requestProcessor.processDeletion(mtbFile.patient.id) + if (requestId.isBlank()) { + requestProcessor.processDeletion(mtbFile.patient.id) + } else { + requestProcessor.processDeletion(mtbFile.patient.id, requestId) + } } } }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index d0b6341..66ff291 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -48,7 +48,10 @@ class RequestProcessor( ) { fun processMtbFile(mtbFile: MtbFile) { - val requestId = UUID.randomUUID().toString() + processMtbFile(mtbFile, UUID.randomUUID().toString()) + } + + fun processMtbFile(mtbFile: MtbFile, requestId: String) { val pid = mtbFile.patient.id mtbFile pseudonymizeWith pseudonymizeService @@ -103,8 +106,10 @@ class RequestProcessor( } fun processDeletion(patientId: String) { - val requestId = UUID.randomUUID().toString() + processDeletion(patientId, UUID.randomUUID().toString()) + } + fun processDeletion(patientId: String, requestId: String) { try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) |
