diff options
| author | Paul-Christian Volkmer | 2026-01-07 10:47:12 +0100 |
|---|---|---|
| committer | GitHub | 2026-01-07 10:47:12 +0100 |
| commit | c23e9d790e500d17f2f19252dcd7f3a13cd098e3 (patch) | |
| tree | 619527375362019bb78dfca41e3997ead954d9d4 | |
| parent | d4ef16c115b8429637f933038254646a61dd81b1 (diff) | |
feat!: do not delete patient information if no consent is given (#236)
6 files changed, 115 insertions, 83 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt index 2f6b2bb..41c85ac 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt @@ -79,21 +79,28 @@ class KafkaInputListener( } else { RequestId("") } - - if (consentEvaluator.check(mtbFile).hasConsent()) { - logger.debug("Accepted MTB File for processing") - if (requestId.isBlank()) { - requestProcessor.processMtbFile(mtbFile) + val firstRequestMethodHeader = record.headers().headers("requestMethod")?.firstOrNull() + val requestMethod = + if (null != firstRequestMethodHeader) { + String(firstRequestMethodHeader.value()) } else { - requestProcessor.processMtbFile(mtbFile, requestId) + "" } - } else { + + if (requestMethod == "DELETE") { logger.debug("Accepted MTB File and process deletion") if (requestId.isBlank()) { requestProcessor.processDeletion(patientId, TtpConsentStatus.UNKNOWN_CHECK_FILE) } else { requestProcessor.processDeletion(patientId, requestId, TtpConsentStatus.UNKNOWN_CHECK_FILE) } + } else { + logger.debug("Accepted MTB File for processing") + if (requestId.isBlank()) { + requestProcessor.processMtbFile(mtbFile) + } else { + requestProcessor.processMtbFile(mtbFile, requestId) + } } } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt index 071e3cd..f4ab194 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt @@ -52,15 +52,8 @@ class MtbFileRestController( ], ) fun mtbFile(@RequestBody mtbFile: Mtb): ResponseEntity<Unit> { - val consentEvaluation = consentEvaluator.check(mtbFile) - if (consentEvaluation.hasConsent()) { - logger.debug("Accepted MTB File (DNPM V2) for processing") - requestProcessor.processMtbFile(mtbFile) - } else { - logger.debug("Accepted MTB File (DNPM V2) and process deletion") - val patientId = PatientId(mtbFile.patient.id) - requestProcessor.processDeletion(patientId, consentEvaluation.getStatus()) - } + logger.debug("Accepted MTB File (DNPM V2) for processing") + requestProcessor.processMtbFile(mtbFile) return ResponseEntity.accepted().build() } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 71e4a78..bcd532f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -48,6 +48,7 @@ class KafkaMtbFileSender( objectMapper.writeValueAsString(request.content), ) record.headers().add("requestId", request.requestId.value.toByteArray()) + record.headers().add("requestMethod", "POST".toByteArray()) when (request) { is DnpmV2MtbFileRequest -> record @@ -86,6 +87,13 @@ class KafkaMtbFileSender( ), ) record.headers().add("requestId", request.requestId.value.toByteArray()) + record.headers().add("requestMethod", "DELETE".toByteArray()) + record + .headers() + .add( + "contentType", + CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), + ) val result = kafkaTemplate.send(record) if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt index da05fba..222c92a 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt @@ -42,7 +42,6 @@ import org.mockito.kotlin.* class KafkaInputListenerTest { private lateinit var requestProcessor: RequestProcessor - private lateinit var consentEvaluator: ConsentEvaluator private lateinit var objectMapper: ObjectMapper private lateinit var kafkaInputListener: KafkaInputListener @@ -53,7 +52,6 @@ class KafkaInputListenerTest { @Mock consentEvaluator: ConsentEvaluator, ) { this.requestProcessor = requestProcessor - this.consentEvaluator = consentEvaluator this.objectMapper = ObjectMapper() this.kafkaInputListener = KafkaInputListener(requestProcessor, consentEvaluator, objectMapper) @@ -61,9 +59,6 @@ class KafkaInputListenerTest { @Test fun shouldProcessMtbFileRequest() { - whenever(consentEvaluator.check(any())) - .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, true)) - val mtbFile = Mtb.builder() .patient(Patient.builder().id("DUMMY_12345678").build()) @@ -93,10 +88,7 @@ class KafkaInputListenerTest { } @Test - fun shouldProcessDeleteRequest() { - whenever(consentEvaluator.check(any())) - .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, false)) - + fun shouldProcessRequestEvenIfNoConsentInformation() { val mtbFile = Mtb.builder() .patient(Patient.builder().id("DUMMY_12345678").build()) @@ -105,12 +97,7 @@ class KafkaInputListenerTest { .modelProjectConsent( ModelProjectConsent.builder() .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.DENY) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() - ) + listOf() ) .build() ) @@ -121,16 +108,11 @@ class KafkaInputListenerTest { kafkaInputListener.onMessage( ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) ) - - verify(requestProcessor, times(1)) - .processDeletion(anyValueClass(), eq(TtpConsentStatus.UNKNOWN_CHECK_FILE)) + verify(requestProcessor, times(1)).processMtbFile(any<Mtb>()) } @Test fun shouldProcessMtbFileRequestWithExistingRequestId() { - whenever(consentEvaluator.check(any())) - .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, true)) - val mtbFile = Mtb.builder() .patient(Patient.builder().id("DUMMY_12345678").build()) @@ -174,10 +156,7 @@ class KafkaInputListenerTest { } @Test - fun shouldProcessDeleteRequestWithExistingRequestId() { - whenever(consentEvaluator.check(any())) - .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, false)) - + fun shouldProcessRequestWithoutConsentGiven() { val mtbFile = Mtb.builder() .patient(Patient.builder().id("DUMMY_12345678").build()) @@ -216,15 +195,62 @@ class KafkaInputListenerTest { Optional.empty(), ) ) - verify(requestProcessor, times(1)) - .processDeletion(anyValueClass(), anyValueClass(), eq(TtpConsentStatus.UNKNOWN_CHECK_FILE)) + verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass()) } @Test fun shouldProcessDnpmV2Request() { - whenever(consentEvaluator.check(any())) - .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, false)) + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.DENY) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) + ) + .build() + ) + .build() + ) + .build() + val headers = + RecordHeaders( + listOf( + RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), + RecordHeader( + "contentType", + CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), + ), + ) + ) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), + ) + ) + verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass()) + } + + @Test + fun shouldProcessDnpmV2DeleteRequest() { val mtbFile = Mtb.builder() .patient(Patient.builder().id("DUMMY_12345678").build()) @@ -251,6 +277,10 @@ class KafkaInputListenerTest { listOf( RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), RecordHeader( + "requestMethod", + "DELETE".toByteArray(), + ), + RecordHeader( "contentType", CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), ), @@ -272,6 +302,6 @@ class KafkaInputListenerTest { ) ) verify(requestProcessor, times(1)) - .processDeletion(anyValueClass(), anyValueClass(), eq(TtpConsentStatus.UNKNOWN_CHECK_FILE)) + .processDeletion(anyValueClass(), anyValueClass(), any<TtpConsentStatus>()) } } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt index ce72ba3..f3d669b 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt @@ -74,9 +74,6 @@ class MtbFileRestControllerTest { @Test fun shouldRespondPostRequest() { - whenever(consentEvaluator.check(any())) - .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, true)) - val mtbFileContent = ClassPathResource("mv64e-mtb-fake-patient.json") .inputStream @@ -95,10 +92,7 @@ class MtbFileRestControllerTest { @ParameterizedTest @ArgumentsSource(Dnpm21MtbFile::class) - fun shouldProcessPostRequest(mtb: Mtb, broadConsent: TtpConsentStatus, shouldProcess: String) { - whenever(consentEvaluator.check(any<Mtb>())) - .thenReturn(ConsentEvaluation(broadConsent, shouldProcess == "process")) - + fun shouldProcessPostRequest(mtb: Mtb) { mockMvc .post("/mtbfile") { content = objectMapper.writeValueAsString(mtb) @@ -106,12 +100,6 @@ class MtbFileRestControllerTest { } .andExpect { status { isAccepted() } } - if (shouldProcess == "process") { - verify(requestProcessor, times(1)).processMtbFile(any<Mtb>()) - } else { - verify(requestProcessor, times(1)) - .processDeletion(anyValueClass(), org.mockito.kotlin.eq(broadConsent)) - } } @ParameterizedTest @@ -129,9 +117,6 @@ class MtbFileRestControllerTest { ] ) fun shouldAcceptPostRequests(url: String) { - whenever(consentEvaluator.check(any<Mtb>())) - .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, true)) - val mtb = buildMtb( MvhMetadata.builder() @@ -197,29 +182,15 @@ class MtbFileRestControllerTest { class Dnpm21MtbFile : ArgProvider( - // No Metadata and no broad consent => delete - Arguments.of(buildMtb(null), TtpConsentStatus.BROAD_CONSENT_MISSING_OR_REJECTED, "delete"), // No Metadata and broad consent given => process Arguments.of(buildMtb(null), TtpConsentStatus.BROAD_CONSENT_GIVEN, "process"), - // No model project consent and no broad consent => delete - Arguments.of( - buildMtb( - MvhMetadata.builder() - .modelProjectConsent(ModelProjectConsent.builder().build()) - .build() - ), - TtpConsentStatus.BROAD_CONSENT_MISSING_OR_REJECTED, - "delete", - ), // No model project consent and broad consent given => process Arguments.of( buildMtb( MvhMetadata.builder() .modelProjectConsent(ModelProjectConsent.builder().build()) .build() - ), - TtpConsentStatus.BROAD_CONSENT_GIVEN, - "process", + ) ), // Model project consent given and no broad consent => process Arguments.of( @@ -239,9 +210,7 @@ class Dnpm21MtbFile : .build() ) .build() - ), - TtpConsentStatus.UNKNOWN_CHECK_FILE, - "process", + ) ), // Model project consent given and broad consent given => process Arguments.of( @@ -261,9 +230,7 @@ class Dnpm21MtbFile : .build() ) .build() - ), - TtpConsentStatus.BROAD_CONSENT_GIVEN, - "process", + ) ), ) { diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt index b1cd5fa..8adc21c 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -172,11 +172,38 @@ class KafkaMtbFileSenderTest { assertThat(captor.firstValue.headers().headers("requestId")).isNotNull assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value()) .isEqualTo(TEST_REQUEST_ID.value.toByteArray()) + assertThat(captor.firstValue.headers().headers("requestMethod")).isNotNull + assertThat(captor.firstValue.headers().headers("requestMethod")?.firstOrNull()?.value()) + .isEqualTo("POST".toByteArray()) assertThat(captor.firstValue.value()).isNotNull assertThat(captor.firstValue.value()) .isEqualTo(objectMapper.writeValueAsString(dnmpV2kafkaRecordData(TEST_REQUEST_ID))) } + @Test + fun shouldSendDeleteRequestWithCorrectKeyAndHeaderAndBody() { + doAnswer { completedFuture(SendResult<String, String>(null, null)) } + .whenever(kafkaTemplate) + .send(any<ProducerRecord<String, String>>()) + + kafkaMtbFileSender.send(DeleteRequest(TEST_REQUEST_ID, PatientPseudonym("PID"))) + + val captor = argumentCaptor<ProducerRecord<String, String>>() + verify(kafkaTemplate, times(1)).send(captor.capture()) + assertThat(captor.firstValue.key()).isNotNull + assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}") + assertThat(captor.firstValue.headers().headers("contentType")).isNotNull + assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value()) + .isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray()) + assertThat(captor.firstValue.headers().headers("requestId")).isNotNull + assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value()) + .isEqualTo(TEST_REQUEST_ID.value.toByteArray()) + assertThat(captor.firstValue.headers().headers("requestMethod")).isNotNull + assertThat(captor.firstValue.headers().headers("requestMethod")?.firstOrNull()?.value()) + .isEqualTo("DELETE".toByteArray()) + assertThat(captor.firstValue.value()).isNotNull + } + @ParameterizedTest @MethodSource("dev.dnpm.etl.processor.output.KafkaMtbFileSenderTest#requestWithResponseSource") fun shouldRetryOnMtbFileKafkaSendError(testData: TestData) { |
