summaryrefslogtreecommitdiff
path: root/src/main/kotlin/dev
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2025-04-06 22:17:46 +0200
committerGitHub2025-04-06 22:17:46 +0200
commitc6b37fda69784a5d6058fe19ab87bf73e84c8b1c (patch)
treee14bd0cbf5e30d31662ae71bff73a7c4d9f64d9a /src/main/kotlin/dev
parent8e3de6a220b9f48107e1f0af8193fd37102f9ae3 (diff)
feat: support multiple request content types (#109)
Diffstat (limited to 'src/main/kotlin/dev')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt9
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt56
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt12
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/MtbRequest.kt59
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt27
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt54
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt15
7 files changed, 164 insertions, 68 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt
index 432711a..e67a380 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/MtbFileRestController.kt
@@ -26,7 +26,6 @@ import dev.dnpm.etl.processor.PatientId
import dev.dnpm.etl.processor.services.RequestProcessor
import dev.pcvolkmer.mv64e.mtb.Mtb
import org.slf4j.LoggerFactory
-import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
@@ -47,10 +46,10 @@ class MtbFileRestController(
@PostMapping( consumes = [ MediaType.APPLICATION_JSON_VALUE ] )
fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity<Unit> {
if (mtbFile.consent.status == Consent.Status.ACTIVE) {
- logger.debug("Accepted MTB File for processing")
+ logger.debug("Accepted MTB File (bwHC V1) for processing")
requestProcessor.processMtbFile(mtbFile)
} else {
- logger.debug("Accepted MTB File and process deletion")
+ logger.debug("Accepted MTB File (bwHC V1) and process deletion")
val patientId = PatientId(mtbFile.patient.id)
requestProcessor.processDeletion(patientId)
}
@@ -59,7 +58,9 @@ class MtbFileRestController(
@PostMapping( consumes = [ CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE] )
fun mtbFile(@RequestBody mtbFile: Mtb): ResponseEntity<Unit> {
- return ResponseEntity.status(HttpStatus.NOT_IMPLEMENTED).build()
+ logger.debug("Accepted MTB File (DNPM V2) for processing")
+ requestProcessor.processMtbFile(mtbFile)
+ return ResponseEntity.accepted().build()
}
@DeleteMapping(path = ["{patientId}"])
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 6391e99..c00b2fd 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -1,7 +1,7 @@
/*
* This file is part of ETL-Processor
*
- * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ * Copyright (c) 2025 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
@@ -22,10 +22,12 @@ package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.RequestId
+import dev.dnpm.etl.processor.CustomMediaType
import dev.dnpm.etl.processor.config.KafkaProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory
+import org.springframework.http.MediaType
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.retry.support.RetryTemplate
@@ -38,14 +40,20 @@ class KafkaMtbFileSender(
private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java)
- override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
+ override fun <T> send(request: MtbFileRequest<T>): MtbFileSender.Response {
return try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
- val result = kafkaTemplate.send(
- kafkaProperties.outputTopic,
- key(request),
- objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
- )
+ val record =
+ ProducerRecord(kafkaProperties.outputTopic, key(request), objectMapper.writeValueAsString(request))
+ when (request) {
+ is BwhcV1MtbFileRequest -> record.headers()
+ .add("contentType", MediaType.APPLICATION_JSON_VALUE.toByteArray())
+
+ is DnpmV2MtbFileRequest -> record.headers()
+ .add("contentType", CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray())
+ }
+
+ val result = kafkaTemplate.send(record)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
MtbFileSender.Response(RequestStatus.UNKNOWN)
@@ -59,7 +67,7 @@ class KafkaMtbFileSender(
}
}
- override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
+ override fun send(request: DeleteRequest): MtbFileSender.Response {
val dummyMtbFile = MtbFile.builder()
.withConsent(
Consent.builder()
@@ -71,12 +79,15 @@ class KafkaMtbFileSender(
return try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
- val result = kafkaTemplate.send(
- kafkaProperties.outputTopic,
- key(request),
- objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
- )
+ val record =
+ ProducerRecord(
+ kafkaProperties.outputTopic,
+ key(request),
+ // Always use old BwhcV1FileRequest with Consent REJECT
+ objectMapper.writeValueAsString(BwhcV1MtbFileRequest(request.requestId, dummyMtbFile))
+ )
+ val result = kafkaTemplate.send(record)
if (result.get() != null) {
logger.debug("Sent deletion request via KafkaMtbFileSender")
MtbFileSender.Response(RequestStatus.UNKNOWN)
@@ -94,13 +105,12 @@ class KafkaMtbFileSender(
return "${this.kafkaProperties.servers} (${this.kafkaProperties.outputTopic}/${this.kafkaProperties.outputResponseTopic})"
}
- private fun key(request: MtbFileSender.MtbFileRequest): String {
- return "{\"pid\": \"${request.mtbFile.patient.id}\"}"
- }
-
- private fun key(request: MtbFileSender.DeleteRequest): String {
- return "{\"pid\": \"${request.patientId.value}\"}"
+ private fun key(request: MtbRequest): String {
+ return when (request) {
+ is BwhcV1MtbFileRequest -> "{\"pid\": \"${request.content.patient.id}\"}"
+ is DnpmV2MtbFileRequest -> "{\"pid\": \"${request.content.patient.id}\"}"
+ is DeleteRequest -> "{\"pid\": \"${request.patientId.value}\"}"
+ else -> throw IllegalArgumentException("Unsupported request type: ${request::class.simpleName}")
+ }
}
-
- data class Data(val requestId: RequestId, val content: MtbFile)
-} \ No newline at end of file
+}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
index 8d994c5..285ce07 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
@@ -19,25 +19,17 @@
package dev.dnpm.etl.processor.output
-import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.PatientPseudonym
-import dev.dnpm.etl.processor.RequestId
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.springframework.http.HttpStatusCode
interface MtbFileSender {
- fun send(request: MtbFileRequest): Response
+ fun <T> send(request: MtbFileRequest<T>): Response
fun send(request: DeleteRequest): Response
fun endpoint(): String
data class Response(val status: RequestStatus, val body: String = "")
-
- data class MtbFileRequest(val requestId: RequestId, val mtbFile: MtbFile)
-
- data class DeleteRequest(val requestId: RequestId, val patientId: PatientPseudonym)
-
}
fun Int.asRequestStatus(): RequestStatus {
@@ -51,4 +43,4 @@ fun Int.asRequestStatus(): RequestStatus {
fun HttpStatusCode.asRequestStatus(): RequestStatus {
return this.value().asRequestStatus()
-} \ No newline at end of file
+}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbRequest.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbRequest.kt
new file mode 100644
index 0000000..9b500f0
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbRequest.kt
@@ -0,0 +1,59 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2025 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.output
+
+import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.dnpm.etl.processor.PatientPseudonym
+import dev.dnpm.etl.processor.RequestId
+import dev.pcvolkmer.mv64e.mtb.Mtb
+
+interface MtbRequest {
+ val requestId: RequestId
+}
+
+sealed interface MtbFileRequest<out T> : MtbRequest {
+ override val requestId: RequestId
+ val content: T
+
+ fun patientPseudonym(): PatientPseudonym
+}
+
+data class BwhcV1MtbFileRequest(
+ override val requestId: RequestId,
+ override val content: MtbFile
+) : MtbFileRequest<MtbFile> {
+ override fun patientPseudonym(): PatientPseudonym {
+ return PatientPseudonym(content.patient.id)
+ }
+}
+
+data class DnpmV2MtbFileRequest(
+ override val requestId: RequestId,
+ override val content: Mtb
+) : MtbFileRequest<Mtb> {
+ override fun patientPseudonym(): PatientPseudonym {
+ return PatientPseudonym(content.patient.id)
+ }
+}
+
+data class DeleteRequest(
+ override val requestId: RequestId,
+ val patientId: PatientPseudonym
+) : MtbRequest
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
index 90e3629..78222b2 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
@@ -1,7 +1,7 @@
/*
* This file is part of ETL-Processor
*
- * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ * Copyright (c) 2025 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
@@ -19,10 +19,11 @@
package dev.dnpm.etl.processor.output
-import dev.dnpm.etl.processor.config.RestTargetProperties
-import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.CustomMediaType
import dev.dnpm.etl.processor.PatientPseudonym
+import dev.dnpm.etl.processor.config.RestTargetProperties
import dev.dnpm.etl.processor.monitoring.ReportService
+import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.asRequestStatus
import org.slf4j.LoggerFactory
import org.springframework.http.HttpEntity
@@ -46,11 +47,11 @@ abstract class RestMtbFileSender(
abstract fun deleteUrl(patientId: PatientPseudonym): String
- override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
+ override fun <T> send(request: MtbFileRequest<T>): MtbFileSender.Response {
try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
- val headers = getHttpHeaders()
- val entityReq = HttpEntity(request.mtbFile, headers)
+ val headers = getHttpHeaders(request)
+ val entityReq = HttpEntity(request.content, headers)
val response = restTemplate.postForEntity(
sendUrl(),
entityReq,
@@ -76,10 +77,10 @@ abstract class RestMtbFileSender(
return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
- override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
+ override fun send(request: DeleteRequest): MtbFileSender.Response {
try {
return retryTemplate.execute<MtbFileSender.Response, Exception> {
- val headers = getHttpHeaders()
+ val headers = getHttpHeaders(request)
val entityReq = HttpEntity(null, headers)
restTemplate.delete(
deleteUrl(request.patientId),
@@ -102,11 +103,15 @@ abstract class RestMtbFileSender(
return this.restTargetProperties.uri.orEmpty()
}
- private fun getHttpHeaders(): HttpHeaders {
+ private fun getHttpHeaders(request: MtbRequest): HttpHeaders {
val username = restTargetProperties.username
val password = restTargetProperties.password
val headers = HttpHeaders()
- headers.contentType = MediaType.APPLICATION_JSON
+ headers.contentType = when (request) {
+ is BwhcV1MtbFileRequest -> MediaType.APPLICATION_JSON
+ is DnpmV2MtbFileRequest -> CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON
+ else -> MediaType.APPLICATION_JSON
+ }
if (username.isNullOrBlank() || password.isNullOrBlank()) {
return headers
@@ -116,4 +121,4 @@ abstract class RestMtbFileSender(
return headers
}
-} \ No newline at end of file
+}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
index 5b2c42a..f25452e 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
@@ -1,7 +1,7 @@
/*
* This file is part of ETL-Processor
*
- * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ * Copyright (c) 2025 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
@@ -27,10 +27,11 @@ import dev.dnpm.etl.processor.monitoring.Report
import dev.dnpm.etl.processor.monitoring.Request
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
-import dev.dnpm.etl.processor.output.MtbFileSender
+import dev.dnpm.etl.processor.output.*
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
import dev.dnpm.etl.processor.pseudonym.anonymizeContentWith
import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith
+import dev.pcvolkmer.mv64e.mtb.Mtb
import org.apache.commons.codec.binary.Base32
import org.apache.commons.codec.digest.DigestUtils
import org.springframework.context.ApplicationEventPublisher
@@ -55,29 +56,40 @@ class RequestProcessor(
fun processMtbFile(mtbFile: MtbFile, requestId: RequestId) {
val pid = PatientId(mtbFile.patient.id)
-
mtbFile pseudonymizeWith pseudonymizeService
mtbFile anonymizeContentWith pseudonymizeService
+ val request = BwhcV1MtbFileRequest(requestId, transformationService.transform(mtbFile))
+ saveAndSend(request, pid)
+ }
- val request = MtbFileSender.MtbFileRequest(requestId, transformationService.transform(mtbFile))
+ fun processMtbFile(mtbFile: Mtb) {
+ processMtbFile(mtbFile, randomRequestId())
+ }
- val patientPseudonym = PatientPseudonym(request.mtbFile.patient.id)
+ fun processMtbFile(mtbFile: Mtb, requestId: RequestId) {
+ val pid = PatientId(mtbFile.patient.id)
+ mtbFile pseudonymizeWith pseudonymizeService
+ mtbFile anonymizeContentWith pseudonymizeService
+ val request = DnpmV2MtbFileRequest(requestId, transformationService.transform(mtbFile))
+ saveAndSend(request, pid)
+ }
+ private fun <T> saveAndSend(request: MtbFileRequest<T>, pid: PatientId) {
requestService.save(
Request(
- requestId,
- patientPseudonym,
+ request.requestId,
+ request.patientPseudonym(),
pid,
- fingerprint(request.mtbFile),
+ fingerprint(request),
RequestType.MTB_FILE,
RequestStatus.UNKNOWN
)
)
- if (appConfigProperties.duplicationDetection && isDuplication(mtbFile)) {
+ if (appConfigProperties.duplicationDetection && isDuplication(request)) {
applicationEventPublisher.publishEvent(
ResponseEvent(
- requestId,
+ request.requestId,
Instant.now(),
RequestStatus.DUPLICATION
)
@@ -89,7 +101,7 @@ class RequestProcessor(
applicationEventPublisher.publishEvent(
ResponseEvent(
- requestId,
+ request.requestId,
Instant.now(),
responseStatus.status,
when (responseStatus.status) {
@@ -100,8 +112,11 @@ class RequestProcessor(
)
}
- private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean {
- val patientPseudonym = PatientPseudonym(pseudonymizedMtbFile.patient.id)
+ private fun <T> isDuplication(pseudonymizedMtbFileRequest: MtbFileRequest<T>): Boolean {
+ val patientPseudonym = when (pseudonymizedMtbFileRequest) {
+ is BwhcV1MtbFileRequest -> PatientPseudonym(pseudonymizedMtbFileRequest.content.patient.id)
+ is DnpmV2MtbFileRequest -> PatientPseudonym(pseudonymizedMtbFileRequest.content.patient.id)
+ }
val lastMtbFileRequestForPatient =
requestService.lastMtbFileRequestForPatientPseudonym(patientPseudonym)
@@ -109,7 +124,7 @@ class RequestProcessor(
return null != lastMtbFileRequestForPatient
&& !isLastRequestDeletion
- && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile)
+ && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFileRequest)
}
fun processDeletion(patientId: PatientId) {
@@ -131,7 +146,7 @@ class RequestProcessor(
)
)
- val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
+ val responseStatus = sender.send(DeleteRequest(requestId, patientPseudonym))
applicationEventPublisher.publishEvent(
ResponseEvent(
@@ -160,8 +175,11 @@ class RequestProcessor(
}
}
- private fun fingerprint(mtbFile: MtbFile): Fingerprint {
- return fingerprint(objectMapper.writeValueAsString(mtbFile))
+ private fun <T> fingerprint(request: MtbFileRequest<T>): Fingerprint {
+ return when (request) {
+ is BwhcV1MtbFileRequest -> fingerprint(objectMapper.writeValueAsString(request.content))
+ is DnpmV2MtbFileRequest -> fingerprint(objectMapper.writeValueAsString(request.content))
+ }
}
private fun fingerprint(s: String): Fingerprint {
@@ -172,4 +190,4 @@ class RequestProcessor(
)
}
-} \ No newline at end of file
+}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt
index 2a9dc5b..9447a84 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/TransformationService.kt
@@ -23,10 +23,21 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.jayway.jsonpath.JsonPath
import com.jayway.jsonpath.PathNotFoundException
import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.pcvolkmer.mv64e.mtb.Mtb
class TransformationService(private val objectMapper: ObjectMapper, private val transformations: List<Transformation>) {
fun transform(mtbFile: MtbFile): MtbFile {
- var json = objectMapper.writeValueAsString(mtbFile)
+ val json = transform(objectMapper.writeValueAsString(mtbFile))
+ return objectMapper.readValue(json, MtbFile::class.java)
+ }
+
+ fun transform(mtbFile: Mtb): Mtb {
+ val json = transform(objectMapper.writeValueAsString(mtbFile))
+ return objectMapper.readValue(json, Mtb::class.java)
+ }
+
+ private fun transform(content: String): String {
+ var json = content
transformations.forEach { transformation ->
val jsonPath = JsonPath.parse(json)
@@ -48,7 +59,7 @@ class TransformationService(private val objectMapper: ObjectMapper, private val
json = jsonPath.jsonString()
}
- return objectMapper.readValue(json, MtbFile::class.java)
+ return json
}
fun getTransformations(): List<Transformation> {