summaryrefslogtreecommitdiff
path: root/src/test/kotlin/dev/dnpm
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/kotlin/dev/dnpm')
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt494
1 files changed, 257 insertions, 237 deletions
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
index 222c92a..0d3f275 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
@@ -21,287 +21,307 @@ package dev.dnpm.etl.processor.input
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.CustomMediaType
-import dev.dnpm.etl.processor.consent.ConsentEvaluation
import dev.dnpm.etl.processor.consent.ConsentEvaluator
import dev.dnpm.etl.processor.consent.TtpConsentStatus
import dev.dnpm.etl.processor.services.RequestProcessor
import dev.pcvolkmer.mv64e.mtb.*
-import java.util.*
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.record.TimestampType
+import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import org.mockito.ArgumentCaptor
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
-import org.mockito.kotlin.*
+import org.mockito.kotlin.any
+import org.mockito.kotlin.anyValueClass
+import org.mockito.kotlin.firstValue
+import org.mockito.kotlin.times
+import org.mockito.kotlin.verify
+import java.util.*
@ExtendWith(MockitoExtension::class)
class KafkaInputListenerTest {
- private lateinit var requestProcessor: RequestProcessor
- private lateinit var objectMapper: ObjectMapper
+ private lateinit var requestProcessor: RequestProcessor
+ private lateinit var objectMapper: ObjectMapper
- private lateinit var kafkaInputListener: KafkaInputListener
+ private lateinit var kafkaInputListener: KafkaInputListener
- @BeforeEach
- fun setup(
- @Mock requestProcessor: RequestProcessor,
- @Mock consentEvaluator: ConsentEvaluator,
- ) {
- this.requestProcessor = requestProcessor
- this.objectMapper = ObjectMapper()
+ @BeforeEach
+ fun setup(
+ @Mock requestProcessor: RequestProcessor,
+ @Mock consentEvaluator: ConsentEvaluator,
+ ) {
+ this.requestProcessor = requestProcessor
+ this.objectMapper = ObjectMapper()
- this.kafkaInputListener = KafkaInputListener(requestProcessor, consentEvaluator, objectMapper)
- }
+ this.kafkaInputListener = KafkaInputListener(requestProcessor, consentEvaluator, objectMapper)
+ }
- @Test
- fun shouldProcessMtbFileRequest() {
- val mtbFile =
- Mtb.builder()
- .patient(Patient.builder().id("DUMMY_12345678").build())
- .metadata(
- MvhMetadata.builder()
- .modelProjectConsent(
- ModelProjectConsent.builder()
- .provisions(
- listOf(
- Provision.builder()
- .type(ConsentProvision.PERMIT)
- .purpose(ModelProjectConsentPurpose.SEQUENCING)
- .build()
+ @Test
+ fun shouldProcessMtbFileRequest() {
+ val mtbFile =
+ Mtb.builder()
+ .patient(Patient.builder().id("DUMMY_12345678").build())
+ .metadata(
+ MvhMetadata.builder()
+ .modelProjectConsent(
+ ModelProjectConsent.builder()
+ .provisions(
+ listOf(
+ Provision.builder()
+ .type(ConsentProvision.PERMIT)
+ .purpose(ModelProjectConsentPurpose.SEQUENCING)
+ .build()
+ )
)
- )
- .build()
- )
- .build()
- )
- .build()
+ .build()
+ )
+ .build()
+ )
+ .build()
- kafkaInputListener.onMessage(
- ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile))
- )
+ kafkaInputListener.onMessage(
+ ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile))
+ )
- verify(requestProcessor, times(1)).processMtbFile(any<Mtb>())
- }
+ verify(requestProcessor, times(1)).processMtbFile(any<Mtb>())
+ }
- @Test
- fun shouldProcessRequestEvenIfNoConsentInformation() {
- val mtbFile =
- Mtb.builder()
- .patient(Patient.builder().id("DUMMY_12345678").build())
- .metadata(
- MvhMetadata.builder()
- .modelProjectConsent(
- ModelProjectConsent.builder()
- .provisions(
- listOf()
- )
- .build()
- )
- .build()
- )
- .build()
+ @Test
+ fun shouldProcessRequestEvenIfNoConsentInformation() {
+ val mtbFile =
+ Mtb.builder()
+ .patient(Patient.builder().id("DUMMY_12345678").build())
+ .metadata(
+ MvhMetadata.builder()
+ .modelProjectConsent(
+ ModelProjectConsent.builder()
+ .provisions(
+ listOf()
+ )
+ .build()
+ )
+ .build()
+ )
+ .build()
- kafkaInputListener.onMessage(
- ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile))
- )
- verify(requestProcessor, times(1)).processMtbFile(any<Mtb>())
- }
+ kafkaInputListener.onMessage(
+ ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile))
+ )
+ verify(requestProcessor, times(1)).processMtbFile(any<Mtb>())
+ }
- @Test
- fun shouldProcessMtbFileRequestWithExistingRequestId() {
- val mtbFile =
- Mtb.builder()
- .patient(Patient.builder().id("DUMMY_12345678").build())
- .metadata(
- MvhMetadata.builder()
- .modelProjectConsent(
- ModelProjectConsent.builder()
- .provisions(
- listOf(
- Provision.builder()
- .type(ConsentProvision.PERMIT)
- .purpose(ModelProjectConsentPurpose.SEQUENCING)
- .build()
+ @Test
+ fun shouldProcessMtbFileRequestWithExistingRequestId() {
+ val mtbFile =
+ Mtb.builder()
+ .patient(Patient.builder().id("DUMMY_12345678").build())
+ .metadata(
+ MvhMetadata.builder()
+ .modelProjectConsent(
+ ModelProjectConsent.builder()
+ .provisions(
+ listOf(
+ Provision.builder()
+ .type(ConsentProvision.PERMIT)
+ .purpose(ModelProjectConsentPurpose.SEQUENCING)
+ .build()
+ )
)
- )
- .build()
- )
- .build()
- )
- .build()
+ .build()
+ )
+ .build()
+ )
+ .build()
- val headers =
- RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
- kafkaInputListener.onMessage(
- ConsumerRecord(
- "testtopic",
- 0,
- 0,
- -1L,
- TimestampType.NO_TIMESTAMP_TYPE,
- -1,
- -1,
- "",
- this.objectMapper.writeValueAsString(mtbFile),
- headers,
- Optional.empty(),
+ val headers =
+ RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
+ kafkaInputListener.onMessage(
+ ConsumerRecord(
+ "testtopic",
+ 0,
+ 0,
+ -1L,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ -1,
+ -1,
+ "",
+ this.objectMapper.writeValueAsString(mtbFile),
+ headers,
+ Optional.empty(),
+ )
)
- )
- verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass())
- }
+ verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass())
+ }
- @Test
- fun shouldProcessRequestWithoutConsentGiven() {
- val mtbFile =
- Mtb.builder()
- .patient(Patient.builder().id("DUMMY_12345678").build())
- .metadata(
- MvhMetadata.builder()
- .modelProjectConsent(
- ModelProjectConsent.builder()
- .provisions(
- listOf(
- Provision.builder()
- .type(ConsentProvision.DENY)
- .purpose(ModelProjectConsentPurpose.SEQUENCING)
- .build()
+ @Test
+ fun shouldProcessRequestWithoutConsentGiven() {
+ val mtbFile =
+ Mtb.builder()
+ .patient(Patient.builder().id("DUMMY_12345678").build())
+ .metadata(
+ MvhMetadata.builder()
+ .modelProjectConsent(
+ ModelProjectConsent.builder()
+ .provisions(
+ listOf(
+ Provision.builder()
+ .type(ConsentProvision.DENY)
+ .purpose(ModelProjectConsentPurpose.SEQUENCING)
+ .build()
+ )
)
- )
- .build()
- )
- .build()
- )
- .build()
+ .build()
+ )
+ .build()
+ )
+ .build()
- val headers =
- RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
- kafkaInputListener.onMessage(
- ConsumerRecord(
- "testtopic",
- 0,
- 0,
- -1L,
- TimestampType.NO_TIMESTAMP_TYPE,
- -1,
- -1,
- "",
- this.objectMapper.writeValueAsString(mtbFile),
- headers,
- Optional.empty(),
+ val headers =
+ RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
+ kafkaInputListener.onMessage(
+ ConsumerRecord(
+ "testtopic",
+ 0,
+ 0,
+ -1L,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ -1,
+ -1,
+ "",
+ this.objectMapper.writeValueAsString(mtbFile),
+ headers,
+ Optional.empty(),
+ )
)
- )
- verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass())
- }
+ verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass())
+ }
- @Test
- fun shouldProcessDnpmV2Request() {
- val mtbFile =
- Mtb.builder()
- .patient(Patient.builder().id("DUMMY_12345678").build())
- .metadata(
- MvhMetadata.builder()
- .modelProjectConsent(
- ModelProjectConsent.builder()
- .provisions(
- listOf(
- Provision.builder()
- .type(ConsentProvision.DENY)
- .purpose(ModelProjectConsentPurpose.SEQUENCING)
- .build()
+ @Test
+ fun shouldProcessDnpmV2Request() {
+ val mtbFile =
+ Mtb.builder()
+ .patient(Patient.builder().id("DUMMY_12345678").build())
+ .metadata(
+ MvhMetadata.builder()
+ .modelProjectConsent(
+ ModelProjectConsent.builder()
+ .provisions(
+ listOf(
+ Provision.builder()
+ .type(ConsentProvision.DENY)
+ .purpose(ModelProjectConsentPurpose.SEQUENCING)
+ .build()
+ )
)
- )
- .build()
- )
- .build()
- )
- .build()
+ .build()
+ )
+ .build()
+ )
+ .build()
- val headers =
- RecordHeaders(
- listOf(
- RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()),
- RecordHeader(
- "contentType",
- CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(),
- ),
+ val headers =
+ RecordHeaders(
+ listOf(
+ RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()),
+ RecordHeader(
+ "contentType",
+ CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(),
+ ),
+ )
+ )
+ kafkaInputListener.onMessage(
+ ConsumerRecord(
+ "testtopic",
+ 0,
+ 0,
+ -1L,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ -1,
+ -1,
+ "",
+ this.objectMapper.writeValueAsString(mtbFile),
+ headers,
+ Optional.empty(),
)
)
- kafkaInputListener.onMessage(
- ConsumerRecord(
- "testtopic",
- 0,
- 0,
- -1L,
- TimestampType.NO_TIMESTAMP_TYPE,
- -1,
- -1,
- "",
- this.objectMapper.writeValueAsString(mtbFile),
- headers,
- Optional.empty(),
- )
- )
- verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass())
- }
+ verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass())
+ }
- @Test
- fun shouldProcessDnpmV2DeleteRequest() {
- val mtbFile =
- Mtb.builder()
- .patient(Patient.builder().id("DUMMY_12345678").build())
- .metadata(
- MvhMetadata.builder()
- .modelProjectConsent(
- ModelProjectConsent.builder()
- .provisions(
- listOf(
- Provision.builder()
- .type(ConsentProvision.DENY)
- .purpose(ModelProjectConsentPurpose.SEQUENCING)
- .build()
+ @Test
+ fun shouldProcessDnpmV2DeleteRequest() {
+ val mtbFile =
+ Mtb.builder()
+ .patient(Patient.builder().id("DUMMY_12345678").build())
+ .metadata(
+ MvhMetadata.builder()
+ .modelProjectConsent(
+ ModelProjectConsent.builder()
+ .provisions(
+ listOf(
+ Provision.builder()
+ .type(ConsentProvision.DENY)
+ .purpose(ModelProjectConsentPurpose.SEQUENCING)
+ .build()
+ )
)
- )
- .build()
- )
- .build()
- )
- .build()
+ .build()
+ )
+ .build()
+ )
+ .build()
- val headers =
- RecordHeaders(
- listOf(
- RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()),
- RecordHeader(
- "requestMethod",
- "DELETE".toByteArray(),
- ),
- RecordHeader(
- "contentType",
- CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(),
- ),
+ val headers =
+ RecordHeaders(
+ listOf(
+ RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()),
+ RecordHeader(
+ "requestMethod",
+ "DELETE".toByteArray(),
+ ),
+ RecordHeader(
+ "contentType",
+ CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(),
+ ),
+ )
+ )
+ kafkaInputListener.onMessage(
+ ConsumerRecord(
+ "testtopic",
+ 0,
+ 0,
+ -1L,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ -1,
+ -1,
+ "",
+ this.objectMapper.writeValueAsString(mtbFile),
+ headers,
+ Optional.empty(),
)
)
- kafkaInputListener.onMessage(
- ConsumerRecord(
- "testtopic",
- 0,
- 0,
- -1L,
- TimestampType.NO_TIMESTAMP_TYPE,
- -1,
- -1,
- "",
- this.objectMapper.writeValueAsString(mtbFile),
- headers,
- Optional.empty(),
+ verify(requestProcessor, times(1))
+ .processDeletion(anyValueClass(), anyValueClass(), any<TtpConsentStatus>())
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = ["[]", "null", "X", ""])
+ fun shouldNotProcessNonJsonRecordBody(content: String) {
+ val mtbFile =
+
+ kafkaInputListener.onMessage(
+ ConsumerRecord("testtopic", 0, 0, "", content)
)
- )
- verify(requestProcessor, times(1))
- .processDeletion(anyValueClass(), anyValueClass(), any<TtpConsentStatus>())
- }
+
+ val result = verify(requestProcessor, times(1)).processMtbFile(any<Mtb>())
+ assertThat(result).isFalse()
+ }
}