summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2024-03-08 15:42:04 +0100
committerPaul-Christian Volkmer2024-03-08 15:42:04 +0100
commitfc1901211d64bcdd7f71d3d6425692944d99327d (patch)
treecb51ac309cf2bb552d0b01cba99c7f38c08f48e0
parentbed91439db9fa485372e1c00f9b1622550f9f94c (diff)
feat: use requestId from incoming Kafka Record Header
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt19
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt9
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt33
3 files changed, 57 insertions, 4 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
index 63bf60a..de901ce 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -35,12 +35,27 @@ class KafkaInputListener(
override fun onMessage(data: ConsumerRecord<String, String>) {
val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java)
+ val firstRequestIdHeader = data.headers().headers("requestId")?.firstOrNull()
+ val requestId = if (null != firstRequestIdHeader) {
+ String(firstRequestIdHeader.value())
+ } else {
+ ""
+ }
+
if (mtbFile.consent.status == Consent.Status.ACTIVE) {
logger.debug("Accepted MTB File for processing")
- requestProcessor.processMtbFile(mtbFile)
+ if (requestId.isBlank()) {
+ requestProcessor.processMtbFile(mtbFile)
+ } else {
+ requestProcessor.processMtbFile(mtbFile, requestId)
+ }
} else {
logger.debug("Accepted MTB File and process deletion")
- requestProcessor.processDeletion(mtbFile.patient.id)
+ if (requestId.isBlank()) {
+ requestProcessor.processDeletion(mtbFile.patient.id)
+ } else {
+ requestProcessor.processDeletion(mtbFile.patient.id, requestId)
+ }
}
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
index d0b6341..66ff291 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
@@ -48,7 +48,10 @@ class RequestProcessor(
) {
fun processMtbFile(mtbFile: MtbFile) {
- val requestId = UUID.randomUUID().toString()
+ processMtbFile(mtbFile, UUID.randomUUID().toString())
+ }
+
+ fun processMtbFile(mtbFile: MtbFile, requestId: String) {
val pid = mtbFile.patient.id
mtbFile pseudonymizeWith pseudonymizeService
@@ -103,8 +106,10 @@ class RequestProcessor(
}
fun processDeletion(patientId: String) {
- val requestId = UUID.randomUUID().toString()
+ processDeletion(patientId, UUID.randomUUID().toString())
+ }
+ fun processDeletion(patientId: String, requestId: String) {
try {
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
index cf5ba39..1157644 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
@@ -25,6 +25,9 @@ import de.ukw.ccc.bwhc.dto.MtbFile
import de.ukw.ccc.bwhc.dto.Patient
import dev.dnpm.etl.processor.services.RequestProcessor
import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.header.internals.RecordHeaders
+import org.apache.kafka.common.record.TimestampType
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
@@ -34,6 +37,7 @@ import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
+import java.util.*
@ExtendWith(MockitoExtension::class)
class KafkaInputListenerTest {
@@ -76,4 +80,33 @@ class KafkaInputListenerTest {
verify(requestProcessor, times(1)).processDeletion(anyString())
}
+ @Test
+ fun shouldProcessMtbFileRequestWithExistingRequestId() {
+ val mtbFile = MtbFile.builder()
+ .withPatient(Patient.builder().withId("DUMMY_12345678").build())
+ .withConsent(Consent.builder().withStatus(Consent.Status.ACTIVE).build())
+ .build()
+
+ val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
+ kafkaInputListener.onMessage(
+ ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty())
+ )
+
+ verify(requestProcessor, times(1)).processMtbFile(any(), anyString())
+ }
+
+ @Test
+ fun shouldProcessDeleteRequestWithExistingRequestId() {
+ val mtbFile = MtbFile.builder()
+ .withPatient(Patient.builder().withId("DUMMY_12345678").build())
+ .withConsent(Consent.builder().withStatus(Consent.Status.REJECTED).build())
+ .build()
+
+ val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
+ kafkaInputListener.onMessage(
+ ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty())
+ )
+ verify(requestProcessor, times(1)).processDeletion(anyString(), anyString())
+ }
+
} \ No newline at end of file