diff options
| author | Paul-Christian Volkmer | 2024-03-08 15:42:04 +0100 |
|---|---|---|
| committer | Paul-Christian Volkmer | 2024-03-08 15:42:04 +0100 |
| commit | fc1901211d64bcdd7f71d3d6425692944d99327d (patch) | |
| tree | cb51ac309cf2bb552d0b01cba99c7f38c08f48e0 /src/test/kotlin | |
| parent | bed91439db9fa485372e1c00f9b1622550f9f94c (diff) | |
feat: use requestId from incoming Kafka Record Header
Diffstat (limited to 'src/test/kotlin')
| -rw-r--r-- | src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt | 33 |
1 files changed, 33 insertions, 0 deletions
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt index cf5ba39..1157644 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt @@ -25,6 +25,9 @@ import de.ukw.ccc.bwhc.dto.MtbFile import de.ukw.ccc.bwhc.dto.Patient import dev.dnpm.etl.processor.services.RequestProcessor import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders +import org.apache.kafka.common.record.TimestampType import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith @@ -34,6 +37,7 @@ import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.any import org.mockito.kotlin.times import org.mockito.kotlin.verify +import java.util.* @ExtendWith(MockitoExtension::class) class KafkaInputListenerTest { @@ -76,4 +80,33 @@ class KafkaInputListenerTest { verify(requestProcessor, times(1)).processDeletion(anyString()) } + @Test + fun shouldProcessMtbFileRequestWithExistingRequestId() { + val mtbFile = MtbFile.builder() + .withPatient(Patient.builder().withId("DUMMY_12345678").build()) + .withConsent(Consent.builder().withStatus(Consent.Status.ACTIVE).build()) + .build() + + val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty()) + ) + + verify(requestProcessor, times(1)).processMtbFile(any(), anyString()) + } + + @Test + fun shouldProcessDeleteRequestWithExistingRequestId() { + val mtbFile = MtbFile.builder() + .withPatient(Patient.builder().withId("DUMMY_12345678").build()) + .withConsent(Consent.builder().withStatus(Consent.Status.REJECTED).build()) + .build() + + val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty()) + ) + verify(requestProcessor, times(1)).processDeletion(anyString(), anyString()) + } + }
\ No newline at end of file |
