diff options
| author | Paul-Christian Volkmer | 2026-03-12 11:23:32 +0100 |
|---|---|---|
| committer | GitHub | 2026-03-12 11:23:32 +0100 |
| commit | dc3aa929216b26d46ce134ac98c279a68d5679c1 (patch) | |
| tree | d86a42a14ee8bd35046d4304fb30e618f56c6d17 /src/test | |
| parent | a8f8d5f137c9776a20b2bc91cd3bdd99c9b96991 (diff) | |
feat: save error request for non json (#266)
Diffstat (limited to 'src/test')
| -rw-r--r-- | src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt | 494 |
1 files changed, 257 insertions, 237 deletions
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt index 222c92a..0d3f275 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt @@ -21,287 +21,307 @@ package dev.dnpm.etl.processor.input import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.CustomMediaType -import dev.dnpm.etl.processor.consent.ConsentEvaluation import dev.dnpm.etl.processor.consent.ConsentEvaluator import dev.dnpm.etl.processor.consent.TtpConsentStatus import dev.dnpm.etl.processor.services.RequestProcessor import dev.pcvolkmer.mv64e.mtb.* -import java.util.* import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.record.TimestampType +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import org.mockito.ArgumentCaptor import org.mockito.Mock import org.mockito.junit.jupiter.MockitoExtension -import org.mockito.kotlin.* +import org.mockito.kotlin.any +import org.mockito.kotlin.anyValueClass +import org.mockito.kotlin.firstValue +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import java.util.* @ExtendWith(MockitoExtension::class) class KafkaInputListenerTest { - private lateinit var requestProcessor: RequestProcessor - private lateinit var objectMapper: ObjectMapper + private lateinit var requestProcessor: RequestProcessor + private lateinit var objectMapper: ObjectMapper - private lateinit var kafkaInputListener: KafkaInputListener + private lateinit var kafkaInputListener: KafkaInputListener - @BeforeEach - fun setup( - @Mock requestProcessor: RequestProcessor, - @Mock consentEvaluator: ConsentEvaluator, - ) { - this.requestProcessor = requestProcessor - this.objectMapper = ObjectMapper() + @BeforeEach + fun setup( + @Mock requestProcessor: RequestProcessor, + @Mock consentEvaluator: ConsentEvaluator, + ) { + this.requestProcessor = requestProcessor + this.objectMapper = ObjectMapper() - this.kafkaInputListener = KafkaInputListener(requestProcessor, consentEvaluator, objectMapper) - } + this.kafkaInputListener = KafkaInputListener(requestProcessor, consentEvaluator, objectMapper) + } - @Test - fun shouldProcessMtbFileRequest() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.PERMIT) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessMtbFileRequest() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.PERMIT) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - kafkaInputListener.onMessage( - ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) - ) + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) + ) - verify(requestProcessor, times(1)).processMtbFile(any<Mtb>()) - } + verify(requestProcessor, times(1)).processMtbFile(any<Mtb>()) + } - @Test - fun shouldProcessRequestEvenIfNoConsentInformation() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf() - ) - .build() - ) - .build() - ) - .build() + @Test + fun shouldProcessRequestEvenIfNoConsentInformation() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf() + ) + .build() + ) + .build() + ) + .build() - kafkaInputListener.onMessage( - ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) - ) - verify(requestProcessor, times(1)).processMtbFile(any<Mtb>()) - } + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) + ) + verify(requestProcessor, times(1)).processMtbFile(any<Mtb>()) + } - @Test - fun shouldProcessMtbFileRequestWithExistingRequestId() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.PERMIT) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessMtbFileRequestWithExistingRequestId() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.PERMIT) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - val headers = - RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) - kafkaInputListener.onMessage( - ConsumerRecord( - "testtopic", - 0, - 0, - -1L, - TimestampType.NO_TIMESTAMP_TYPE, - -1, - -1, - "", - this.objectMapper.writeValueAsString(mtbFile), - headers, - Optional.empty(), + val headers = + RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), + ) ) - ) - verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass()) - } + verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass()) + } - @Test - fun shouldProcessRequestWithoutConsentGiven() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.DENY) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessRequestWithoutConsentGiven() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.DENY) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - val headers = - RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) - kafkaInputListener.onMessage( - ConsumerRecord( - "testtopic", - 0, - 0, - -1L, - TimestampType.NO_TIMESTAMP_TYPE, - -1, - -1, - "", - this.objectMapper.writeValueAsString(mtbFile), - headers, - Optional.empty(), + val headers = + RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), + ) ) - ) - verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass()) - } + verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass()) + } - @Test - fun shouldProcessDnpmV2Request() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.DENY) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessDnpmV2Request() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.DENY) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - val headers = - RecordHeaders( - listOf( - RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), - RecordHeader( - "contentType", - CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), - ), + val headers = + RecordHeaders( + listOf( + RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), + RecordHeader( + "contentType", + CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), + ), + ) + ) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), ) ) - kafkaInputListener.onMessage( - ConsumerRecord( - "testtopic", - 0, - 0, - -1L, - TimestampType.NO_TIMESTAMP_TYPE, - -1, - -1, - "", - this.objectMapper.writeValueAsString(mtbFile), - headers, - Optional.empty(), - ) - ) - verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass()) - } + verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass()) + } - @Test - fun shouldProcessDnpmV2DeleteRequest() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.DENY) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessDnpmV2DeleteRequest() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.DENY) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - val headers = - RecordHeaders( - listOf( - RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), - RecordHeader( - "requestMethod", - "DELETE".toByteArray(), - ), - RecordHeader( - "contentType", - CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), - ), + val headers = + RecordHeaders( + listOf( + RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), + RecordHeader( + "requestMethod", + "DELETE".toByteArray(), + ), + RecordHeader( + "contentType", + CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), + ), + ) + ) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), ) ) - kafkaInputListener.onMessage( - ConsumerRecord( - "testtopic", - 0, - 0, - -1L, - TimestampType.NO_TIMESTAMP_TYPE, - -1, - -1, - "", - this.objectMapper.writeValueAsString(mtbFile), - headers, - Optional.empty(), + verify(requestProcessor, times(1)) + .processDeletion(anyValueClass(), anyValueClass(), any<TtpConsentStatus>()) + } + + @ParameterizedTest + @ValueSource(strings = ["[]", "null", "X", ""]) + fun shouldNotProcessNonJsonRecordBody(content: String) { + val mtbFile = + + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, "", content) ) - ) - verify(requestProcessor, times(1)) - .processDeletion(anyValueClass(), anyValueClass(), any<TtpConsentStatus>()) - } + + val result = verify(requestProcessor, times(1)).processMtbFile(any<Mtb>()) + assertThat(result).isFalse() + } } |
