From c6b37fda69784a5d6058fe19ab87bf73e84c8b1c Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sun, 6 Apr 2025 22:17:46 +0200 Subject: feat: support multiple request content types (#109) --- .../etl/processor/input/MtbFileRestController.kt | 9 ++-- .../etl/processor/output/KafkaMtbFileSender.kt | 56 +++++++++++--------- .../dev/dnpm/etl/processor/output/MtbFileSender.kt | 12 +---- .../dev/dnpm/etl/processor/output/MtbRequest.kt | 59 ++++++++++++++++++++++ .../dnpm/etl/processor/output/RestMtbFileSender.kt | 27 ++++++---- .../etl/processor/services/RequestProcessor.kt | 54 +++++++++++++------- .../processor/services/TransformationService.kt | 15 +++++- 7 files changed, 164 insertions(+), 68 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/output/MtbRequest.kt (limited to 'src/main') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt index 432711a..e67a380 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt @@ -26,7 +26,6 @@ import dev.dnpm.etl.processor.PatientId import dev.dnpm.etl.processor.services.RequestProcessor import dev.pcvolkmer.mv64e.mtb.Mtb import org.slf4j.LoggerFactory -import org.springframework.http.HttpStatus import org.springframework.http.MediaType import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.* @@ -47,10 +46,10 @@ class MtbFileRestController( @PostMapping( consumes = [ MediaType.APPLICATION_JSON_VALUE ] ) fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { if (mtbFile.consent.status == Consent.Status.ACTIVE) { - logger.debug("Accepted MTB File for processing") + logger.debug("Accepted MTB File (bwHC V1) for processing") requestProcessor.processMtbFile(mtbFile) } else { - logger.debug("Accepted MTB File and process deletion") + logger.debug("Accepted MTB File (bwHC V1) and process deletion") val patientId = PatientId(mtbFile.patient.id) requestProcessor.processDeletion(patientId) } @@ -59,7 +58,9 @@ class MtbFileRestController( @PostMapping( consumes = [ CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE] ) fun mtbFile(@RequestBody mtbFile: Mtb): ResponseEntity { - return ResponseEntity.status(HttpStatus.NOT_IMPLEMENTED).build() + logger.debug("Accepted MTB File (DNPM V2) for processing") + requestProcessor.processMtbFile(mtbFile) + return ResponseEntity.accepted().build() } @DeleteMapping(path = ["{patientId}"]) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 6391e99..c00b2fd 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -1,7 +1,7 @@ /* * This file is part of ETL-Processor * - * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * Copyright (c) 2025 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published @@ -22,10 +22,12 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.RequestId +import dev.dnpm.etl.processor.CustomMediaType import dev.dnpm.etl.processor.config.KafkaProperties import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.apache.kafka.clients.producer.ProducerRecord import org.slf4j.LoggerFactory +import org.springframework.http.MediaType import org.springframework.kafka.core.KafkaTemplate import org.springframework.retry.support.RetryTemplate @@ -38,14 +40,20 @@ class KafkaMtbFileSender( private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java) - override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { + override fun send(request: MtbFileRequest): MtbFileSender.Response { return try { return retryTemplate.execute { - val result = kafkaTemplate.send( - kafkaProperties.outputTopic, - key(request), - objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) - ) + val record = + ProducerRecord(kafkaProperties.outputTopic, key(request), objectMapper.writeValueAsString(request)) + when (request) { + is BwhcV1MtbFileRequest -> record.headers() + .add("contentType", MediaType.APPLICATION_JSON_VALUE.toByteArray()) + + is DnpmV2MtbFileRequest -> record.headers() + .add("contentType", CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray()) + } + + val result = kafkaTemplate.send(record) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") MtbFileSender.Response(RequestStatus.UNKNOWN) @@ -59,7 +67,7 @@ class KafkaMtbFileSender( } } - override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { + override fun send(request: DeleteRequest): MtbFileSender.Response { val dummyMtbFile = MtbFile.builder() .withConsent( Consent.builder() @@ -71,12 +79,15 @@ class KafkaMtbFileSender( return try { return retryTemplate.execute { - val result = kafkaTemplate.send( - kafkaProperties.outputTopic, - key(request), - objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) - ) + val record = + ProducerRecord( + kafkaProperties.outputTopic, + key(request), + // Always use old BwhcV1FileRequest with Consent REJECT + objectMapper.writeValueAsString(BwhcV1MtbFileRequest(request.requestId, dummyMtbFile)) + ) + val result = kafkaTemplate.send(record) if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") MtbFileSender.Response(RequestStatus.UNKNOWN) @@ -94,13 +105,12 @@ class KafkaMtbFileSender( return "${this.kafkaProperties.servers} (${this.kafkaProperties.outputTopic}/${this.kafkaProperties.outputResponseTopic})" } - private fun key(request: MtbFileSender.MtbFileRequest): String { - return "{\"pid\": \"${request.mtbFile.patient.id}\"}" - } - - private fun key(request: MtbFileSender.DeleteRequest): String { - return "{\"pid\": \"${request.patientId.value}\"}" + private fun key(request: MtbRequest): String { + return when (request) { + is BwhcV1MtbFileRequest -> "{\"pid\": \"${request.content.patient.id}\"}" + is DnpmV2MtbFileRequest -> "{\"pid\": \"${request.content.patient.id}\"}" + is DeleteRequest -> "{\"pid\": \"${request.patientId.value}\"}" + else -> throw IllegalArgumentException("Unsupported request type: ${request::class.simpleName}") + } } - - data class Data(val requestId: RequestId, val content: MtbFile) -} \ No newline at end of file +} diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt index 8d994c5..285ce07 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt @@ -19,25 +19,17 @@ package dev.dnpm.etl.processor.output -import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.PatientPseudonym -import dev.dnpm.etl.processor.RequestId import dev.dnpm.etl.processor.monitoring.RequestStatus import org.springframework.http.HttpStatusCode interface MtbFileSender { - fun send(request: MtbFileRequest): Response + fun send(request: MtbFileRequest): Response fun send(request: DeleteRequest): Response fun endpoint(): String data class Response(val status: RequestStatus, val body: String = "") - - data class MtbFileRequest(val requestId: RequestId, val mtbFile: MtbFile) - - data class DeleteRequest(val requestId: RequestId, val patientId: PatientPseudonym) - } fun Int.asRequestStatus(): RequestStatus { @@ -51,4 +43,4 @@ fun Int.asRequestStatus(): RequestStatus { fun HttpStatusCode.asRequestStatus(): RequestStatus { return this.value().asRequestStatus() -} \ No newline at end of file +} diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbRequest.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbRequest.kt new file mode 100644 index 0000000..9b500f0 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbRequest.kt @@ -0,0 +1,59 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2025 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.output + +import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.PatientPseudonym +import dev.dnpm.etl.processor.RequestId +import dev.pcvolkmer.mv64e.mtb.Mtb + +interface MtbRequest { + val requestId: RequestId +} + +sealed interface MtbFileRequest : MtbRequest { + override val requestId: RequestId + val content: T + + fun patientPseudonym(): PatientPseudonym +} + +data class BwhcV1MtbFileRequest( + override val requestId: RequestId, + override val content: MtbFile +) : MtbFileRequest { + override fun patientPseudonym(): PatientPseudonym { + return PatientPseudonym(content.patient.id) + } +} + +data class DnpmV2MtbFileRequest( + override val requestId: RequestId, + override val content: Mtb +) : MtbFileRequest { + override fun patientPseudonym(): PatientPseudonym { + return PatientPseudonym(content.patient.id) + } +} + +data class DeleteRequest( + override val requestId: RequestId, + val patientId: PatientPseudonym +) : MtbRequest diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 90e3629..78222b2 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -1,7 +1,7 @@ /* * This file is part of ETL-Processor * - * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * Copyright (c) 2025 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published @@ -19,10 +19,11 @@ package dev.dnpm.etl.processor.output -import dev.dnpm.etl.processor.config.RestTargetProperties -import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.CustomMediaType import dev.dnpm.etl.processor.PatientPseudonym +import dev.dnpm.etl.processor.config.RestTargetProperties import dev.dnpm.etl.processor.monitoring.ReportService +import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.monitoring.asRequestStatus import org.slf4j.LoggerFactory import org.springframework.http.HttpEntity @@ -46,11 +47,11 @@ abstract class RestMtbFileSender( abstract fun deleteUrl(patientId: PatientPseudonym): String - override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { + override fun send(request: MtbFileRequest): MtbFileSender.Response { try { return retryTemplate.execute { - val headers = getHttpHeaders() - val entityReq = HttpEntity(request.mtbFile, headers) + val headers = getHttpHeaders(request) + val entityReq = HttpEntity(request.content, headers) val response = restTemplate.postForEntity( sendUrl(), entityReq, @@ -76,10 +77,10 @@ abstract class RestMtbFileSender( return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } - override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { + override fun send(request: DeleteRequest): MtbFileSender.Response { try { return retryTemplate.execute { - val headers = getHttpHeaders() + val headers = getHttpHeaders(request) val entityReq = HttpEntity(null, headers) restTemplate.delete( deleteUrl(request.patientId), @@ -102,11 +103,15 @@ abstract class RestMtbFileSender( return this.restTargetProperties.uri.orEmpty() } - private fun getHttpHeaders(): HttpHeaders { + private fun getHttpHeaders(request: MtbRequest): HttpHeaders { val username = restTargetProperties.username val password = restTargetProperties.password val headers = HttpHeaders() - headers.contentType = MediaType.APPLICATION_JSON + headers.contentType = when (request) { + is BwhcV1MtbFileRequest -> MediaType.APPLICATION_JSON + is DnpmV2MtbFileRequest -> CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON + else -> MediaType.APPLICATION_JSON + } if (username.isNullOrBlank() || password.isNullOrBlank()) { return headers @@ -116,4 +121,4 @@ abstract class RestMtbFileSender( return headers } -} \ No newline at end of file +} diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 5b2c42a..f25452e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -1,7 +1,7 @@ /* * This file is part of ETL-Processor * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * Copyright (c) 2025 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published @@ -27,10 +27,11 @@ import dev.dnpm.etl.processor.monitoring.Report import dev.dnpm.etl.processor.monitoring.Request import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.monitoring.RequestType -import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.output.* import dev.dnpm.etl.processor.pseudonym.PseudonymizeService import dev.dnpm.etl.processor.pseudonym.anonymizeContentWith import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith +import dev.pcvolkmer.mv64e.mtb.Mtb import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.springframework.context.ApplicationEventPublisher @@ -55,29 +56,40 @@ class RequestProcessor( fun processMtbFile(mtbFile: MtbFile, requestId: RequestId) { val pid = PatientId(mtbFile.patient.id) - mtbFile pseudonymizeWith pseudonymizeService mtbFile anonymizeContentWith pseudonymizeService + val request = BwhcV1MtbFileRequest(requestId, transformationService.transform(mtbFile)) + saveAndSend(request, pid) + } - val request = MtbFileSender.MtbFileRequest(requestId, transformationService.transform(mtbFile)) + fun processMtbFile(mtbFile: Mtb) { + processMtbFile(mtbFile, randomRequestId()) + } - val patientPseudonym = PatientPseudonym(request.mtbFile.patient.id) + fun processMtbFile(mtbFile: Mtb, requestId: RequestId) { + val pid = PatientId(mtbFile.patient.id) + mtbFile pseudonymizeWith pseudonymizeService + mtbFile anonymizeContentWith pseudonymizeService + val request = DnpmV2MtbFileRequest(requestId, transformationService.transform(mtbFile)) + saveAndSend(request, pid) + } + private fun saveAndSend(request: MtbFileRequest, pid: PatientId) { requestService.save( Request( - requestId, - patientPseudonym, + request.requestId, + request.patientPseudonym(), pid, - fingerprint(request.mtbFile), + fingerprint(request), RequestType.MTB_FILE, RequestStatus.UNKNOWN ) ) - if (appConfigProperties.duplicationDetection && isDuplication(mtbFile)) { + if (appConfigProperties.duplicationDetection && isDuplication(request)) { applicationEventPublisher.publishEvent( ResponseEvent( - requestId, + request.requestId, Instant.now(), RequestStatus.DUPLICATION ) @@ -89,7 +101,7 @@ class RequestProcessor( applicationEventPublisher.publishEvent( ResponseEvent( - requestId, + request.requestId, Instant.now(), responseStatus.status, when (responseStatus.status) { @@ -100,8 +112,11 @@ class RequestProcessor( ) } - private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { - val patientPseudonym = PatientPseudonym(pseudonymizedMtbFile.patient.id) + private fun isDuplication(pseudonymizedMtbFileRequest: MtbFileRequest): Boolean { + val patientPseudonym = when (pseudonymizedMtbFileRequest) { + is BwhcV1MtbFileRequest -> PatientPseudonym(pseudonymizedMtbFileRequest.content.patient.id) + is DnpmV2MtbFileRequest -> PatientPseudonym(pseudonymizedMtbFileRequest.content.patient.id) + } val lastMtbFileRequestForPatient = requestService.lastMtbFileRequestForPatientPseudonym(patientPseudonym) @@ -109,7 +124,7 @@ class RequestProcessor( return null != lastMtbFileRequestForPatient && !isLastRequestDeletion - && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile) + && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFileRequest) } fun processDeletion(patientId: PatientId) { @@ -131,7 +146,7 @@ class RequestProcessor( ) ) - val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + val responseStatus = sender.send(DeleteRequest(requestId, patientPseudonym)) applicationEventPublisher.publishEvent( ResponseEvent( @@ -160,8 +175,11 @@ class RequestProcessor( } } - private fun fingerprint(mtbFile: MtbFile): Fingerprint { - return fingerprint(objectMapper.writeValueAsString(mtbFile)) + private fun fingerprint(request: MtbFileRequest): Fingerprint { + return when (request) { + is BwhcV1MtbFileRequest -> fingerprint(objectMapper.writeValueAsString(request.content)) + is DnpmV2MtbFileRequest -> fingerprint(objectMapper.writeValueAsString(request.content)) + } } private fun fingerprint(s: String): Fingerprint { @@ -172,4 +190,4 @@ class RequestProcessor( ) } -} \ No newline at end of file +} diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt index 2a9dc5b..9447a84 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt @@ -23,10 +23,21 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.jayway.jsonpath.JsonPath import com.jayway.jsonpath.PathNotFoundException import de.ukw.ccc.bwhc.dto.MtbFile +import dev.pcvolkmer.mv64e.mtb.Mtb class TransformationService(private val objectMapper: ObjectMapper, private val transformations: List) { fun transform(mtbFile: MtbFile): MtbFile { - var json = objectMapper.writeValueAsString(mtbFile) + val json = transform(objectMapper.writeValueAsString(mtbFile)) + return objectMapper.readValue(json, MtbFile::class.java) + } + + fun transform(mtbFile: Mtb): Mtb { + val json = transform(objectMapper.writeValueAsString(mtbFile)) + return objectMapper.readValue(json, Mtb::class.java) + } + + private fun transform(content: String): String { + var json = content transformations.forEach { transformation -> val jsonPath = JsonPath.parse(json) @@ -48,7 +59,7 @@ class TransformationService(private val objectMapper: ObjectMapper, private val json = jsonPath.jsonString() } - return objectMapper.readValue(json, MtbFile::class.java) + return json } fun getTransformations(): List { -- cgit v1.2.3