diff options
Diffstat (limited to 'src/main/kotlin/dev')
18 files changed, 598 insertions, 235 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt index 0c4ab68..5d28c97 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt @@ -28,4 +28,3 @@ class EtlProcessorApplication fun main(args: Array<String>) { runApplication<EtlProcessorApplication>(*args) } - diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 64be70d..06e730b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties(AppConfigProperties.NAME) data class AppConfigProperties( - var bwhc_uri: String?, + var bwhcUri: String?, var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN ) { companion object { @@ -48,7 +48,7 @@ data class GPasConfigProperties( val password: String?, val sslCaLocation: String?, -) { + ) { companion object { const val NAME = "app.pseudonymize.gpas" } @@ -66,6 +66,8 @@ data class RestTargetProperties( @ConfigurationProperties(KafkaTargetProperties.NAME) data class KafkaTargetProperties( val topic: String = "etl-processor", + val responseTopic: String = "${topic}_response", + val groupId: String = "${topic}_group", val servers: String = "" ) { companion object { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index c677f2b..6b15fc0 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -21,9 +21,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.ReportService -import dev.dnpm.etl.processor.output.KafkaMtbFileSender -import dev.dnpm.etl.processor.output.MtbFileSender -import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator import dev.dnpm.etl.processor.pseudonym.Generator import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator @@ -32,7 +29,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.KafkaTemplate import reactor.core.publisher.Sinks @Configuration @@ -40,9 +36,7 @@ import reactor.core.publisher.Sinks value = [ AppConfigProperties::class, PseudonymizeConfigProperties::class, - GPasConfigProperties::class, - RestTargetProperties::class, - KafkaTargetProperties::class + GPasConfigProperties::class ] ) class AppConfiguration { @@ -60,25 +54,13 @@ class AppConfiguration { } @Bean - fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService { + fun pseudonymizeService( + generator: Generator, + pseudonymizeConfigProperties: PseudonymizeConfigProperties + ): PseudonymizeService { return PseudonymizeService(generator, pseudonymizeConfigProperties) } - @ConditionalOnProperty(value = ["app.rest.uri"]) - @Bean - fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { - return RestMtbFileSender(restTargetProperties) - } - - @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) - @Bean - fun kafkaMtbFileSender( - kafkaTemplate: KafkaTemplate<String, String>, - objectMapper: ObjectMapper - ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) - } - @Bean fun reportService(objectMapper: ObjectMapper): ReportService { return ReportService(objectMapper) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt new file mode 100644 index 0000000..309ff2d --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -0,0 +1,79 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package dev.dnpm.etl.processor.config + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.output.KafkaMtbFileSender +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.ApplicationEventPublisher +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.core.annotation.Order +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.listener.KafkaMessageListenerContainer + +@Configuration +@EnableConfigurationProperties( + value = [KafkaTargetProperties::class] +) +@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +@ConditionalOnMissingBean(MtbFileSender::class) +@Order(-5) +class AppKafkaConfiguration { + + private val logger = LoggerFactory.getLogger(AppKafkaConfiguration::class.java) + + @Bean + fun kafkaMtbFileSender( + kafkaTemplate: KafkaTemplate<String, String>, + kafkaTargetProperties: KafkaTargetProperties, + objectMapper: ObjectMapper + ): MtbFileSender { + logger.info("Selected 'KafkaMtbFileSender'") + return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) + } + + @Bean + fun kafkaListenerContainer( + consumerFactory: ConsumerFactory<String, String>, + kafkaTargetProperties: KafkaTargetProperties, + kafkaResponseProcessor: KafkaResponseProcessor + ): KafkaMessageListenerContainer<String, String> { + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic) + containerProperties.messageListener = kafkaResponseProcessor + return KafkaMessageListenerContainer(consumerFactory, containerProperties) + } + + @Bean + fun kafkaResponseProcessor( + applicationEventPublisher: ApplicationEventPublisher, + objectMapper: ObjectMapper + ): KafkaResponseProcessor { + return KafkaResponseProcessor(applicationEventPublisher, objectMapper) + } + +}
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt new file mode 100644 index 0000000..a830597 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -0,0 +1,58 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package dev.dnpm.etl.processor.config + +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.output.RestMtbFileSender +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.core.annotation.Order +import org.springframework.web.client.RestTemplate + +@Configuration +@EnableConfigurationProperties( + value = [ + RestTargetProperties::class + ] +) +@ConditionalOnProperty(value = ["app.rest.uri"]) +@ConditionalOnMissingBean(MtbFileSender::class) +@Order(-10) +class AppRestConfiguration { + + private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java) + + @Bean + fun restTemplate(): RestTemplate { + return RestTemplate() + } + + @Bean + fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender { + logger.info("Selected 'RestMtbFileSender'") + return RestMtbFileSender(restTemplate, restTargetProperties) + } + +} + diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt index 6ee8ae9..ae36705 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt @@ -19,7 +19,9 @@ package dev.dnpm.etl.processor.monitoring +import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonValue +import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.JsonMappingException import com.fasterxml.jackson.databind.ObjectMapper @@ -33,15 +35,22 @@ class ReportService( } return try { objectMapper.readValue(dataQualityReport, DataQualityReport::class.java).issues - } catch (e: JsonMappingException) { - e.printStackTrace() - listOf() + } catch (e: Exception) { + val otherIssue = + Issue(Severity.ERROR, "Not parsable data quality report '$dataQualityReport'") + return when (e) { + is JsonMappingException -> listOf(otherIssue) + is JsonParseException -> listOf(otherIssue) + else -> throw e + } } } + @JsonIgnoreProperties(ignoreUnknown = true) private data class DataQualityReport(val issues: List<Issue>) + @JsonIgnoreProperties(ignoreUnknown = true) data class Issue(val severity: Severity, val message: String) enum class Severity(@JsonValue val value: String) { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt index ecd8219..c1d4d43 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt @@ -36,9 +36,9 @@ data class Request( val patientId: String, val pid: String, val fingerprint: String, - val status: RequestStatus, val type: RequestType, - val processedAt: Instant = Instant.now(), + var status: RequestStatus, + var processedAt: Instant = Instant.now(), @Embedded.Nullable var report: Report? = null ) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 55503cf..5772faf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -20,11 +20,16 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.Consent +import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate<String, String>, + private val kafkaTargetProperties: KafkaTargetProperties, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -32,31 +37,60 @@ class KafkaMtbFileSender( override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { return try { - val result = kafkaTemplate.sendDefault( - header(request), - objectMapper.writeValueAsString(request.mtbFile) + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), + objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } - } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.ERROR) } } - // TODO not yet implemented override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { - return MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + val dummyMtbFile = MtbFile.builder() + .withConsent( + Consent.builder() + .withPatient(request.patientId) + .withStatus(Consent.Status.REJECTED) + .build() + ) + .build() + + return try { + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), + objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) + ) + + if (result.get() != null) { + logger.debug("Sent deletion request via KafkaMtbFileSender") + MtbFileSender.Response(RequestStatus.UNKNOWN) + } else { + MtbFileSender.Response(RequestStatus.ERROR) + } + } catch (e: Exception) { + logger.error("An error occurred sending to kafka", e) + MtbFileSender.Response(RequestStatus.ERROR) + } } - private fun header(request: MtbFileSender.MtbFileRequest): String { + private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + - "\"eid\": \"${request.mtbFile.episode.id}\", " + - "\"requestId\": \"${request.requestId}\"}" + "\"eid\": \"${request.mtbFile.episode.id}\"}" } + + private fun key(request: MtbFileSender.DeleteRequest): String { + return "{\"pid\": \"${request.patientId}\"}" + } + + data class Data(val requestId: String, val content: MtbFile) }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt index 6914ba5..de0efaa 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt @@ -20,22 +20,31 @@ package dev.dnpm.etl.processor.output import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.springframework.http.HttpStatusCode interface MtbFileSender { fun send(request: MtbFileRequest): Response fun send(request: DeleteRequest): Response - data class Response(val status: ResponseStatus, val reason: String = "") + data class Response(val status: RequestStatus, val body: String = "") data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile) data class DeleteRequest(val requestId: String, val patientId: String) - enum class ResponseStatus { - SUCCESS, - WARNING, - ERROR, - UNKNOWN +} + +fun Int.asRequestStatus(): RequestStatus { + return when (this) { + 200 -> RequestStatus.SUCCESS + 201 -> RequestStatus.WARNING + in 400 .. 999 -> RequestStatus.ERROR + else -> RequestStatus.UNKNOWN } +} + +fun HttpStatusCode.asRequestStatus(): RequestStatus { + return this.value().asRequestStatus() }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 04c73ef..1c59f5c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -20,20 +20,21 @@ package dev.dnpm.etl.processor.output import dev.dnpm.etl.processor.config.RestTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.http.HttpEntity import org.springframework.http.HttpHeaders import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -import org.springframework.web.util.UriComponentsBuilder -class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { +class RestMtbFileSender( + private val restTemplate: RestTemplate, + private val restTargetProperties: RestTargetProperties +) : MtbFileSender { private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java) - private val restTemplate = RestTemplate() - override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { try { val headers = HttpHeaders() @@ -46,21 +47,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) ) if (!response.statusCode.is2xxSuccessful) { logger.warn("Error sending to remote system: {}", response.body) - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}") + return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return if (response.body?.contains("warning") == true) { - MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}") - } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) - } + return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { @@ -74,14 +71,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) String::class.java ) logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + return MtbFileSender.Response(RequestStatus.SUCCESS) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt index 1a79850..ab8ce2f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt @@ -19,7 +19,6 @@ package dev.dnpm.etl.processor.pseudonym -import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties class PseudonymizeService( @@ -27,38 +26,11 @@ class PseudonymizeService( private val configProperties: PseudonymizeConfigProperties ) { - fun pseudonymize(mtbFile: MtbFile): MtbFile { - val patientPseudonym = patientPseudonym(mtbFile.patient.id) - - mtbFile.episode.patient = patientPseudonym - mtbFile.carePlans.forEach { it.patient = patientPseudonym } - mtbFile.patient.id = patientPseudonym - mtbFile.claims.forEach { it.patient = patientPseudonym } - mtbFile.consent.patient = patientPseudonym - mtbFile.claimResponses.forEach { it.patient = patientPseudonym } - mtbFile.diagnoses.forEach { it.patient = patientPseudonym } - mtbFile.ecogStatus.forEach { it.patient = patientPseudonym } - mtbFile.familyMemberDiagnoses.forEach { it.patient = patientPseudonym } - mtbFile.geneticCounsellingRequests.forEach { it.patient = patientPseudonym } - mtbFile.histologyReevaluationRequests.forEach { it.patient = patientPseudonym } - mtbFile.histologyReports.forEach { it.patient = patientPseudonym } - mtbFile.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } - mtbFile.molecularPathologyFindings.forEach { it.patient = patientPseudonym } - mtbFile.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } - mtbFile.ngsReports.forEach { it.patient = patientPseudonym } - mtbFile.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } - mtbFile.rebiopsyRequests.forEach { it.patient = patientPseudonym } - mtbFile.recommendations.forEach { it.patient = patientPseudonym } - mtbFile.recommendations.forEach { it.patient = patientPseudonym } - mtbFile.responses.forEach { it.patient = patientPseudonym } - mtbFile.specimens.forEach { it.patient = patientPseudonym } - mtbFile.specimens.forEach { it.patient = patientPseudonym } - - return mtbFile - } - fun patientPseudonym(patientId: String): String { - return "${configProperties.prefix}_${generator.generate(patientId)}" + return when (generator) { + is GpasPseudonymGenerator -> generator.generate(patientId) + else -> "${configProperties.prefix}_${generator.generate(patientId)}" + } } }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt new file mode 100644 index 0000000..c0050a4 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt @@ -0,0 +1,50 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package dev.dnpm.etl.processor.pseudonym + +import de.ukw.ccc.bwhc.dto.MtbFile + +infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) { + val patientPseudonym = pseudonymizeService.patientPseudonym(this.patient.id) + + this.episode.patient = patientPseudonym + this.carePlans.forEach { it.patient = patientPseudonym } + this.patient.id = patientPseudonym + this.claims.forEach { it.patient = patientPseudonym } + this.consent.patient = patientPseudonym + this.claimResponses.forEach { it.patient = patientPseudonym } + this.diagnoses.forEach { it.patient = patientPseudonym } + this.ecogStatus.forEach { it.patient = patientPseudonym } + this.familyMemberDiagnoses.forEach { it.patient = patientPseudonym } + this.geneticCounsellingRequests.forEach { it.patient = patientPseudonym } + this.histologyReevaluationRequests.forEach { it.patient = patientPseudonym } + this.histologyReports.forEach { it.patient = patientPseudonym } + this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } + this.molecularPathologyFindings.forEach { it.patient = patientPseudonym } + this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } } + this.ngsReports.forEach { it.patient = patientPseudonym } + this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } + this.rebiopsyRequests.forEach { it.patient = patientPseudonym } + this.recommendations.forEach { it.patient = patientPseudonym } + this.recommendations.forEach { it.patient = patientPseudonym } + this.responses.forEach { it.patient = patientPseudonym } + this.specimens.forEach { it.patient = patientPseudonym } + this.specimens.forEach { it.patient = patientPseudonym } +}
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 8588ebe..3cd912c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -21,169 +21,117 @@ package dev.dnpm.etl.processor.services import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.* +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.PseudonymizeService +import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils -import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service -import reactor.core.publisher.Sinks +import java.time.Instant import java.util.* @Service class RequestProcessor( private val pseudonymizeService: PseudonymizeService, - private val senders: List<MtbFileSender>, - private val requestRepository: RequestRepository, + private val sender: MtbFileSender, + private val requestService: RequestService, private val objectMapper: ObjectMapper, - private val statisticsUpdateProducer: Sinks.Many<Any> + private val applicationEventPublisher: ApplicationEventPublisher ) { - private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) - - fun processMtbFile(mtbFile: MtbFile): RequestStatus { + fun processMtbFile(mtbFile: MtbFile) { + val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id - val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val lastRequestForPatient = - requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) - .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } + mtbFile pseudonymizeWith pseudonymizeService - if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { - requestRepository.save( - Request( - patientId = pseudonymized.patient.id, - pid = pid, - fingerprint = fingerprint(mtbFile), - status = RequestStatus.DUPLICATION, - type = RequestType.MTB_FILE, - report = Report("Duplikat erkannt - keine Daten weitergeleitet") - ) - ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - return RequestStatus.DUPLICATION - } + val request = MtbFileSender.MtbFileRequest(requestId, mtbFile) - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) + requestService.save( + Request( + uuid = requestId, + patientId = request.mtbFile.patient.id, + pid = pid, + fingerprint = fingerprint(request.mtbFile), + status = RequestStatus.UNKNOWN, + type = RequestType.MTB_FILE + ) + ) - val responses = senders.map { - val responseStatus = it.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName + if (isDuplication(mtbFile)) { + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + RequestStatus.DUPLICATION ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } - responseStatus + ) + return } - val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) { - RequestStatus.WARNING - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN - } + val responseStatus = sender.send(request) - requestRepository.save( - Request( - uuid = request.requestId, - patientId = request.mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(request.mtbFile), - status = requestStatus, - type = RequestType.MTB_FILE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", - responses.joinToString("\n") { it.reason }) - - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) + } - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { + val lastMtbFileRequestForPatient = + requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id) + val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id) - return requestStatus + return null != lastMtbFileRequestForPatient + && !isLastRequestDeletion + && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile) } - fun processDeletion(patientId: String): RequestStatus { + fun processDeletion(patientId: String) { val requestId = UUID.randomUUID().toString() try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responses = senders.map { - val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } - - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } - - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } - } - responseStatus - } - - val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN - } - - requestRepository.save( + requestService.save( Request( uuid = requestId, patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = overallRequestStatus, - type = RequestType.DELETE, - report = when (overallRequestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null - } + status = RequestStatus.UNKNOWN, + type = RequestType.DELETE ) ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body) + else -> Optional.empty() + } + ) + ) - return overallRequestStatus } catch (e: Exception) { - requestRepository.save( + requestService.save( Request( uuid = requestId, patientId = "???", @@ -194,10 +142,6 @@ class RequestProcessor( report = Report("Fehler bei der Pseudonymisierung") ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return RequestStatus.ERROR } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt new file mode 100644 index 0000000..e0043d2 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt @@ -0,0 +1,57 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.springframework.stereotype.Service + +@Service +class RequestService( + private val requestRepository: RequestRepository +) { + + fun save(request: Request) = requestRepository.save(request) + + fun allRequestsByPatientPseudonym(patientPseudonym: String) = requestRepository + .findAllByPatientIdOrderByProcessedAtDesc(patientPseudonym) + + fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) = + Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym)) + + fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) = + Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym)) + + companion object { + + fun lastMtbFileRequestForPatientPseudonym(allRequests: List<Request>) = allRequests + .filter { it.type == RequestType.MTB_FILE } + .sortedByDescending { it.processedAt } + .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } + + fun isLastRequestWithKnownStatusDeletion(allRequests: List<Request>) = allRequests + .filter { it.status != RequestStatus.UNKNOWN } + .maxByOrNull { it.processedAt }?.type == RequestType.DELETE + + } + +}
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt new file mode 100644 index 0000000..4048348 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -0,0 +1,94 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@Service +class ResponseProcessor( + private val requestRepository: RequestRepository, + private val statisticsUpdateProducer: Sinks.Many<Any> +) { + + private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) + + @EventListener(classes = [ResponseEvent::class]) + fun handleResponseEvent(event: ResponseEvent) { + requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({ + it.processedAt = event.timestamp + it.status = event.status + + when (event.status) { + RequestStatus.SUCCESS -> { + it.report = Report( + "Keine Probleme erkannt", + ) + } + + RequestStatus.WARNING -> { + it.report = Report( + "Warnungen über mangelhafte Daten", + event.body.orElse("") + ) + } + + RequestStatus.ERROR -> { + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + event.body.orElse("") + ) + } + + RequestStatus.DUPLICATION -> { + it.report = Report( + "Duplikat erkannt" + ) + } + + else -> { + logger.error("Cannot process response: Unknown response!") + return@ifPresentOrElse + } + } + + requestRepository.save(it) + + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + }, { + logger.error("Response for unknown request '${event.requestUuid}'!") + }) + } + +} + +data class ResponseEvent( + val requestUuid: String, + val timestamp: Instant, + val status: RequestStatus, + val body: Optional<String> = Optional.empty() +)
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt new file mode 100644 index 0000000..a29010f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -0,0 +1,80 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.annotation.JsonAlias +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.output.asRequestStatus +import dev.dnpm.etl.processor.services.ResponseEvent +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher +import org.springframework.kafka.listener.MessageListener +import java.time.Instant +import java.util.* + +class KafkaResponseProcessor( + private val eventPublisher: ApplicationEventPublisher, + private val objectMapper: ObjectMapper +) : MessageListener<String, String> { + + private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java) + + override fun onMessage(data: ConsumerRecord<String, String>) { + try { + Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java)) + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + Optional.empty() + }.ifPresentOrElse({ responseBody -> + val event = ResponseEvent( + responseBody.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } + + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } + + else -> { + logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode) + Optional.empty() + } + } + ) + eventPublisher.publishEvent(event) + }, { + logger.error("No requestId in Kafka response") + }) + } + + data class ResponseBody( + @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String, + @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, + @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any> + ) + +}
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt index a2cc953..9b441f6 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt @@ -19,40 +19,37 @@ package dev.dnpm.etl.processor.web +import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.services.RequestProcessor import org.slf4j.LoggerFactory import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.* @RestController -class MtbFileController( +class MtbFileRestController( private val requestProcessor: RequestProcessor, ) { - private val logger = LoggerFactory.getLogger(MtbFileController::class.java) + private val logger = LoggerFactory.getLogger(MtbFileRestController::class.java) @PostMapping(path = ["/mtbfile"]) fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity<Void> { - val requestStatus = requestProcessor.processMtbFile(mtbFile) - - return if (requestStatus == RequestStatus.ERROR) { - ResponseEntity.unprocessableEntity().build() + if (mtbFile.consent.status == Consent.Status.ACTIVE) { + logger.debug("Accepted MTB File for processing") + requestProcessor.processMtbFile(mtbFile) } else { - ResponseEntity.noContent().build() + logger.debug("Accepted MTB File and process deletion") + requestProcessor.processDeletion(mtbFile.patient.id) } + return ResponseEntity.accepted().build() } @DeleteMapping(path = ["/mtbfile/{patientId}"]) fun deleteData(@PathVariable patientId: String): ResponseEntity<Void> { - val requestStatus = requestProcessor.processDeletion(patientId) - - return if (requestStatus == RequestStatus.ERROR) { - ResponseEntity.unprocessableEntity().build() - } else { - ResponseEntity.noContent().build() - } + logger.debug("Accepted patient ID to process deletion") + requestProcessor.processDeletion(patientId) + return ResponseEntity.accepted().build() } }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt index a418772..6f0e820 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt @@ -83,9 +83,9 @@ class StatisticsRestController( .groupBy { formatter.format(it.processedAt) } .map { val requestList = it.value - .groupBy { it.status } - .map { - Pair(it.key, it.value.size) + .groupBy { request -> request.status } + .map { request -> + Pair(request.key, request.value.size) } .toMap() Pair( |
