summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt21
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt11
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt8
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt90
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt41
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt27
6 files changed, 115 insertions, 83 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
index 2f6b2bb..41c85ac 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -79,21 +79,28 @@ class KafkaInputListener(
} else {
RequestId("")
}
-
- if (consentEvaluator.check(mtbFile).hasConsent()) {
- logger.debug("Accepted MTB File for processing")
- if (requestId.isBlank()) {
- requestProcessor.processMtbFile(mtbFile)
+ val firstRequestMethodHeader = record.headers().headers("requestMethod")?.firstOrNull()
+ val requestMethod =
+ if (null != firstRequestMethodHeader) {
+ String(firstRequestMethodHeader.value())
} else {
- requestProcessor.processMtbFile(mtbFile, requestId)
+ ""
}
- } else {
+
+ if (requestMethod == "DELETE") {
logger.debug("Accepted MTB File and process deletion")
if (requestId.isBlank()) {
requestProcessor.processDeletion(patientId, TtpConsentStatus.UNKNOWN_CHECK_FILE)
} else {
requestProcessor.processDeletion(patientId, requestId, TtpConsentStatus.UNKNOWN_CHECK_FILE)
}
+ } else {
+ logger.debug("Accepted MTB File for processing")
+ if (requestId.isBlank()) {
+ requestProcessor.processMtbFile(mtbFile)
+ } else {
+ requestProcessor.processMtbFile(mtbFile, requestId)
+ }
}
}
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt
index 071e3cd..f4ab194 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt
@@ -52,15 +52,8 @@ class MtbFileRestController(
],
)
fun mtbFile(@RequestBody mtbFile: Mtb): ResponseEntity<Unit> {
- val consentEvaluation = consentEvaluator.check(mtbFile)
- if (consentEvaluation.hasConsent()) {
- logger.debug("Accepted MTB File (DNPM V2) for processing")
- requestProcessor.processMtbFile(mtbFile)
- } else {
- logger.debug("Accepted MTB File (DNPM V2) and process deletion")
- val patientId = PatientId(mtbFile.patient.id)
- requestProcessor.processDeletion(patientId, consentEvaluation.getStatus())
- }
+ logger.debug("Accepted MTB File (DNPM V2) for processing")
+ requestProcessor.processMtbFile(mtbFile)
return ResponseEntity.accepted().build()
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 71e4a78..bcd532f 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -48,6 +48,7 @@ class KafkaMtbFileSender(
objectMapper.writeValueAsString(request.content),
)
record.headers().add("requestId", request.requestId.value.toByteArray())
+ record.headers().add("requestMethod", "POST".toByteArray())
when (request) {
is DnpmV2MtbFileRequest ->
record
@@ -86,6 +87,13 @@ class KafkaMtbFileSender(
),
)
record.headers().add("requestId", request.requestId.value.toByteArray())
+ record.headers().add("requestMethod", "DELETE".toByteArray())
+ record
+ .headers()
+ .add(
+ "contentType",
+ CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(),
+ )
val result = kafkaTemplate.send(record)
if (result.get() != null) {
logger.debug("Sent deletion request via KafkaMtbFileSender")
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
index da05fba..222c92a 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
@@ -42,7 +42,6 @@ import org.mockito.kotlin.*
class KafkaInputListenerTest {
private lateinit var requestProcessor: RequestProcessor
- private lateinit var consentEvaluator: ConsentEvaluator
private lateinit var objectMapper: ObjectMapper
private lateinit var kafkaInputListener: KafkaInputListener
@@ -53,7 +52,6 @@ class KafkaInputListenerTest {
@Mock consentEvaluator: ConsentEvaluator,
) {
this.requestProcessor = requestProcessor
- this.consentEvaluator = consentEvaluator
this.objectMapper = ObjectMapper()
this.kafkaInputListener = KafkaInputListener(requestProcessor, consentEvaluator, objectMapper)
@@ -61,9 +59,6 @@ class KafkaInputListenerTest {
@Test
fun shouldProcessMtbFileRequest() {
- whenever(consentEvaluator.check(any()))
- .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, true))
-
val mtbFile =
Mtb.builder()
.patient(Patient.builder().id("DUMMY_12345678").build())
@@ -93,10 +88,7 @@ class KafkaInputListenerTest {
}
@Test
- fun shouldProcessDeleteRequest() {
- whenever(consentEvaluator.check(any()))
- .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, false))
-
+ fun shouldProcessRequestEvenIfNoConsentInformation() {
val mtbFile =
Mtb.builder()
.patient(Patient.builder().id("DUMMY_12345678").build())
@@ -105,12 +97,7 @@ class KafkaInputListenerTest {
.modelProjectConsent(
ModelProjectConsent.builder()
.provisions(
- listOf(
- Provision.builder()
- .type(ConsentProvision.DENY)
- .purpose(ModelProjectConsentPurpose.SEQUENCING)
- .build()
- )
+ listOf()
)
.build()
)
@@ -121,16 +108,11 @@ class KafkaInputListenerTest {
kafkaInputListener.onMessage(
ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile))
)
-
- verify(requestProcessor, times(1))
- .processDeletion(anyValueClass(), eq(TtpConsentStatus.UNKNOWN_CHECK_FILE))
+ verify(requestProcessor, times(1)).processMtbFile(any<Mtb>())
}
@Test
fun shouldProcessMtbFileRequestWithExistingRequestId() {
- whenever(consentEvaluator.check(any()))
- .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, true))
-
val mtbFile =
Mtb.builder()
.patient(Patient.builder().id("DUMMY_12345678").build())
@@ -174,10 +156,7 @@ class KafkaInputListenerTest {
}
@Test
- fun shouldProcessDeleteRequestWithExistingRequestId() {
- whenever(consentEvaluator.check(any()))
- .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, false))
-
+ fun shouldProcessRequestWithoutConsentGiven() {
val mtbFile =
Mtb.builder()
.patient(Patient.builder().id("DUMMY_12345678").build())
@@ -216,15 +195,62 @@ class KafkaInputListenerTest {
Optional.empty(),
)
)
- verify(requestProcessor, times(1))
- .processDeletion(anyValueClass(), anyValueClass(), eq(TtpConsentStatus.UNKNOWN_CHECK_FILE))
+ verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass())
}
@Test
fun shouldProcessDnpmV2Request() {
- whenever(consentEvaluator.check(any()))
- .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, false))
+ val mtbFile =
+ Mtb.builder()
+ .patient(Patient.builder().id("DUMMY_12345678").build())
+ .metadata(
+ MvhMetadata.builder()
+ .modelProjectConsent(
+ ModelProjectConsent.builder()
+ .provisions(
+ listOf(
+ Provision.builder()
+ .type(ConsentProvision.DENY)
+ .purpose(ModelProjectConsentPurpose.SEQUENCING)
+ .build()
+ )
+ )
+ .build()
+ )
+ .build()
+ )
+ .build()
+ val headers =
+ RecordHeaders(
+ listOf(
+ RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()),
+ RecordHeader(
+ "contentType",
+ CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(),
+ ),
+ )
+ )
+ kafkaInputListener.onMessage(
+ ConsumerRecord(
+ "testtopic",
+ 0,
+ 0,
+ -1L,
+ TimestampType.NO_TIMESTAMP_TYPE,
+ -1,
+ -1,
+ "",
+ this.objectMapper.writeValueAsString(mtbFile),
+ headers,
+ Optional.empty(),
+ )
+ )
+ verify(requestProcessor, times(1)).processMtbFile(any<Mtb>(), anyValueClass())
+ }
+
+ @Test
+ fun shouldProcessDnpmV2DeleteRequest() {
val mtbFile =
Mtb.builder()
.patient(Patient.builder().id("DUMMY_12345678").build())
@@ -251,6 +277,10 @@ class KafkaInputListenerTest {
listOf(
RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()),
RecordHeader(
+ "requestMethod",
+ "DELETE".toByteArray(),
+ ),
+ RecordHeader(
"contentType",
CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray(),
),
@@ -272,6 +302,6 @@ class KafkaInputListenerTest {
)
)
verify(requestProcessor, times(1))
- .processDeletion(anyValueClass(), anyValueClass(), eq(TtpConsentStatus.UNKNOWN_CHECK_FILE))
+ .processDeletion(anyValueClass(), anyValueClass(), any<TtpConsentStatus>())
}
}
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt
index ce72ba3..f3d669b 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/input/MtbFileRestControllerTest.kt
@@ -74,9 +74,6 @@ class MtbFileRestControllerTest {
@Test
fun shouldRespondPostRequest() {
- whenever(consentEvaluator.check(any()))
- .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, true))
-
val mtbFileContent =
ClassPathResource("mv64e-mtb-fake-patient.json")
.inputStream
@@ -95,10 +92,7 @@ class MtbFileRestControllerTest {
@ParameterizedTest
@ArgumentsSource(Dnpm21MtbFile::class)
- fun shouldProcessPostRequest(mtb: Mtb, broadConsent: TtpConsentStatus, shouldProcess: String) {
- whenever(consentEvaluator.check(any<Mtb>()))
- .thenReturn(ConsentEvaluation(broadConsent, shouldProcess == "process"))
-
+ fun shouldProcessPostRequest(mtb: Mtb) {
mockMvc
.post("/mtbfile") {
content = objectMapper.writeValueAsString(mtb)
@@ -106,12 +100,6 @@ class MtbFileRestControllerTest {
}
.andExpect { status { isAccepted() } }
- if (shouldProcess == "process") {
- verify(requestProcessor, times(1)).processMtbFile(any<Mtb>())
- } else {
- verify(requestProcessor, times(1))
- .processDeletion(anyValueClass(), org.mockito.kotlin.eq(broadConsent))
- }
}
@ParameterizedTest
@@ -129,9 +117,6 @@ class MtbFileRestControllerTest {
]
)
fun shouldAcceptPostRequests(url: String) {
- whenever(consentEvaluator.check(any<Mtb>()))
- .thenReturn(ConsentEvaluation(TtpConsentStatus.BROAD_CONSENT_GIVEN, true))
-
val mtb =
buildMtb(
MvhMetadata.builder()
@@ -197,29 +182,15 @@ class MtbFileRestControllerTest {
class Dnpm21MtbFile :
ArgProvider(
- // No Metadata and no broad consent => delete
- Arguments.of(buildMtb(null), TtpConsentStatus.BROAD_CONSENT_MISSING_OR_REJECTED, "delete"),
// No Metadata and broad consent given => process
Arguments.of(buildMtb(null), TtpConsentStatus.BROAD_CONSENT_GIVEN, "process"),
- // No model project consent and no broad consent => delete
- Arguments.of(
- buildMtb(
- MvhMetadata.builder()
- .modelProjectConsent(ModelProjectConsent.builder().build())
- .build()
- ),
- TtpConsentStatus.BROAD_CONSENT_MISSING_OR_REJECTED,
- "delete",
- ),
// No model project consent and broad consent given => process
Arguments.of(
buildMtb(
MvhMetadata.builder()
.modelProjectConsent(ModelProjectConsent.builder().build())
.build()
- ),
- TtpConsentStatus.BROAD_CONSENT_GIVEN,
- "process",
+ )
),
// Model project consent given and no broad consent => process
Arguments.of(
@@ -239,9 +210,7 @@ class Dnpm21MtbFile :
.build()
)
.build()
- ),
- TtpConsentStatus.UNKNOWN_CHECK_FILE,
- "process",
+ )
),
// Model project consent given and broad consent given => process
Arguments.of(
@@ -261,9 +230,7 @@ class Dnpm21MtbFile :
.build()
)
.build()
- ),
- TtpConsentStatus.BROAD_CONSENT_GIVEN,
- "process",
+ )
),
) {
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
index b1cd5fa..8adc21c 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
@@ -172,11 +172,38 @@ class KafkaMtbFileSenderTest {
assertThat(captor.firstValue.headers().headers("requestId")).isNotNull
assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value())
.isEqualTo(TEST_REQUEST_ID.value.toByteArray())
+ assertThat(captor.firstValue.headers().headers("requestMethod")).isNotNull
+ assertThat(captor.firstValue.headers().headers("requestMethod")?.firstOrNull()?.value())
+ .isEqualTo("POST".toByteArray())
assertThat(captor.firstValue.value()).isNotNull
assertThat(captor.firstValue.value())
.isEqualTo(objectMapper.writeValueAsString(dnmpV2kafkaRecordData(TEST_REQUEST_ID)))
}
+ @Test
+ fun shouldSendDeleteRequestWithCorrectKeyAndHeaderAndBody() {
+ doAnswer { completedFuture(SendResult<String, String>(null, null)) }
+ .whenever(kafkaTemplate)
+ .send(any<ProducerRecord<String, String>>())
+
+ kafkaMtbFileSender.send(DeleteRequest(TEST_REQUEST_ID, PatientPseudonym("PID")))
+
+ val captor = argumentCaptor<ProducerRecord<String, String>>()
+ verify(kafkaTemplate, times(1)).send(captor.capture())
+ assertThat(captor.firstValue.key()).isNotNull
+ assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}")
+ assertThat(captor.firstValue.headers().headers("contentType")).isNotNull
+ assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value())
+ .isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray())
+ assertThat(captor.firstValue.headers().headers("requestId")).isNotNull
+ assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value())
+ .isEqualTo(TEST_REQUEST_ID.value.toByteArray())
+ assertThat(captor.firstValue.headers().headers("requestMethod")).isNotNull
+ assertThat(captor.firstValue.headers().headers("requestMethod")?.firstOrNull()?.value())
+ .isEqualTo("DELETE".toByteArray())
+ assertThat(captor.firstValue.value()).isNotNull
+ }
+
@ParameterizedTest
@MethodSource("dev.dnpm.etl.processor.output.KafkaMtbFileSenderTest#requestWithResponseSource")
fun shouldRetryOnMtbFileKafkaSendError(testData: TestData) {