From 51cf7a7917d7376d1e7c685b9c0e56d8929ad9e1 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 11:52:11 +0200 Subject: Add processor to handle responses from Kafka topic --- .../dnpm/etl/processor/EtlProcessorApplication.kt | 1 - .../etl/processor/config/AppConfigProperties.kt | 4 +- .../dnpm/etl/processor/config/AppConfiguration.kt | 16 +--- .../etl/processor/config/AppKafkaConfiguration.kt | 70 +++++++++++++++++ .../dev/dnpm/etl/processor/monitoring/Request.kt | 4 +- .../services/kafka/KafkaResponseProcessor.kt | 87 ++++++++++++++++++++++ 6 files changed, 166 insertions(+), 16 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt index 0c4ab68..5d28c97 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt @@ -28,4 +28,3 @@ class EtlProcessorApplication fun main(args: Array) { runApplication(*args) } - diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 64be70d..6502a1b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -48,7 +48,7 @@ data class GPasConfigProperties( val password: String?, val sslCaLocation: String?, -) { + ) { companion object { const val NAME = "app.pseudonymize.gpas" } @@ -66,6 +66,8 @@ data class RestTargetProperties( @ConfigurationProperties(KafkaTargetProperties.NAME) data class KafkaTargetProperties( val topic: String = "etl-processor", + val responseTopic: String = "${topic}_response", + val groupId: String = "${topic}_group", val servers: String = "" ) { companion object { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index c677f2b..cbba1f1 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -21,7 +21,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.ReportService -import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator @@ -32,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.KafkaTemplate import reactor.core.publisher.Sinks @Configuration @@ -60,7 +58,10 @@ class AppConfiguration { } @Bean - fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService { + fun pseudonymizeService( + generator: Generator, + pseudonymizeConfigProperties: PseudonymizeConfigProperties + ): PseudonymizeService { return PseudonymizeService(generator, pseudonymizeConfigProperties) } @@ -70,15 +71,6 @@ class AppConfiguration { return RestMtbFileSender(restTargetProperties) } - @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) - @Bean - fun kafkaMtbFileSender( - kafkaTemplate: KafkaTemplate, - objectMapper: ObjectMapper - ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) - } - @Bean fun reportService(objectMapper: ObjectMapper): ReportService { return ReportService(objectMapper) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt new file mode 100644 index 0000000..f81d3fb --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -0,0 +1,70 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.output.KafkaMtbFileSender +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.listener.KafkaMessageListenerContainer + +@Configuration +@EnableConfigurationProperties( + value = [KafkaTargetProperties::class] +) +@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +class AppKafkaConfiguration { + + @Bean + fun kafkaMtbFileSender( + kafkaTemplate: KafkaTemplate, + objectMapper: ObjectMapper + ): MtbFileSender { + return KafkaMtbFileSender(kafkaTemplate, objectMapper) + } + + @Bean + fun kafkaListenerContainer( + consumerFactory: ConsumerFactory, + kafkaTargetProperties: KafkaTargetProperties, + kafkaResponseProcessor: KafkaResponseProcessor + ): KafkaMessageListenerContainer { + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic); + containerProperties.messageListener = kafkaResponseProcessor + return KafkaMessageListenerContainer(consumerFactory, containerProperties) + } + + @Bean + fun kafkaResponseProcessor( + requestRepository: RequestRepository, + objectMapper: ObjectMapper + ): KafkaResponseProcessor { + return KafkaResponseProcessor(requestRepository, objectMapper) + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt index ecd8219..c1d4d43 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt @@ -36,9 +36,9 @@ data class Request( val patientId: String, val pid: String, val fingerprint: String, - val status: RequestStatus, val type: RequestType, - val processedAt: Instant = Instant.now(), + var status: RequestStatus, + var processedAt: Instant = Instant.now(), @Embedded.Nullable var report: Report? = null ) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt new file mode 100644 index 0000000..f0c91cb --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -0,0 +1,87 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.slf4j.LoggerFactory +import org.springframework.kafka.listener.MessageListener +import java.time.Instant + +class KafkaResponseProcessor( + private val requestRepository: RequestRepository, + private val objectMapper: ObjectMapper +) : MessageListener { + + private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java) + + override fun onMessage(data: ConsumerRecord) { + try { + val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) + requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { + val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + when (responseBody.statusCode) { + 200 -> { + it.status = RequestStatus.SUCCESS + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + requestRepository.save(it) + } + + 201 -> { + it.status = RequestStatus.WARNING + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Warnungen über mangelhafte Daten", + responseBody.statusBody + ) + requestRepository.save(it) + } + + 400, 422 -> { + it.status = RequestStatus.ERROR + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + responseBody.statusBody + ) + requestRepository.save(it) + } + + else -> { + logger.error("Cannot process Kafka response: Unknown response code!") + } + } + } + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + } + } + + data class ResponseKey(val requestId: String) + + data class ResponseBody( + @JsonProperty("status code") val statusCode: Int, + @JsonProperty("status_body") val statusBody: String + ) +} \ No newline at end of file -- cgit v1.2.3 From 35cb258b13543b37ce061f78eef4427e542ca72a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:10:18 +0200 Subject: Do not return specific status code based on remote status code --- .../dnpm/etl/processor/services/RequestProcessor.kt | 17 ++++------------- .../dev/dnpm/etl/processor/web/MtbFileController.kt | 21 ++++++--------------- 2 files changed, 10 insertions(+), 28 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 8588ebe..7d110b1 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -42,7 +42,7 @@ class RequestProcessor( private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) - fun processMtbFile(mtbFile: MtbFile): RequestStatus { + fun processMtbFile(mtbFile: MtbFile) { val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) @@ -62,7 +62,7 @@ class RequestProcessor( ) ) statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - return RequestStatus.DUPLICATION + return } val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) @@ -115,11 +115,9 @@ class RequestProcessor( ) statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return requestStatus } - fun processDeletion(patientId: String): RequestStatus { + fun processDeletion(patientId: String) { val requestId = UUID.randomUUID().toString() try { @@ -178,10 +176,6 @@ class RequestProcessor( } ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return overallRequestStatus } catch (e: Exception) { requestRepository.save( Request( @@ -194,11 +188,8 @@ class RequestProcessor( report = Report("Fehler bei der Pseudonymisierung") ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return RequestStatus.ERROR } + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun fingerprint(mtbFile: MtbFile): String { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt index a2cc953..cf0e693 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt @@ -20,7 +20,6 @@ package dev.dnpm.etl.processor.web import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.services.RequestProcessor import org.slf4j.LoggerFactory import org.springframework.http.ResponseEntity @@ -35,24 +34,16 @@ class MtbFileController( @PostMapping(path = ["/mtbfile"]) fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { - val requestStatus = requestProcessor.processMtbFile(mtbFile) - - return if (requestStatus == RequestStatus.ERROR) { - ResponseEntity.unprocessableEntity().build() - } else { - ResponseEntity.noContent().build() - } + logger.debug("Accepted MTB File for processing") + requestProcessor.processMtbFile(mtbFile) + return ResponseEntity.accepted().build() } @DeleteMapping(path = ["/mtbfile/{patientId}"]) fun deleteData(@PathVariable patientId: String): ResponseEntity { - val requestStatus = requestProcessor.processDeletion(patientId) - - return if (requestStatus == RequestStatus.ERROR) { - ResponseEntity.unprocessableEntity().build() - } else { - ResponseEntity.noContent().build() - } + logger.debug("Accepted patient ID to process deletion") + requestProcessor.processDeletion(patientId) + return ResponseEntity.accepted().build() } } \ No newline at end of file -- cgit v1.2.3 From 70d4fa2f0ff4b38757cabc967b3f38a63674ed47 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:10:53 +0200 Subject: Use duplication fingerprinting based on MTB file requests only --- src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 1 + 1 file changed, 1 insertion(+) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 7d110b1..afac40b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -48,6 +48,7 @@ class RequestProcessor( val lastRequestForPatient = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + .filter { it.type == RequestType.MTB_FILE } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { -- cgit v1.2.3 From 7f8b21efd2273bb7b4ee0d93ff4988bade2fa610 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:23:16 +0200 Subject: Handle not parsable data quality reports --- .../dev/dnpm/etl/processor/monitoring/ReportService.kt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt index 6ee8ae9..8c31ede 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.monitoring import com.fasterxml.jackson.annotation.JsonValue +import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.JsonMappingException import com.fasterxml.jackson.databind.ObjectMapper @@ -33,9 +34,14 @@ class ReportService( } return try { objectMapper.readValue(dataQualityReport, DataQualityReport::class.java).issues - } catch (e: JsonMappingException) { - e.printStackTrace() - listOf() + } catch (e: Exception) { + val otherIssue = + Issue(Severity.ERROR, "Not parsable data quality report '$dataQualityReport'") + return when (e) { + is JsonMappingException -> listOf(otherIssue) + is JsonParseException -> listOf(otherIssue) + else -> throw e + } } } -- cgit v1.2.3 From 577509e6f2d502d4dbbf1a1b526c43497fde56b3 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:53:09 +0200 Subject: Map 'status_code' and 'status code' to same data value --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index f0c91cb..d6c4da6 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -19,6 +19,7 @@ package dev.dnpm.etl.processor.services.kafka +import com.fasterxml.jackson.annotation.JsonAlias import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.Report @@ -81,7 +82,7 @@ class KafkaResponseProcessor( data class ResponseKey(val requestId: String) data class ResponseBody( - @JsonProperty("status code") val statusCode: Int, + @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, @JsonProperty("status_body") val statusBody: String ) } \ No newline at end of file -- cgit v1.2.3 From ac91620651daa2f9aa09709eaa0bb5a8f7222e71 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 12:59:53 +0200 Subject: Use Map as status body since it contains JSON --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index d6c4da6..547833c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -54,7 +54,7 @@ class KafkaResponseProcessor( it.processedAt = Instant.ofEpochMilli(data.timestamp()) it.report = Report( "Warnungen über mangelhafte Daten", - responseBody.statusBody + objectMapper.writeValueAsString(responseBody.statusBody) ) requestRepository.save(it) } @@ -64,7 +64,7 @@ class KafkaResponseProcessor( it.processedAt = Instant.ofEpochMilli(data.timestamp()) it.report = Report( "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - responseBody.statusBody + objectMapper.writeValueAsString(responseBody.statusBody) ) requestRepository.save(it) } @@ -83,6 +83,6 @@ class KafkaResponseProcessor( data class ResponseBody( @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, - @JsonProperty("status_body") val statusBody: String + @JsonProperty("status_body") val statusBody: Map ) } \ No newline at end of file -- cgit v1.2.3 From 3dcee41569b462a4b938380b5ac3b208728fb358 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 15:14:49 +0200 Subject: Implement delete request using Apache Kafka This is implemented using a fake MTB file containing a rejected consent state and will be mapped to HTTP DELETE on kafka-to-bwhc consumer. --- .../etl/processor/output/KafkaMtbFileSender.kt | 35 ++++++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 55503cf..da25576 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -20,6 +20,8 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.Consent +import de.ukw.ccc.bwhc.dto.MtbFile import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -42,16 +44,38 @@ class KafkaMtbFileSender( } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } - } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } } - // TODO not yet implemented override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { - return MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + val dummyMtbFile = MtbFile.builder() + .withConsent( + Consent.builder() + .withPatient(request.patientId) + .withStatus(Consent.Status.REJECTED) + .build() + ) + .build() + + return try { + val result = kafkaTemplate.sendDefault( + header(request), + objectMapper.writeValueAsString(dummyMtbFile) + ) + + if (result.get() != null) { + logger.debug("Sent deletion request via KafkaMtbFileSender") + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + } else { + MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + } + } catch (e: Exception) { + logger.error("An error occurred sending to kafka", e) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + } } private fun header(request: MtbFileSender.MtbFileRequest): String { @@ -59,4 +83,9 @@ class KafkaMtbFileSender( "\"eid\": \"${request.mtbFile.episode.id}\", " + "\"requestId\": \"${request.requestId}\"}" } + + private fun header(request: MtbFileSender.DeleteRequest): String { + return "{\"pid\": \"${request.patientId}\", " + + "\"requestId\": \"${request.requestId}\"}" + } } \ No newline at end of file -- cgit v1.2.3 From ec76c775d9ab7c863cd0c55e6ba1232181f6775c Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 16:04:31 +0200 Subject: Explicit producer topic configuration --- .../etl/processor/config/AppKafkaConfiguration.kt | 5 +++-- .../dnpm/etl/processor/output/KafkaMtbFileSender.kt | 20 ++++++++++++-------- .../services/kafka/KafkaResponseProcessor.kt | 13 +++++++++++++ 3 files changed, 28 insertions(+), 10 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index f81d3fb..7adcb02 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -43,9 +43,10 @@ class AppKafkaConfiguration { @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, + kafkaTargetProperties: KafkaTargetProperties, objectMapper: ObjectMapper ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) + return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) } @Bean @@ -54,7 +55,7 @@ class AppKafkaConfiguration { kafkaTargetProperties: KafkaTargetProperties, kafkaResponseProcessor: KafkaResponseProcessor ): KafkaMessageListenerContainer { - val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic); + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic) containerProperties.messageListener = kafkaResponseProcessor return KafkaMessageListenerContainer(consumerFactory, containerProperties) } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index da25576..d903745 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -22,11 +22,13 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.config.KafkaTargetProperties import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, + private val kafkaTargetProperties: KafkaTargetProperties, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -34,13 +36,14 @@ class KafkaMtbFileSender( override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { return try { - val result = kafkaTemplate.sendDefault( - header(request), + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), objectMapper.writeValueAsString(request.mtbFile) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } @@ -61,14 +64,15 @@ class KafkaMtbFileSender( .build() return try { - val result = kafkaTemplate.sendDefault( - header(request), + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), objectMapper.writeValueAsString(dummyMtbFile) ) if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } @@ -78,13 +82,13 @@ class KafkaMtbFileSender( } } - private fun header(request: MtbFileSender.MtbFileRequest): String { + private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + "\"eid\": \"${request.mtbFile.episode.id}\", " + "\"requestId\": \"${request.requestId}\"}" } - private fun header(request: MtbFileSender.DeleteRequest): String { + private fun key(request: MtbFileSender.DeleteRequest): String { return "{\"pid\": \"${request.patientId}\", " + "\"requestId\": \"${request.requestId}\"}" } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 547833c..fd047d0 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -42,6 +42,9 @@ class KafkaResponseProcessor( val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + + println("${responseBody.statusCode}") + when (responseBody.statusCode) { 200 -> { it.status = RequestStatus.SUCCESS @@ -69,6 +72,16 @@ class KafkaResponseProcessor( requestRepository.save(it) } + in 900..999 -> { + it.status = RequestStatus.ERROR + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", + objectMapper.writeValueAsString(responseBody.statusBody) + ) + requestRepository.save(it) + } + else -> { logger.error("Cannot process Kafka response: Unknown response code!") } -- cgit v1.2.3 From b14f2c1794fe41f9ec5e9e400d51c0fbf991953a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 16:18:16 +0200 Subject: Add information about 'no connection' responses --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index fd047d0..1e9263d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -43,8 +43,6 @@ class KafkaResponseProcessor( requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - println("${responseBody.statusCode}") - when (responseBody.statusCode) { 200 -> { it.status = RequestStatus.SUCCESS -- cgit v1.2.3 From 459ad59c1d988a5b3ecc60d844f4fa6c9bce11f5 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 4 Aug 2023 11:43:23 +0200 Subject: Do not detect duplicates after deletion request --- .../kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index afac40b..bdf2827 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -46,12 +46,15 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val lastRequestForPatient = - requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + val allRequests = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + + val lastMtbFileRequestForPatient = allRequests .filter { it.type == RequestType.MTB_FILE } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { + val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE + + if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { requestRepository.save( Request( patientId = pseudonymized.patient.id, -- cgit v1.2.3 From bcc23f6b14436ba6f4585a583da6c236df68e25a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 14:50:12 +0200 Subject: Add RequestService to handle access to requests --- .../etl/processor/services/RequestProcessor.kt | 17 ++++--- .../dnpm/etl/processor/services/RequestService.kt | 56 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 7 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index bdf2827..e04e568 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -21,7 +21,10 @@ package dev.dnpm.etl.processor.services import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.* +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.PseudonymizeService import org.apache.commons.codec.binary.Base32 @@ -35,7 +38,7 @@ import java.util.* class RequestProcessor( private val pseudonymizeService: PseudonymizeService, private val senders: List, - private val requestRepository: RequestRepository, + private val requestService: RequestService, private val objectMapper: ObjectMapper, private val statisticsUpdateProducer: Sinks.Many ) { @@ -46,7 +49,7 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val allRequests = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + val allRequests = requestService.allRequestsByPatientPseudonym(pseudonymized.patient.id) val lastMtbFileRequestForPatient = allRequests .filter { it.type == RequestType.MTB_FILE } @@ -55,7 +58,7 @@ class RequestProcessor( val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { - requestRepository.save( + requestService.save( Request( patientId = pseudonymized.patient.id, pid = pid, @@ -99,7 +102,7 @@ class RequestProcessor( RequestStatus.UNKNOWN } - requestRepository.save( + requestService.save( Request( uuid = request.requestId, patientId = request.mtbFile.patient.id, @@ -165,7 +168,7 @@ class RequestProcessor( RequestStatus.UNKNOWN } - requestRepository.save( + requestService.save( Request( uuid = requestId, patientId = patientPseudonym, @@ -181,7 +184,7 @@ class RequestProcessor( ) ) } catch (e: Exception) { - requestRepository.save( + requestService.save( Request( uuid = requestId, patientId = "???", diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt new file mode 100644 index 0000000..0f69910 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt @@ -0,0 +1,56 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.springframework.stereotype.Service + +@Service +class RequestService( + private val requestRepository: RequestRepository +) { + + fun save(request: Request) = requestRepository.save(request) + + fun allRequestsByPatientPseudonym(patientPseudonym: String) = requestRepository + .findAllByPatientIdOrderByProcessedAtDesc(patientPseudonym) + + fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) = + Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym)) + + fun isLastRequestDeletion(patientPseudonym: String) = + Companion.isLastRequestDeletion(allRequestsByPatientPseudonym(patientPseudonym)) + + companion object { + + fun lastMtbFileRequestForPatientPseudonym(allRequests: List) = allRequests + .filter { it.type == RequestType.MTB_FILE } + .sortedByDescending { it.processedAt } + .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } + + fun isLastRequestDeletion(allRequests: List) = allRequests + .maxByOrNull { it.processedAt }?.type == RequestType.DELETE + + } + +} \ No newline at end of file -- cgit v1.2.3 From 422441a3b39806016a952bf7bdff69e0834debca Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 16:46:02 +0200 Subject: Add tests for RequestProcessor --- .../dnpm/etl/processor/services/RequestProcessor.kt | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index e04e568..fcb0863 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -49,15 +49,7 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val allRequests = requestService.allRequestsByPatientPseudonym(pseudonymized.patient.id) - - val lastMtbFileRequestForPatient = allRequests - .filter { it.type == RequestType.MTB_FILE } - .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - - val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE - - if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { + if (isDuplication(pseudonymized)) { requestService.save( Request( patientId = pseudonymized.patient.id, @@ -124,6 +116,16 @@ class RequestProcessor( statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } + private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { + val lastMtbFileRequestForPatient = + requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id) + val isLastRequestDeletion = requestService.isLastRequestDeletion(pseudonymizedMtbFile.patient.id) + + return null != lastMtbFileRequestForPatient + && !isLastRequestDeletion + && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile) + } + fun processDeletion(patientId: String) { val requestId = UUID.randomUUID().toString() -- cgit v1.2.3 From 6ad6ee13a1cae8ed286e80b3a46c458e1052480b Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 19:16:59 +0200 Subject: Ignore unknown properties in DataQualityResponse --- src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt index 8c31ede..ae36705 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt @@ -19,6 +19,7 @@ package dev.dnpm.etl.processor.monitoring +import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonValue import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.JsonMappingException @@ -46,8 +47,10 @@ class ReportService( } + @JsonIgnoreProperties(ignoreUnknown = true) private data class DataQualityReport(val issues: List) + @JsonIgnoreProperties(ignoreUnknown = true) data class Issue(val severity: Severity, val message: String) enum class Severity(@JsonValue val value: String) { -- cgit v1.2.3 From 7739afad1fc82f4ffe0debbebae58874f046d82d Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 08:13:27 +0200 Subject: Handle MTB File with rejected consent as deletion request --- .../dnpm/etl/processor/web/MtbFileController.kt | 49 ------------------- .../etl/processor/web/MtbFileRestController.kt | 55 ++++++++++++++++++++++ 2 files changed, 55 insertions(+), 49 deletions(-) delete mode 100644 src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt deleted file mode 100644 index cf0e693..0000000 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor.web - -import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.services.RequestProcessor -import org.slf4j.LoggerFactory -import org.springframework.http.ResponseEntity -import org.springframework.web.bind.annotation.* - -@RestController -class MtbFileController( - private val requestProcessor: RequestProcessor, -) { - - private val logger = LoggerFactory.getLogger(MtbFileController::class.java) - - @PostMapping(path = ["/mtbfile"]) - fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { - logger.debug("Accepted MTB File for processing") - requestProcessor.processMtbFile(mtbFile) - return ResponseEntity.accepted().build() - } - - @DeleteMapping(path = ["/mtbfile/{patientId}"]) - fun deleteData(@PathVariable patientId: String): ResponseEntity { - logger.debug("Accepted patient ID to process deletion") - requestProcessor.processDeletion(patientId) - return ResponseEntity.accepted().build() - } - -} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt new file mode 100644 index 0000000..9b441f6 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt @@ -0,0 +1,55 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.web + +import de.ukw.ccc.bwhc.dto.Consent +import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.services.RequestProcessor +import org.slf4j.LoggerFactory +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.* + +@RestController +class MtbFileRestController( + private val requestProcessor: RequestProcessor, +) { + + private val logger = LoggerFactory.getLogger(MtbFileRestController::class.java) + + @PostMapping(path = ["/mtbfile"]) + fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { + if (mtbFile.consent.status == Consent.Status.ACTIVE) { + logger.debug("Accepted MTB File for processing") + requestProcessor.processMtbFile(mtbFile) + } else { + logger.debug("Accepted MTB File and process deletion") + requestProcessor.processDeletion(mtbFile.patient.id) + } + return ResponseEntity.accepted().build() + } + + @DeleteMapping(path = ["/mtbfile/{patientId}"]) + fun deleteData(@PathVariable patientId: String): ResponseEntity { + logger.debug("Accepted patient ID to process deletion") + requestProcessor.processDeletion(patientId) + return ResponseEntity.accepted().build() + } + +} \ No newline at end of file -- cgit v1.2.3 From 13bfa0018d6c9b48893ef96945659be9e7eec6c0 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 10:20:20 +0200 Subject: Change endpoint configuration to select single endpoint * If REST endpoint is configured, it will be used * If Kafka endpoint is configured, it will be used * If both endpoints are configured, REST configuration has precedence and will be used --- .../dnpm/etl/processor/config/AppConfiguration.kt | 12 +---- .../etl/processor/config/AppKafkaConfiguration.kt | 8 ++++ .../etl/processor/config/AppRestConfiguration.kt | 52 ++++++++++++++++++++++ 3 files changed, 61 insertions(+), 11 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index cbba1f1..6b15fc0 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -21,8 +21,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.ReportService -import dev.dnpm.etl.processor.output.MtbFileSender -import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator import dev.dnpm.etl.processor.pseudonym.Generator import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator @@ -38,9 +36,7 @@ import reactor.core.publisher.Sinks value = [ AppConfigProperties::class, PseudonymizeConfigProperties::class, - GPasConfigProperties::class, - RestTargetProperties::class, - KafkaTargetProperties::class + GPasConfigProperties::class ] ) class AppConfiguration { @@ -65,12 +61,6 @@ class AppConfiguration { return PseudonymizeService(generator, pseudonymizeConfigProperties) } - @ConditionalOnProperty(value = ["app.rest.uri"]) - @Bean - fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { - return RestMtbFileSender(restTargetProperties) - } - @Bean fun reportService(objectMapper: ObjectMapper): ReportService { return ReportService(objectMapper) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 7adcb02..6d0254e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -24,10 +24,13 @@ import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.core.annotation.Order import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.listener.ContainerProperties @@ -38,14 +41,19 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer value = [KafkaTargetProperties::class] ) @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +@ConditionalOnMissingBean(MtbFileSender::class) +@Order(-5) class AppKafkaConfiguration { + private val logger = LoggerFactory.getLogger(AppKafkaConfiguration::class.java) + @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, kafkaTargetProperties: KafkaTargetProperties, objectMapper: ObjectMapper ): MtbFileSender { + logger.info("Selected 'KafkaMtbFileSender'") return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt new file mode 100644 index 0000000..5e77a4f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -0,0 +1,52 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.output.RestMtbFileSender +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.core.annotation.Order + +@Configuration +@EnableConfigurationProperties( + value = [ + RestTargetProperties::class + ] +) +@ConditionalOnProperty(value = ["app.rest.uri"]) +@ConditionalOnMissingBean(MtbFileSender::class) +@Order(-10) +class AppRestConfiguration { + + private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java) + + @Bean + fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { + logger.info("Selected 'RestMtbFileSender'") + return RestMtbFileSender(restTargetProperties) + } + +} + -- cgit v1.2.3 From 47830ed9f7774c84674e9399cd347d12424f4f42 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 10:34:23 +0200 Subject: Use single MtbFileSender --- .../etl/processor/services/RequestProcessor.kt | 110 +++++++++------------ 1 file changed, 49 insertions(+), 61 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index fcb0863..936c1bf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -37,7 +37,7 @@ import java.util.* @Service class RequestProcessor( private val pseudonymizeService: PseudonymizeService, - private val senders: List, + private val sender: MtbFileSender, private val requestService: RequestService, private val objectMapper: ObjectMapper, private val statisticsUpdateProducer: Sinks.Many @@ -66,32 +66,26 @@ class RequestProcessor( val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) - val responses = senders.map { - val responseStatus = it.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } - responseStatus + val responseStatus = sender.send(request) + if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { + logger.info( + "Sent file for Patient '{}' using '{}'", + pseudonymized.patient.id, + sender.javaClass.simpleName + ) + } else { + logger.error( + "Error sending file for Patient '{}' using '{}'", + pseudonymized.patient.id, + sender.javaClass.simpleName + ) } - val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) { - RequestStatus.WARNING - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN + val requestStatus = when (responseStatus.status) { + MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR + MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING + MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS + else -> RequestStatus.UNKNOWN } requestService.save( @@ -104,9 +98,7 @@ class RequestProcessor( type = RequestType.MTB_FILE, report = when (requestStatus) { RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", - responses.joinToString("\n") { it.reason }) - + RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason) RequestStatus.UNKNOWN -> Report("Keine Informationen") else -> null } @@ -132,42 +124,38 @@ class RequestProcessor( try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responses = senders.map { - val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + when (responseStatus.status) { + MtbFileSender.ResponseStatus.SUCCESS -> { + logger.info( + "Sent delete for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) + } - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + MtbFileSender.ResponseStatus.ERROR -> { + logger.error( + "Error deleting data for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) + } - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + else -> { + logger.error( + "Unknown result on deleting data for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) } - responseStatus } - val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN + val requestStatus = when (responseStatus.status) { + MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR + MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING + MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS + else -> RequestStatus.UNKNOWN } requestService.save( @@ -176,9 +164,9 @@ class RequestProcessor( patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = overallRequestStatus, + status = requestStatus, type = RequestType.DELETE, - report = when (overallRequestStatus) { + report = when (requestStatus) { RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") RequestStatus.UNKNOWN -> Report("Keine Informationen") else -> null -- cgit v1.2.3 From 7f048e2483138deecc28208af42546097ef929d7 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 12:26:57 +0200 Subject: Do not append custom prefix to gPAS pseudonym --- .../etl/processor/pseudonym/PseudonymizeService.kt | 36 ++-------------- .../dev/dnpm/etl/processor/pseudonym/extensions.kt | 50 ++++++++++++++++++++++ .../etl/processor/services/RequestProcessor.kt | 14 +++--- 3 files changed, 62 insertions(+), 38 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt index 1a79850..ab8ce2f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt @@ -19,7 +19,6 @@ package dev.dnpm.etl.processor.pseudonym -import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties class PseudonymizeService( @@ -27,38 +26,11 @@ class PseudonymizeService( private val configProperties: PseudonymizeConfigProperties ) { - fun pseudonymize(mtbFile: MtbFile): MtbFile { - val patientPseudonym = patientPseudonym(mtbFile.patient.id) - - mtbFile.episode.patient = patientPseudonym - mtbFile.carePlans.forEach { it.patient = patientPseudonym } - mtbFile.patient.id = patientPseudonym - mtbFile.claims.forEach { it.patient = patientPseudonym } - mtbFile.consent.patient = patientPseudonym - mtbFile.claimResponses.forEach { it.patient = patientPseudonym } - mtbFile.diagnoses.forEach { it.patient = patientPseudonym } - mtbFile.ecogStatus.forEach { it.patient = patientPseudonym } - mtbFile.familyMemberDiagnoses.forEach { it.patient = patientPseudonym } - mtbFile.geneticCounsellingRequests.forEach { it.patient = patientPseudonym } - mtbFile.histologyReevaluationRequests.forEach { it.patient = patientPseudonym } - mtbFile.histologyReports.forEach { it.patient = patientPseudonym } - mtbFile.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } - mtbFile.molecularPathologyFindings.forEach { it.patient = patientPseudonym } - mtbFile.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } - mtbFile.ngsReports.forEach { it.patient = patientPseudonym } - mtbFile.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } - mtbFile.rebiopsyRequests.forEach { it.patient = patientPseudonym } - mtbFile.recommendations.forEach { it.patient = patientPseudonym } - mtbFile.recommendations.forEach { it.patient = patientPseudonym } - mtbFile.responses.forEach { it.patient = patientPseudonym } - mtbFile.specimens.forEach { it.patient = patientPseudonym } - mtbFile.specimens.forEach { it.patient = patientPseudonym } - - return mtbFile - } - fun patientPseudonym(patientId: String): String { - return "${configProperties.prefix}_${generator.generate(patientId)}" + return when (generator) { + is GpasPseudonymGenerator -> generator.generate(patientId) + else -> "${configProperties.prefix}_${generator.generate(patientId)}" + } } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt new file mode 100644 index 0000000..580785d --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt @@ -0,0 +1,50 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.pseudonym + +import de.ukw.ccc.bwhc.dto.MtbFile + +infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) { + val patientPseudonym = pseudonymizeService.patientPseudonym(this.patient.id) + + this.episode.patient = patientPseudonym + this.carePlans.forEach { it.patient = patientPseudonym } + this.patient.id = patientPseudonym + this.claims.forEach { it.patient = patientPseudonym } + this.consent.patient = patientPseudonym + this.claimResponses.forEach { it.patient = patientPseudonym } + this.diagnoses.forEach { it.patient = patientPseudonym } + this.ecogStatus.forEach { it.patient = patientPseudonym } + this.familyMemberDiagnoses.forEach { it.patient = patientPseudonym } + this.geneticCounsellingRequests.forEach { it.patient = patientPseudonym } + this.histologyReevaluationRequests.forEach { it.patient = patientPseudonym } + this.histologyReports.forEach { it.patient = patientPseudonym } + this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } + this.molecularPathologyFindings.forEach { it.patient = patientPseudonym } + this.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } + this.ngsReports.forEach { it.patient = patientPseudonym } + this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } + this.rebiopsyRequests.forEach { it.patient = patientPseudonym } + this.recommendations.forEach { it.patient = patientPseudonym } + this.recommendations.forEach { it.patient = patientPseudonym } + this.responses.forEach { it.patient = patientPseudonym } + this.specimens.forEach { it.patient = patientPseudonym } + this.specimens.forEach { it.patient = patientPseudonym } +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 936c1bf..6465e82 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -27,6 +27,7 @@ import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.monitoring.RequestType import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.PseudonymizeService +import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.slf4j.LoggerFactory @@ -47,12 +48,13 @@ class RequestProcessor( fun processMtbFile(mtbFile: MtbFile) { val pid = mtbFile.patient.id - val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - if (isDuplication(pseudonymized)) { + mtbFile pseudonymizeWith pseudonymizeService + + if (isDuplication(mtbFile)) { requestService.save( Request( - patientId = pseudonymized.patient.id, + patientId = mtbFile.patient.id, pid = pid, fingerprint = fingerprint(mtbFile), status = RequestStatus.DUPLICATION, @@ -64,19 +66,19 @@ class RequestProcessor( return } - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) + val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile) val responseStatus = sender.send(request) if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { logger.info( "Sent file for Patient '{}' using '{}'", - pseudonymized.patient.id, + mtbFile.patient.id, sender.javaClass.simpleName ) } else { logger.error( "Error sending file for Patient '{}' using '{}'", - pseudonymized.patient.id, + mtbFile.patient.id, sender.javaClass.simpleName ) } -- cgit v1.2.3 From 1a640ff9dff1cc182c4ffc1d00dff370e42a25de Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 18:15:20 +0200 Subject: Decouple request and response processing --- .../etl/processor/config/AppKafkaConfiguration.kt | 6 +- .../etl/processor/output/KafkaMtbFileSender.kt | 13 +- .../dev/dnpm/etl/processor/output/MtbFileSender.kt | 21 +++- .../dnpm/etl/processor/output/RestMtbFileSender.kt | 16 +-- .../etl/processor/services/RequestProcessor.kt | 135 +++++++-------------- .../etl/processor/services/ResponseProcessor.kt | 96 +++++++++++++++ .../services/kafka/KafkaResponseProcessor.kt | 85 ++++++------- 7 files changed, 211 insertions(+), 161 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 6d0254e..309ff2d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -20,7 +20,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor @@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.ApplicationEventPublisher import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order @@ -70,10 +70,10 @@ class AppKafkaConfiguration { @Bean fun kafkaResponseProcessor( - requestRepository: RequestRepository, + applicationEventPublisher: ApplicationEventPublisher, objectMapper: ObjectMapper ): KafkaResponseProcessor { - return KafkaResponseProcessor(requestRepository, objectMapper) + return KafkaResponseProcessor(applicationEventPublisher, objectMapper) } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index d903745..9448e29 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -43,13 +44,13 @@ class KafkaMtbFileSender( ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } @@ -72,13 +73,13 @@ class KafkaMtbFileSender( if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt index 6914ba5..de0efaa 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt @@ -20,22 +20,31 @@ package dev.dnpm.etl.processor.output import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.springframework.http.HttpStatusCode interface MtbFileSender { fun send(request: MtbFileRequest): Response fun send(request: DeleteRequest): Response - data class Response(val status: ResponseStatus, val reason: String = "") + data class Response(val status: RequestStatus, val body: String = "") data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile) data class DeleteRequest(val requestId: String, val patientId: String) - enum class ResponseStatus { - SUCCESS, - WARNING, - ERROR, - UNKNOWN +} + +fun Int.asRequestStatus(): RequestStatus { + return when (this) { + 200 -> RequestStatus.SUCCESS + 201 -> RequestStatus.WARNING + in 400 .. 999 -> RequestStatus.ERROR + else -> RequestStatus.UNKNOWN } +} + +fun HttpStatusCode.asRequestStatus(): RequestStatus { + return this.value().asRequestStatus() } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 04c73ef..24cdc49 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -20,13 +20,13 @@ package dev.dnpm.etl.processor.output import dev.dnpm.etl.processor.config.RestTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.http.HttpEntity import org.springframework.http.HttpHeaders import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -import org.springframework.web.util.UriComponentsBuilder class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { @@ -46,21 +46,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) ) if (!response.statusCode.is2xxSuccessful) { logger.warn("Error sending to remote system: {}", response.body) - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}") + return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return if (response.body?.contains("warning") == true) { - MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}") - } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) - } + return MtbFileSender.Response(response.statusCode.asRequestStatus()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { @@ -74,14 +70,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) String::class.java ) logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + return MtbFileSender.Response(RequestStatus.SUCCESS) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 6465e82..d2f8619 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -31,8 +31,9 @@ import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service -import reactor.core.publisher.Sinks +import java.time.Instant import java.util.* @Service @@ -41,73 +42,54 @@ class RequestProcessor( private val sender: MtbFileSender, private val requestService: RequestService, private val objectMapper: ObjectMapper, - private val statisticsUpdateProducer: Sinks.Many + private val applicationEventPublisher: ApplicationEventPublisher ) { private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) fun processMtbFile(mtbFile: MtbFile) { + val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id mtbFile pseudonymizeWith pseudonymizeService + val request = MtbFileSender.MtbFileRequest(requestId, mtbFile) + + requestService.save( + Request( + uuid = requestId, + patientId = request.mtbFile.patient.id, + pid = pid, + fingerprint = fingerprint(request.mtbFile), + status = RequestStatus.UNKNOWN, + type = RequestType.MTB_FILE + ) + ) + if (isDuplication(mtbFile)) { - requestService.save( - Request( - patientId = mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(mtbFile), - status = RequestStatus.DUPLICATION, - type = RequestType.MTB_FILE, - report = Report("Duplikat erkannt - keine Daten weitergeleitet") + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + RequestStatus.DUPLICATION ) ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) return } - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile) - val responseStatus = sender.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( - Request( - uuid = request.requestId, - patientId = request.mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(request.mtbFile), - status = requestStatus, - type = RequestType.MTB_FILE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason) - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { @@ -126,55 +108,31 @@ class RequestProcessor( try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( Request( uuid = requestId, patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = requestStatus, - type = RequestType.DELETE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + status = RequestStatus.UNKNOWN, + type = RequestType.DELETE + ) + ) + + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) + } catch (e: Exception) { requestService.save( Request( @@ -188,7 +146,6 @@ class RequestProcessor( ) ) } - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun fingerprint(mtbFile: MtbFile): String { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt new file mode 100644 index 0000000..d7ad86f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -0,0 +1,96 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@Service +class ResponseProcessor( + private val requestRepository: RequestRepository, + private val statisticsUpdateProducer: Sinks.Many, + private val objectMapper: ObjectMapper +) { + + private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) + + @EventListener(classes = [ResponseEvent::class]) + fun handleResponseEvent(event: ResponseEvent) { + requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({ + it.processedAt = event.timestamp + it.status = event.status + + when (event.status) { + RequestStatus.SUCCESS -> { + it.report = Report( + "Keine Probleme erkannt", + ) + } + + RequestStatus.WARNING -> { + it.report = Report( + "Warnungen über mangelhafte Daten", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.ERROR -> { + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.DUPLICATION -> { + it.report = Report( + "Duplikat erkannt" + ) + } + + else -> { + logger.error("Cannot process response: Unknown response code!") + return@ifPresentOrElse + } + } + + requestRepository.save(it) + + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + }, { + logger.error("Response for unknown request '${event.requestUuid}'!") + }) + } + +} + +data class ResponseEvent( + val requestUuid: String, + val timestamp: Instant, + val status: RequestStatus, + val body: Optional = Optional.empty() +) \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 1e9263d..ef880f4 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -22,16 +22,18 @@ package dev.dnpm.etl.processor.services.kafka import com.fasterxml.jackson.annotation.JsonAlias import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.Report -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.output.asRequestStatus +import dev.dnpm.etl.processor.services.ResponseEvent import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.kafka.listener.MessageListener import java.time.Instant +import java.util.* class KafkaResponseProcessor( - private val requestRepository: RequestRepository, + private val eventPublisher: ApplicationEventPublisher, private val objectMapper: ObjectMapper ) : MessageListener { @@ -39,55 +41,44 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord) { try { - val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) - requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { + Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + } catch (e: Exception) { + Optional.empty() + }.ifPresentOrElse({ responseKey -> + val event = try { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } - when (responseBody.statusCode) { - 200 -> { - it.status = RequestStatus.SUCCESS - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - requestRepository.save(it) - } + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } - 201 -> { - it.status = RequestStatus.WARNING - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Warnungen über mangelhafte Daten", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) + else -> { + logger.error("Kafka response: Unknown response code!") + Optional.empty() + } } - - 400, 422 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - in 900..999 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - else -> { - logger.error("Cannot process Kafka response: Unknown response code!") - } - } + ) + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + RequestStatus.ERROR, + Optional.of("Cannot process Kafka response") + ) } - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - } + eventPublisher.publishEvent(event) + }, { + logger.error("No response key in Kafka response") + }) } data class ResponseKey(val requestId: String) -- cgit v1.2.3 From 2b42a4d262a846feb1f82facbb151be9cabb57b4 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 12:11:39 +0200 Subject: Tests for RestMtbFileSender --- .../dev/dnpm/etl/processor/config/AppRestConfiguration.kt | 10 ++++++++-- .../kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt | 7 ++++--- 2 files changed, 12 insertions(+), 5 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt index 5e77a4f..a830597 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -28,6 +28,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order +import org.springframework.web.client.RestTemplate @Configuration @EnableConfigurationProperties( @@ -43,9 +44,14 @@ class AppRestConfiguration { private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java) @Bean - fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { + fun restTemplate(): RestTemplate { + return RestTemplate() + } + + @Bean + fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender { logger.info("Selected 'RestMtbFileSender'") - return RestMtbFileSender(restTargetProperties) + return RestMtbFileSender(restTemplate, restTargetProperties) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 24cdc49..f80ff69 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -28,12 +28,13 @@ import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { +class RestMtbFileSender( + private val restTemplate: RestTemplate, + private val restTargetProperties: RestTargetProperties +) : MtbFileSender { private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java) - private val restTemplate = RestTemplate() - override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { try { val headers = HttpHeaders() -- cgit v1.2.3 From 002b0618cf813d48bbff2d287e16f607a4c73d73 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 13:35:35 +0200 Subject: Add tests for KafkaMtbFileSender --- src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 9448e29..e7f9769 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -50,7 +50,7 @@ class KafkaMtbFileSender( } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(RequestStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.ERROR) } } @@ -79,7 +79,7 @@ class KafkaMtbFileSender( } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(RequestStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.ERROR) } } -- cgit v1.2.3 From cb9c5904729c90b86357d0668604b74f4f4e61f7 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 11 Aug 2023 09:13:45 +0200 Subject: Issue #2: Do not serialize JSON string as custom string (#4) In addition to that, if REST request did not contain a response body, use empty string as data quality report string.--- src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt | 2 +- src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index f80ff69..1c59f5c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -50,7 +50,7 @@ class RestMtbFileSender( return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(response.statusCode.asRequestStatus()) + return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index d7ad86f..f2e9e2e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -55,14 +55,14 @@ class ResponseProcessor( RequestStatus.WARNING -> { it.report = Report( "Warnungen über mangelhafte Daten", - objectMapper.writeValueAsString(event.body) + event.body.orElse("") ) } RequestStatus.ERROR -> { it.report = Report( "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - objectMapper.writeValueAsString(event.body) + event.body.orElse("") ) } -- cgit v1.2.3 From 6ecb439007b4fa6dec9af1e0334b89fd235a97be Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 11 Aug 2023 09:22:54 +0200 Subject: Issue #3: Detect the request type of request with last known status (#5) --- .../kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 2 +- src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index d2f8619..34156f7 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -95,7 +95,7 @@ class RequestProcessor( private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { val lastMtbFileRequestForPatient = requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id) - val isLastRequestDeletion = requestService.isLastRequestDeletion(pseudonymizedMtbFile.patient.id) + val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id) return null != lastMtbFileRequestForPatient && !isLastRequestDeletion diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt index 0f69910..e0043d2 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt @@ -38,8 +38,8 @@ class RequestService( fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) = Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym)) - fun isLastRequestDeletion(patientPseudonym: String) = - Companion.isLastRequestDeletion(allRequestsByPatientPseudonym(patientPseudonym)) + fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) = + Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym)) companion object { @@ -48,7 +48,8 @@ class RequestService( .sortedByDescending { it.processedAt } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - fun isLastRequestDeletion(allRequests: List) = allRequests + fun isLastRequestWithKnownStatusDeletion(allRequests: List) = allRequests + .filter { it.status != RequestStatus.UNKNOWN } .maxByOrNull { it.processedAt }?.type == RequestType.DELETE } -- cgit v1.2.3 From 72295202ec37a76b90a919e39ae094bb7e56d202 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 12 Aug 2023 22:19:29 +0200 Subject: Code cleanup --- .../kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt | 2 +- src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt | 2 +- src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 3 --- .../kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt | 4 +--- .../kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt | 6 +++--- 5 files changed, 6 insertions(+), 11 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 6502a1b..06e730b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties(AppConfigProperties.NAME) data class AppConfigProperties( - var bwhc_uri: String?, + var bwhcUri: String?, var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN ) { companion object { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt index 580785d..c0050a4 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt @@ -38,7 +38,7 @@ infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) { this.histologyReports.forEach { it.patient = patientPseudonym } this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } this.molecularPathologyFindings.forEach { it.patient = patientPseudonym } - this.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } + this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } } this.ngsReports.forEach { it.patient = patientPseudonym } this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } this.rebiopsyRequests.forEach { it.patient = patientPseudonym } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 34156f7..3cd912c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -30,7 +30,6 @@ import dev.dnpm.etl.processor.pseudonym.PseudonymizeService import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils -import org.slf4j.LoggerFactory import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service import java.time.Instant @@ -45,8 +44,6 @@ class RequestProcessor( private val applicationEventPublisher: ApplicationEventPublisher ) { - private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) - fun processMtbFile(mtbFile: MtbFile) { val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index f2e9e2e..677443a 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -19,7 +19,6 @@ package dev.dnpm.etl.processor.services -import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.Report import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus @@ -33,8 +32,7 @@ import java.util.* @Service class ResponseProcessor( private val requestRepository: RequestRepository, - private val statisticsUpdateProducer: Sinks.Many, - private val objectMapper: ObjectMapper + private val statisticsUpdateProducer: Sinks.Many ) { private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt index a418772..6f0e820 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt @@ -83,9 +83,9 @@ class StatisticsRestController( .groupBy { formatter.format(it.processedAt) } .map { val requestList = it.value - .groupBy { it.status } - .map { - Pair(it.key, it.value.size) + .groupBy { request -> request.status } + .map { request -> + Pair(request.key, request.value.size) } .toMap() Pair( -- cgit v1.2.3 From 2eb5cc61b9809523d51d1fa7af7a1afc1fdb7f0c Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 15 Aug 2023 10:58:17 +0200 Subject: Change Kafka response body JSON alias --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index ef880f4..68e3611 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -84,7 +84,7 @@ class KafkaResponseProcessor( data class ResponseKey(val requestId: String) data class ResponseBody( - @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, - @JsonProperty("status_body") val statusBody: Map + @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, + @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map ) } \ No newline at end of file -- cgit v1.2.3 From 8dc82225a4cd45a315fac3efe4d76513e6d536fc Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 16 Aug 2023 15:25:46 +0200 Subject: Issue #7: Send and expect requestId in record body, not in record key (#8) --- .../etl/processor/output/KafkaMtbFileSender.kt | 12 ++--- .../etl/processor/services/ResponseProcessor.kt | 2 +- .../services/kafka/KafkaResponseProcessor.kt | 54 +++++++++------------- 3 files changed, 29 insertions(+), 39 deletions(-) (limited to 'src/main/kotlin/dev/dnpm/etl/processor') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index e7f9769..5772faf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -40,7 +40,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(request.mtbFile) + objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") @@ -68,7 +68,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(dummyMtbFile) + objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) ) if (result.get() != null) { @@ -85,12 +85,12 @@ class KafkaMtbFileSender( private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + - "\"eid\": \"${request.mtbFile.episode.id}\", " + - "\"requestId\": \"${request.requestId}\"}" + "\"eid\": \"${request.mtbFile.episode.id}\"}" } private fun key(request: MtbFileSender.DeleteRequest): String { - return "{\"pid\": \"${request.patientId}\", " + - "\"requestId\": \"${request.requestId}\"}" + return "{\"pid\": \"${request.patientId}\"}" } + + data class Data(val requestId: String, val content: MtbFile) } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index 677443a..4048348 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -71,7 +71,7 @@ class ResponseProcessor( } else -> { - logger.error("Cannot process response: Unknown response code!") + logger.error("Cannot process response: Unknown response!") return@ifPresentOrElse } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 68e3611..a29010f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -41,50 +41,40 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord) { try { - Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java)) } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) Optional.empty() - }.ifPresentOrElse({ responseKey -> - val event = try { - val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - responseBody.statusCode.asRequestStatus(), - when (responseBody.statusCode.asRequestStatus()) { - RequestStatus.SUCCESS -> { - Optional.empty() - } + }.ifPresentOrElse({ responseBody -> + val event = ResponseEvent( + responseBody.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } - RequestStatus.WARNING, RequestStatus.ERROR -> { - Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) - } + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } - else -> { - logger.error("Kafka response: Unknown response code!") - Optional.empty() - } + else -> { + logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode) + Optional.empty() } - ) - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - RequestStatus.ERROR, - Optional.of("Cannot process Kafka response") - ) - } + } + ) eventPublisher.publishEvent(event) }, { - logger.error("No response key in Kafka response") + logger.error("No requestId in Kafka response") }) } - data class ResponseKey(val requestId: String) - data class ResponseBody( + @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String, @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map ) + } \ No newline at end of file -- cgit v1.2.3