summaryrefslogtreecommitdiff
path: root/src/test/kotlin
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2024-03-08 15:44:39 +0100
committerGitHub2024-03-08 15:44:39 +0100
commita0c4d1863f8b894d1e2937d1e4b2c2130dce0349 (patch)
treecb51ac309cf2bb552d0b01cba99c7f38c08f48e0 /src/test/kotlin
parentbed91439db9fa485372e1c00f9b1622550f9f94c (diff)
parentfc1901211d64bcdd7f71d3d6425692944d99327d (diff)
Merge pull request #57 from CCC-MF/issue_56
feat: use requestId from incoming Kafka Record Header
Diffstat (limited to 'src/test/kotlin')
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt33
1 files changed, 33 insertions, 0 deletions
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
index cf5ba39..1157644 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
@@ -25,6 +25,9 @@ import de.ukw.ccc.bwhc.dto.MtbFile
import de.ukw.ccc.bwhc.dto.Patient
import dev.dnpm.etl.processor.services.RequestProcessor
import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.header.internals.RecordHeaders
+import org.apache.kafka.common.record.TimestampType
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
@@ -34,6 +37,7 @@ import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
+import java.util.*
@ExtendWith(MockitoExtension::class)
class KafkaInputListenerTest {
@@ -76,4 +80,33 @@ class KafkaInputListenerTest {
verify(requestProcessor, times(1)).processDeletion(anyString())
}
+ @Test
+ fun shouldProcessMtbFileRequestWithExistingRequestId() {
+ val mtbFile = MtbFile.builder()
+ .withPatient(Patient.builder().withId("DUMMY_12345678").build())
+ .withConsent(Consent.builder().withStatus(Consent.Status.ACTIVE).build())
+ .build()
+
+ val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
+ kafkaInputListener.onMessage(
+ ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty())
+ )
+
+ verify(requestProcessor, times(1)).processMtbFile(any(), anyString())
+ }
+
+ @Test
+ fun shouldProcessDeleteRequestWithExistingRequestId() {
+ val mtbFile = MtbFile.builder()
+ .withPatient(Patient.builder().withId("DUMMY_12345678").build())
+ .withConsent(Consent.builder().withStatus(Consent.Status.REJECTED).build())
+ .build()
+
+ val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
+ kafkaInputListener.onMessage(
+ ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty())
+ )
+ verify(requestProcessor, times(1)).processDeletion(anyString(), anyString())
+ }
+
} \ No newline at end of file