From dc3aa929216b26d46ce134ac98c279a68d5679c1 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 12 Mar 2026 11:23:32 +0100 Subject: feat: save error request for non json (#266) --- .../processor/input/MtbFileRestControllerTest.kt | 21 + .../dnpm/etl/processor/input/KafkaInputListener.kt | 61 +-- .../etl/processor/input/MtbFileRestController.kt | 17 + .../etl/processor/input/KafkaInputListenerTest.kt | 494 +++++++++++---------- 4 files changed, 328 insertions(+), 265 deletions(-) (limited to 'src') diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt index ed9d910..e0e8162 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.input import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.CustomMediaType import dev.dnpm.etl.processor.config.AppSecurityConfiguration import dev.dnpm.etl.processor.consent.ConsentEvaluation import dev.dnpm.etl.processor.consent.ConsentEvaluator @@ -29,11 +30,14 @@ import dev.dnpm.etl.processor.security.TokenRepository import dev.dnpm.etl.processor.security.UserRoleRepository import dev.dnpm.etl.processor.services.RequestProcessor import dev.pcvolkmer.mv64e.mtb.* +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Nested import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource +import org.mockito.ArgumentCaptor +import org.mockito.Mockito import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.* import org.springframework.beans.factory.annotation.Autowired @@ -58,6 +62,7 @@ import java.util.* classes = [ MtbFileRestController::class, + MtbFileRestControllerAdvice::class, AppSecurityConfiguration::class, MtbFileConsentService::class, ] @@ -253,6 +258,22 @@ class MtbFileRestControllerTest { verify(requestProcessor, never()).processDeletion(anyValueClass(), any()) } + + @ParameterizedTest + @ValueSource(strings = ["[]", "null", "X", ""]) + fun shouldNotAcceptNonJsonObjectPostRequestContent(requestContent: String) { + mockMvc + .post("/mtbfile") { + content = requestContent + contentType = CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON + with(user("onkostarserver").roles("MTBFILE")) + } + .andExpect { status { isBadRequest() } } + + val result = verify(requestProcessor, times(1)).processMtbFile(any()) + assertThat(result).isFalse() + } + @Nested @MockitoBean(types = [UserRoleRepository::class, ClientRegistrationRepository::class]) @TestPropertySource( diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt index 41c85ac..03cd03d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt @@ -70,37 +70,42 @@ class KafkaInputListener( } private fun handleDnpmV2Message(record: ConsumerRecord) { - val mtbFile = objectMapper.readValue(record.value(), Mtb::class.java) - val patientId = PatientId(mtbFile.patient.id) - val firstRequestIdHeader = record.headers().headers("requestId")?.firstOrNull() - val requestId = - if (null != firstRequestIdHeader) { - RequestId(String(firstRequestIdHeader.value())) - } else { - RequestId("") - } - val firstRequestMethodHeader = record.headers().headers("requestMethod")?.firstOrNull() - val requestMethod = - if (null != firstRequestMethodHeader) { - String(firstRequestMethodHeader.value()) - } else { - "" - } + try { + val mtbFile = objectMapper.readValue(record.value(), Mtb::class.java) + val patientId = PatientId(mtbFile.patient.id) + val firstRequestIdHeader = record.headers().headers("requestId")?.firstOrNull() + val requestId = + if (null != firstRequestIdHeader) { + RequestId(String(firstRequestIdHeader.value())) + } else { + RequestId("") + } + val firstRequestMethodHeader = record.headers().headers("requestMethod")?.firstOrNull() + val requestMethod = + if (null != firstRequestMethodHeader) { + String(firstRequestMethodHeader.value()) + } else { + "" + } - if (requestMethod == "DELETE") { - logger.debug("Accepted MTB File and process deletion") - if (requestId.isBlank()) { - requestProcessor.processDeletion(patientId, TtpConsentStatus.UNKNOWN_CHECK_FILE) - } else { - requestProcessor.processDeletion(patientId, requestId, TtpConsentStatus.UNKNOWN_CHECK_FILE) - } - } else { - logger.debug("Accepted MTB File for processing") - if (requestId.isBlank()) { - requestProcessor.processMtbFile(mtbFile) + if (requestMethod == "DELETE") { + logger.debug("Accepted MTB File and process deletion") + if (requestId.isBlank()) { + requestProcessor.processDeletion(patientId, TtpConsentStatus.UNKNOWN_CHECK_FILE) + } else { + requestProcessor.processDeletion(patientId, requestId, TtpConsentStatus.UNKNOWN_CHECK_FILE) + } } else { - requestProcessor.processMtbFile(mtbFile, requestId) + logger.debug("Accepted MTB File for processing") + if (requestId.isBlank()) { + requestProcessor.processMtbFile(mtbFile) + } else { + requestProcessor.processMtbFile(mtbFile, requestId) + } } + } catch (e: Exception) { + logger.error("Error while processing MtbFile", e) + requestProcessor.processMtbFile(Mtb.builder().build()) } } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt index 523a0a8..c9825c7 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt @@ -28,6 +28,7 @@ import dev.pcvolkmer.mv64e.mtb.Mtb import org.slf4j.LoggerFactory import org.springframework.http.MediaType import org.springframework.http.ResponseEntity +import org.springframework.http.converter.HttpMessageNotReadableException import org.springframework.web.bind.annotation.* @RestController @@ -67,3 +68,19 @@ class MtbFileRestController( return ResponseEntity.accepted().build() } } + +@RestControllerAdvice(assignableTypes = [MtbFileRestController::class]) +class MtbFileRestControllerAdvice( + private val requestProcessor: RequestProcessor +) { + + private val logger = LoggerFactory.getLogger(MtbFileRestControllerAdvice::class.java) + + @ExceptionHandler(HttpMessageNotReadableException::class) + fun handleMessageNotReadableException(e: Exception): ResponseEntity { + logger.error("Error while processing MtbFile", e) + requestProcessor.processMtbFile(Mtb.builder().build()) + return ResponseEntity.badRequest().build() + } + +} diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt index 222c92a..0d3f275 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt @@ -21,287 +21,307 @@ package dev.dnpm.etl.processor.input import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.CustomMediaType -import dev.dnpm.etl.processor.consent.ConsentEvaluation import dev.dnpm.etl.processor.consent.ConsentEvaluator import dev.dnpm.etl.processor.consent.TtpConsentStatus import dev.dnpm.etl.processor.services.RequestProcessor import dev.pcvolkmer.mv64e.mtb.* -import java.util.* import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.record.TimestampType +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import org.mockito.ArgumentCaptor import org.mockito.Mock import org.mockito.junit.jupiter.MockitoExtension -import org.mockito.kotlin.* +import org.mockito.kotlin.any +import org.mockito.kotlin.anyValueClass +import org.mockito.kotlin.firstValue +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import java.util.* @ExtendWith(MockitoExtension::class) class KafkaInputListenerTest { - private lateinit var requestProcessor: RequestProcessor - private lateinit var objectMapper: ObjectMapper + private lateinit var requestProcessor: RequestProcessor + private lateinit var objectMapper: ObjectMapper - private lateinit var kafkaInputListener: KafkaInputListener + private lateinit var kafkaInputListener: KafkaInputListener - @BeforeEach - fun setup( - @Mock requestProcessor: RequestProcessor, - @Mock consentEvaluator: ConsentEvaluator, - ) { - this.requestProcessor = requestProcessor - this.objectMapper = ObjectMapper() + @BeforeEach + fun setup( + @Mock requestProcessor: RequestProcessor, + @Mock consentEvaluator: ConsentEvaluator, + ) { + this.requestProcessor = requestProcessor + this.objectMapper = ObjectMapper() - this.kafkaInputListener = KafkaInputListener(requestProcessor, consentEvaluator, objectMapper) - } + this.kafkaInputListener = KafkaInputListener(requestProcessor, consentEvaluator, objectMapper) + } - @Test - fun shouldProcessMtbFileRequest() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.PERMIT) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessMtbFileRequest() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.PERMIT) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - kafkaInputListener.onMessage( - ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) - ) + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) + ) - verify(requestProcessor, times(1)).processMtbFile(any()) - } + verify(requestProcessor, times(1)).processMtbFile(any()) + } - @Test - fun shouldProcessRequestEvenIfNoConsentInformation() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf() - ) - .build() - ) - .build() - ) - .build() + @Test + fun shouldProcessRequestEvenIfNoConsentInformation() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf() + ) + .build() + ) + .build() + ) + .build() - kafkaInputListener.onMessage( - ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) - ) - verify(requestProcessor, times(1)).processMtbFile(any()) - } + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)) + ) + verify(requestProcessor, times(1)).processMtbFile(any()) + } - @Test - fun shouldProcessMtbFileRequestWithExistingRequestId() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.PERMIT) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessMtbFileRequestWithExistingRequestId() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.PERMIT) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - val headers = - RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) - kafkaInputListener.onMessage( - ConsumerRecord( - "testtopic", - 0, - 0, - -1L, - TimestampType.NO_TIMESTAMP_TYPE, - -1, - -1, - "", - this.objectMapper.writeValueAsString(mtbFile), - headers, - Optional.empty(), + val headers = + RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), + ) ) - ) - verify(requestProcessor, times(1)).processMtbFile(any(), anyValueClass()) - } + verify(requestProcessor, times(1)).processMtbFile(any(), anyValueClass()) + } - @Test - fun shouldProcessRequestWithoutConsentGiven() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.DENY) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessRequestWithoutConsentGiven() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.DENY) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - val headers = - RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) - kafkaInputListener.onMessage( - ConsumerRecord( - "testtopic", - 0, - 0, - -1L, - TimestampType.NO_TIMESTAMP_TYPE, - -1, - -1, - "", - this.objectMapper.writeValueAsString(mtbFile), - headers, - Optional.empty(), + val headers = + RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), + ) ) - ) - verify(requestProcessor, times(1)).processMtbFile(any(), anyValueClass()) - } + verify(requestProcessor, times(1)).processMtbFile(any(), anyValueClass()) + } - @Test - fun shouldProcessDnpmV2Request() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.DENY) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessDnpmV2Request() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.DENY) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - val headers = - RecordHeaders( - listOf( - RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), - RecordHeader( - "contentType", - CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), - ), + val headers = + RecordHeaders( + listOf( + RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), + RecordHeader( + "contentType", + CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), + ), + ) + ) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), ) ) - kafkaInputListener.onMessage( - ConsumerRecord( - "testtopic", - 0, - 0, - -1L, - TimestampType.NO_TIMESTAMP_TYPE, - -1, - -1, - "", - this.objectMapper.writeValueAsString(mtbFile), - headers, - Optional.empty(), - ) - ) - verify(requestProcessor, times(1)).processMtbFile(any(), anyValueClass()) - } + verify(requestProcessor, times(1)).processMtbFile(any(), anyValueClass()) + } - @Test - fun shouldProcessDnpmV2DeleteRequest() { - val mtbFile = - Mtb.builder() - .patient(Patient.builder().id("DUMMY_12345678").build()) - .metadata( - MvhMetadata.builder() - .modelProjectConsent( - ModelProjectConsent.builder() - .provisions( - listOf( - Provision.builder() - .type(ConsentProvision.DENY) - .purpose(ModelProjectConsentPurpose.SEQUENCING) - .build() + @Test + fun shouldProcessDnpmV2DeleteRequest() { + val mtbFile = + Mtb.builder() + .patient(Patient.builder().id("DUMMY_12345678").build()) + .metadata( + MvhMetadata.builder() + .modelProjectConsent( + ModelProjectConsent.builder() + .provisions( + listOf( + Provision.builder() + .type(ConsentProvision.DENY) + .purpose(ModelProjectConsentPurpose.SEQUENCING) + .build() + ) ) - ) - .build() - ) - .build() - ) - .build() + .build() + ) + .build() + ) + .build() - val headers = - RecordHeaders( - listOf( - RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), - RecordHeader( - "requestMethod", - "DELETE".toByteArray(), - ), - RecordHeader( - "contentType", - CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), - ), + val headers = + RecordHeaders( + listOf( + RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()), + RecordHeader( + "requestMethod", + "DELETE".toByteArray(), + ), + RecordHeader( + "contentType", + CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(), + ), + ) + ) + kafkaInputListener.onMessage( + ConsumerRecord( + "testtopic", + 0, + 0, + -1L, + TimestampType.NO_TIMESTAMP_TYPE, + -1, + -1, + "", + this.objectMapper.writeValueAsString(mtbFile), + headers, + Optional.empty(), ) ) - kafkaInputListener.onMessage( - ConsumerRecord( - "testtopic", - 0, - 0, - -1L, - TimestampType.NO_TIMESTAMP_TYPE, - -1, - -1, - "", - this.objectMapper.writeValueAsString(mtbFile), - headers, - Optional.empty(), + verify(requestProcessor, times(1)) + .processDeletion(anyValueClass(), anyValueClass(), any()) + } + + @ParameterizedTest + @ValueSource(strings = ["[]", "null", "X", ""]) + fun shouldNotProcessNonJsonRecordBody(content: String) { + val mtbFile = + + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, "", content) ) - ) - verify(requestProcessor, times(1)) - .processDeletion(anyValueClass(), anyValueClass(), any()) - } + + val result = verify(requestProcessor, times(1)).processMtbFile(any()) + assertThat(result).isFalse() + } } -- cgit v1.2.3