From 51cf7a7917d7376d1e7c685b9c0e56d8929ad9e1 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 11:52:11 +0200 Subject: Add processor to handle responses from Kafka topic --- README.md | 6 +- .../dnpm/etl/processor/EtlProcessorApplication.kt | 1 - .../etl/processor/config/AppConfigProperties.kt | 4 +- .../dnpm/etl/processor/config/AppConfiguration.kt | 16 +--- .../etl/processor/config/AppKafkaConfiguration.kt | 70 +++++++++++++++++ .../dev/dnpm/etl/processor/monitoring/Request.kt | 4 +- .../services/kafka/KafkaResponseProcessor.kt | 87 ++++++++++++++++++++++ src/main/resources/application-dev.yml | 9 ++- 8 files changed, 176 insertions(+), 21 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt diff --git a/README.md b/README.md index db6ae44..1d5a7f3 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,11 @@ Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an das Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein Kafka-Topic übermittelt wird: -* `APP_KAFKA_TOPIC`: Zu verwendendes Topic +* `APP_KAFKA_TOPIC`: Zu verwendendes Topic zum Versenden von Anfragen +* `APP_KAFKA_RESPONSE_TOPIC`: Topic mit Antworten über den Erfolg des Versendens. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_response". +* `APP_KAFKA_GROUP_ID`: Kafka GroupID des Consumers. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_group". * `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste +Wird keine Rückantwort über Apache Kafka empfangen und gibt es keine weitere Möglichkeit den Status festzustellen, verbleibt der Status auf `UNKNOWN`. + Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden. \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt index 0c4ab68..5d28c97 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt @@ -28,4 +28,3 @@ class EtlProcessorApplication fun main(args: Array) { runApplication(*args) } - diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 64be70d..6502a1b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -48,7 +48,7 @@ data class GPasConfigProperties( val password: String?, val sslCaLocation: String?, -) { + ) { companion object { const val NAME = "app.pseudonymize.gpas" } @@ -66,6 +66,8 @@ data class RestTargetProperties( @ConfigurationProperties(KafkaTargetProperties.NAME) data class KafkaTargetProperties( val topic: String = "etl-processor", + val responseTopic: String = "${topic}_response", + val groupId: String = "${topic}_group", val servers: String = "" ) { companion object { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index c677f2b..cbba1f1 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -21,7 +21,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.ReportService -import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator @@ -32,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.KafkaTemplate import reactor.core.publisher.Sinks @Configuration @@ -60,7 +58,10 @@ class AppConfiguration { } @Bean - fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService { + fun pseudonymizeService( + generator: Generator, + pseudonymizeConfigProperties: PseudonymizeConfigProperties + ): PseudonymizeService { return PseudonymizeService(generator, pseudonymizeConfigProperties) } @@ -70,15 +71,6 @@ class AppConfiguration { return RestMtbFileSender(restTargetProperties) } - @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) - @Bean - fun kafkaMtbFileSender( - kafkaTemplate: KafkaTemplate, - objectMapper: ObjectMapper - ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) - } - @Bean fun reportService(objectMapper: ObjectMapper): ReportService { return ReportService(objectMapper) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt new file mode 100644 index 0000000..f81d3fb --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -0,0 +1,70 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.output.KafkaMtbFileSender +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.listener.KafkaMessageListenerContainer + +@Configuration +@EnableConfigurationProperties( + value = [KafkaTargetProperties::class] +) +@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +class AppKafkaConfiguration { + + @Bean + fun kafkaMtbFileSender( + kafkaTemplate: KafkaTemplate, + objectMapper: ObjectMapper + ): MtbFileSender { + return KafkaMtbFileSender(kafkaTemplate, objectMapper) + } + + @Bean + fun kafkaListenerContainer( + consumerFactory: ConsumerFactory, + kafkaTargetProperties: KafkaTargetProperties, + kafkaResponseProcessor: KafkaResponseProcessor + ): KafkaMessageListenerContainer { + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic); + containerProperties.messageListener = kafkaResponseProcessor + return KafkaMessageListenerContainer(consumerFactory, containerProperties) + } + + @Bean + fun kafkaResponseProcessor( + requestRepository: RequestRepository, + objectMapper: ObjectMapper + ): KafkaResponseProcessor { + return KafkaResponseProcessor(requestRepository, objectMapper) + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt index ecd8219..c1d4d43 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt @@ -36,9 +36,9 @@ data class Request( val patientId: String, val pid: String, val fingerprint: String, - val status: RequestStatus, val type: RequestType, - val processedAt: Instant = Instant.now(), + var status: RequestStatus, + var processedAt: Instant = Instant.now(), @Embedded.Nullable var report: Report? = null ) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt new file mode 100644 index 0000000..f0c91cb --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -0,0 +1,87 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.slf4j.LoggerFactory +import org.springframework.kafka.listener.MessageListener +import java.time.Instant + +class KafkaResponseProcessor( + private val requestRepository: RequestRepository, + private val objectMapper: ObjectMapper +) : MessageListener { + + private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java) + + override fun onMessage(data: ConsumerRecord) { + try { + val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) + requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { + val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + when (responseBody.statusCode) { + 200 -> { + it.status = RequestStatus.SUCCESS + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + requestRepository.save(it) + } + + 201 -> { + it.status = RequestStatus.WARNING + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Warnungen über mangelhafte Daten", + responseBody.statusBody + ) + requestRepository.save(it) + } + + 400, 422 -> { + it.status = RequestStatus.ERROR + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + responseBody.statusBody + ) + requestRepository.save(it) + } + + else -> { + logger.error("Cannot process Kafka response: Unknown response code!") + } + } + } + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + } + } + + data class ResponseKey(val requestId: String) + + data class ResponseBody( + @JsonProperty("status code") val statusCode: Int, + @JsonProperty("status_body") val statusBody: String + ) +} \ No newline at end of file diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 99e4bbf..551f3f8 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -5,10 +5,11 @@ spring: app: rest: - uri: http://localhost:9000/bwhc/etl/api/MTBFile - #kafka: - # topic: test - # servers: kafka:9092 + uri: http://localhost:9000/bwhc/etl/api + kafka: + topic: test + response-topic: test-response + servers: kafka:9092 server: port: 8000 -- cgit v1.2.3 From 35cb258b13543b37ce061f78eef4427e542ca72a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:10:18 +0200 Subject: Do not return specific status code based on remote status code --- .../dnpm/etl/processor/services/RequestProcessor.kt | 17 ++++------------- .../dev/dnpm/etl/processor/web/MtbFileController.kt | 21 ++++++--------------- src/main/resources/application.yml | 2 ++ 3 files changed, 12 insertions(+), 28 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 8588ebe..7d110b1 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -42,7 +42,7 @@ class RequestProcessor( private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) - fun processMtbFile(mtbFile: MtbFile): RequestStatus { + fun processMtbFile(mtbFile: MtbFile) { val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) @@ -62,7 +62,7 @@ class RequestProcessor( ) ) statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - return RequestStatus.DUPLICATION + return } val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) @@ -115,11 +115,9 @@ class RequestProcessor( ) statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return requestStatus } - fun processDeletion(patientId: String): RequestStatus { + fun processDeletion(patientId: String) { val requestId = UUID.randomUUID().toString() try { @@ -178,10 +176,6 @@ class RequestProcessor( } ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return overallRequestStatus } catch (e: Exception) { requestRepository.save( Request( @@ -194,11 +188,8 @@ class RequestProcessor( report = Report("Fehler bei der Pseudonymisierung") ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return RequestStatus.ERROR } + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun fingerprint(mtbFile: MtbFile): String { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt index a2cc953..cf0e693 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt @@ -20,7 +20,6 @@ package dev.dnpm.etl.processor.web import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.services.RequestProcessor import org.slf4j.LoggerFactory import org.springframework.http.ResponseEntity @@ -35,24 +34,16 @@ class MtbFileController( @PostMapping(path = ["/mtbfile"]) fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { - val requestStatus = requestProcessor.processMtbFile(mtbFile) - - return if (requestStatus == RequestStatus.ERROR) { - ResponseEntity.unprocessableEntity().build() - } else { - ResponseEntity.noContent().build() - } + logger.debug("Accepted MTB File for processing") + requestProcessor.processMtbFile(mtbFile) + return ResponseEntity.accepted().build() } @DeleteMapping(path = ["/mtbfile/{patientId}"]) fun deleteData(@PathVariable patientId: String): ResponseEntity { - val requestStatus = requestProcessor.processDeletion(patientId) - - return if (requestStatus == RequestStatus.ERROR) { - ResponseEntity.unprocessableEntity().build() - } else { - ResponseEntity.noContent().build() - } + logger.debug("Accepted patient ID to process deletion") + requestProcessor.processDeletion(patientId) + return ResponseEntity.accepted().build() } } \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 39acb37..72edde6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,5 +3,7 @@ spring: bootstrap-servers: ${app.kafka.servers} template: default-topic: ${app.kafka.topic} + consumer: + group-id: ${app.kafka.group-id} flyway: locations: "classpath:db/migration/{vendor}" \ No newline at end of file -- cgit v1.2.3 From 70d4fa2f0ff4b38757cabc967b3f38a63674ed47 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:10:53 +0200 Subject: Use duplication fingerprinting based on MTB file requests only --- src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 7d110b1..afac40b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -48,6 +48,7 @@ class RequestProcessor( val lastRequestForPatient = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + .filter { it.type == RequestType.MTB_FILE } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { -- cgit v1.2.3 From 7f8b21efd2273bb7b4ee0d93ff4988bade2fa610 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:23:16 +0200 Subject: Handle not parsable data quality reports --- .../dev/dnpm/etl/processor/monitoring/ReportService.kt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt index 6ee8ae9..8c31ede 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.monitoring import com.fasterxml.jackson.annotation.JsonValue +import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.JsonMappingException import com.fasterxml.jackson.databind.ObjectMapper @@ -33,9 +34,14 @@ class ReportService( } return try { objectMapper.readValue(dataQualityReport, DataQualityReport::class.java).issues - } catch (e: JsonMappingException) { - e.printStackTrace() - listOf() + } catch (e: Exception) { + val otherIssue = + Issue(Severity.ERROR, "Not parsable data quality report '$dataQualityReport'") + return when (e) { + is JsonMappingException -> listOf(otherIssue) + is JsonParseException -> listOf(otherIssue) + else -> throw e + } } } -- cgit v1.2.3 From 577509e6f2d502d4dbbf1a1b526c43497fde56b3 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:53:09 +0200 Subject: Map 'status_code' and 'status code' to same data value --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index f0c91cb..d6c4da6 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -19,6 +19,7 @@ package dev.dnpm.etl.processor.services.kafka +import com.fasterxml.jackson.annotation.JsonAlias import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.Report @@ -81,7 +82,7 @@ class KafkaResponseProcessor( data class ResponseKey(val requestId: String) data class ResponseBody( - @JsonProperty("status code") val statusCode: Int, + @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, @JsonProperty("status_body") val statusBody: String ) } \ No newline at end of file -- cgit v1.2.3 From ac91620651daa2f9aa09709eaa0bb5a8f7222e71 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 12:59:53 +0200 Subject: Use Map as status body since it contains JSON --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index d6c4da6..547833c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -54,7 +54,7 @@ class KafkaResponseProcessor( it.processedAt = Instant.ofEpochMilli(data.timestamp()) it.report = Report( "Warnungen über mangelhafte Daten", - responseBody.statusBody + objectMapper.writeValueAsString(responseBody.statusBody) ) requestRepository.save(it) } @@ -64,7 +64,7 @@ class KafkaResponseProcessor( it.processedAt = Instant.ofEpochMilli(data.timestamp()) it.report = Report( "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - responseBody.statusBody + objectMapper.writeValueAsString(responseBody.statusBody) ) requestRepository.save(it) } @@ -83,6 +83,6 @@ class KafkaResponseProcessor( data class ResponseBody( @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, - @JsonProperty("status_body") val statusBody: String + @JsonProperty("status_body") val statusBody: Map ) } \ No newline at end of file -- cgit v1.2.3 From 3dcee41569b462a4b938380b5ac3b208728fb358 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 15:14:49 +0200 Subject: Implement delete request using Apache Kafka This is implemented using a fake MTB file containing a rejected consent state and will be mapped to HTTP DELETE on kafka-to-bwhc consumer. --- .../etl/processor/output/KafkaMtbFileSender.kt | 35 ++++++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 55503cf..da25576 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -20,6 +20,8 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.Consent +import de.ukw.ccc.bwhc.dto.MtbFile import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -42,16 +44,38 @@ class KafkaMtbFileSender( } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } - } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } } - // TODO not yet implemented override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { - return MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + val dummyMtbFile = MtbFile.builder() + .withConsent( + Consent.builder() + .withPatient(request.patientId) + .withStatus(Consent.Status.REJECTED) + .build() + ) + .build() + + return try { + val result = kafkaTemplate.sendDefault( + header(request), + objectMapper.writeValueAsString(dummyMtbFile) + ) + + if (result.get() != null) { + logger.debug("Sent deletion request via KafkaMtbFileSender") + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + } else { + MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + } + } catch (e: Exception) { + logger.error("An error occurred sending to kafka", e) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + } } private fun header(request: MtbFileSender.MtbFileRequest): String { @@ -59,4 +83,9 @@ class KafkaMtbFileSender( "\"eid\": \"${request.mtbFile.episode.id}\", " + "\"requestId\": \"${request.requestId}\"}" } + + private fun header(request: MtbFileSender.DeleteRequest): String { + return "{\"pid\": \"${request.patientId}\", " + + "\"requestId\": \"${request.requestId}\"}" + } } \ No newline at end of file -- cgit v1.2.3 From ec76c775d9ab7c863cd0c55e6ba1232181f6775c Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 16:04:31 +0200 Subject: Explicit producer topic configuration --- .../etl/processor/config/AppKafkaConfiguration.kt | 5 +++-- .../dnpm/etl/processor/output/KafkaMtbFileSender.kt | 20 ++++++++++++-------- .../services/kafka/KafkaResponseProcessor.kt | 13 +++++++++++++ src/main/resources/application.yml | 2 -- 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index f81d3fb..7adcb02 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -43,9 +43,10 @@ class AppKafkaConfiguration { @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, + kafkaTargetProperties: KafkaTargetProperties, objectMapper: ObjectMapper ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) + return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) } @Bean @@ -54,7 +55,7 @@ class AppKafkaConfiguration { kafkaTargetProperties: KafkaTargetProperties, kafkaResponseProcessor: KafkaResponseProcessor ): KafkaMessageListenerContainer { - val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic); + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic) containerProperties.messageListener = kafkaResponseProcessor return KafkaMessageListenerContainer(consumerFactory, containerProperties) } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index da25576..d903745 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -22,11 +22,13 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.config.KafkaTargetProperties import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, + private val kafkaTargetProperties: KafkaTargetProperties, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -34,13 +36,14 @@ class KafkaMtbFileSender( override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { return try { - val result = kafkaTemplate.sendDefault( - header(request), + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), objectMapper.writeValueAsString(request.mtbFile) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } @@ -61,14 +64,15 @@ class KafkaMtbFileSender( .build() return try { - val result = kafkaTemplate.sendDefault( - header(request), + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), objectMapper.writeValueAsString(dummyMtbFile) ) if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } @@ -78,13 +82,13 @@ class KafkaMtbFileSender( } } - private fun header(request: MtbFileSender.MtbFileRequest): String { + private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + "\"eid\": \"${request.mtbFile.episode.id}\", " + "\"requestId\": \"${request.requestId}\"}" } - private fun header(request: MtbFileSender.DeleteRequest): String { + private fun key(request: MtbFileSender.DeleteRequest): String { return "{\"pid\": \"${request.patientId}\", " + "\"requestId\": \"${request.requestId}\"}" } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 547833c..fd047d0 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -42,6 +42,9 @@ class KafkaResponseProcessor( val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + + println("${responseBody.statusCode}") + when (responseBody.statusCode) { 200 -> { it.status = RequestStatus.SUCCESS @@ -69,6 +72,16 @@ class KafkaResponseProcessor( requestRepository.save(it) } + in 900..999 -> { + it.status = RequestStatus.ERROR + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", + objectMapper.writeValueAsString(responseBody.statusBody) + ) + requestRepository.save(it) + } + else -> { logger.error("Cannot process Kafka response: Unknown response code!") } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 72edde6..5cd47c0 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,8 +1,6 @@ spring: kafka: bootstrap-servers: ${app.kafka.servers} - template: - default-topic: ${app.kafka.topic} consumer: group-id: ${app.kafka.group-id} flyway: -- cgit v1.2.3 From b14f2c1794fe41f9ec5e9e400d51c0fbf991953a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 16:18:16 +0200 Subject: Add information about 'no connection' responses --- README.md | 7 +++++-- .../dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 2 -- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 1d5a7f3..be6d6b5 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,9 @@ Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein * `APP_KAFKA_GROUP_ID`: Kafka GroupID des Consumers. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_group". * `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste -Wird keine Rückantwort über Apache Kafka empfangen und gibt es keine weitere Möglichkeit den Status festzustellen, verbleibt der Status auf `UNKNOWN`. +Wird keine Rückantwort über Apache Kafka empfangen und es gibt keine weitere Möglichkeit den Status festzustellen, verbleibt der Status auf `UNKNOWN`. -Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden. \ No newline at end of file +Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden. + +Lässt sich keine Verbindung zu dem bwHC-Backend aufbauen, wird eine Rückantwort mit Status-Code `900` erwartet, welchen es +für HTTP nicht gibt. \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index fd047d0..1e9263d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -43,8 +43,6 @@ class KafkaResponseProcessor( requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - println("${responseBody.statusCode}") - when (responseBody.statusCode) { 200 -> { it.status = RequestStatus.SUCCESS -- cgit v1.2.3 From 459ad59c1d988a5b3ecc60d844f4fa6c9bce11f5 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 4 Aug 2023 11:43:23 +0200 Subject: Do not detect duplicates after deletion request --- .../kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index afac40b..bdf2827 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -46,12 +46,15 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val lastRequestForPatient = - requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + val allRequests = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + + val lastMtbFileRequestForPatient = allRequests .filter { it.type == RequestType.MTB_FILE } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { + val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE + + if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { requestRepository.save( Request( patientId = pseudonymized.patient.id, -- cgit v1.2.3 From 3dea664999b803fb2f62c659fee977b28abc250b Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 12:46:04 +0200 Subject: Update Spring Boot dependencies --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index eecd959..f4f1cf7 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -2,7 +2,7 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { war - id("org.springframework.boot") version "3.1.1" + id("org.springframework.boot") version "3.1.2" id("io.spring.dependency-management") version "1.1.0" kotlin("jvm") version "1.9.0" kotlin("plugin.spring") version "1.9.0" -- cgit v1.2.3 From 3039b4b2a7eb7963d0952a28ca5fd26328640223 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 13:23:18 +0200 Subject: Add basic Testcontainers test setup --- build.gradle.kts | 2 + .../etl/processor/AbstractTestcontainerTest.kt | 45 ++++++++++++++++++++++ .../etl/processor/BwhcMapperApplicationTests.kt | 37 ++++++++++++++++++ 3 files changed, 84 insertions(+) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt diff --git a/build.gradle.kts b/build.gradle.kts index f4f1cf7..b59c028 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -50,6 +50,8 @@ dependencies { providedRuntime("org.springframework.boot:spring-boot-starter-tomcat") testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("io.projectreactor:reactor-test") + testImplementation("org.testcontainers:junit-jupiter") + testImplementation("org.testcontainers:postgresql") } tasks.withType { diff --git a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt new file mode 100644 index 0000000..3bd934f --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt @@ -0,0 +1,45 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.PostgreSQLContainer +import org.testcontainers.junit.jupiter.Container + +abstract class AbstractTestcontainerTest { + + companion object { + @Container + val dbContainer = PostgreSQLContainer("postgres:10-alpine") + .withDatabaseName("test") + .withUsername("test") + .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!") + + @DynamicPropertySource + @JvmStatic + fun registerDynamicProperties(registry: DynamicPropertyRegistry) { + registry.add("spring.datasource.url", dbContainer::getJdbcUrl) + registry.add("spring.datasource.username", dbContainer::getUsername) + registry.add("spring.datasource.password", dbContainer::getPassword) + } + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt b/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt new file mode 100644 index 0000000..efa6a66 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt @@ -0,0 +1,37 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.junit.jupiter.Testcontainers + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +class BwhcMapperApplicationTests : AbstractTestcontainerTest() { + + @Test + fun contextLoads() { + } + +} -- cgit v1.2.3 From 1fc09d691ea01415a21f1192cc7b1cf25bc0ac14 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 13:34:53 +0200 Subject: Rename test class to match applications main class name --- .../etl/processor/BwhcMapperApplicationTests.kt | 37 ---------------------- .../etl/processor/EtlProcessorApplicationTests.kt | 37 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 37 deletions(-) delete mode 100644 src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt diff --git a/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt b/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt deleted file mode 100644 index efa6a66..0000000 --- a/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.ExtendWith -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.junit.jupiter.SpringExtension -import org.testcontainers.junit.jupiter.Testcontainers - -@Testcontainers -@ExtendWith(SpringExtension::class) -@SpringBootTest -class BwhcMapperApplicationTests : AbstractTestcontainerTest() { - - @Test - fun contextLoads() { - } - -} diff --git a/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt new file mode 100644 index 0000000..07a201b --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt @@ -0,0 +1,37 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.junit.jupiter.Testcontainers + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +class EtlProcessorApplicationTests : AbstractTestcontainerTest() { + + @Test + fun contextLoads() { + } + +} -- cgit v1.2.3 From bcc23f6b14436ba6f4585a583da6c236df68e25a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 14:50:12 +0200 Subject: Add RequestService to handle access to requests --- .../etl/processor/services/RequestProcessor.kt | 17 +- .../dnpm/etl/processor/services/RequestService.kt | 56 ++++++ .../services/RequestServiceIntegrationTest.kt | 131 +++++++++++++ .../etl/processor/services/RequestServiceTest.kt | 205 +++++++++++++++++++++ 4 files changed, 402 insertions(+), 7 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index bdf2827..e04e568 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -21,7 +21,10 @@ package dev.dnpm.etl.processor.services import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.* +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.PseudonymizeService import org.apache.commons.codec.binary.Base32 @@ -35,7 +38,7 @@ import java.util.* class RequestProcessor( private val pseudonymizeService: PseudonymizeService, private val senders: List, - private val requestRepository: RequestRepository, + private val requestService: RequestService, private val objectMapper: ObjectMapper, private val statisticsUpdateProducer: Sinks.Many ) { @@ -46,7 +49,7 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val allRequests = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + val allRequests = requestService.allRequestsByPatientPseudonym(pseudonymized.patient.id) val lastMtbFileRequestForPatient = allRequests .filter { it.type == RequestType.MTB_FILE } @@ -55,7 +58,7 @@ class RequestProcessor( val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { - requestRepository.save( + requestService.save( Request( patientId = pseudonymized.patient.id, pid = pid, @@ -99,7 +102,7 @@ class RequestProcessor( RequestStatus.UNKNOWN } - requestRepository.save( + requestService.save( Request( uuid = request.requestId, patientId = request.mtbFile.patient.id, @@ -165,7 +168,7 @@ class RequestProcessor( RequestStatus.UNKNOWN } - requestRepository.save( + requestService.save( Request( uuid = requestId, patientId = patientPseudonym, @@ -181,7 +184,7 @@ class RequestProcessor( ) ) } catch (e: Exception) { - requestRepository.save( + requestService.save( Request( uuid = requestId, patientId = "???", diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt new file mode 100644 index 0000000..0f69910 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt @@ -0,0 +1,56 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.springframework.stereotype.Service + +@Service +class RequestService( + private val requestRepository: RequestRepository +) { + + fun save(request: Request) = requestRepository.save(request) + + fun allRequestsByPatientPseudonym(patientPseudonym: String) = requestRepository + .findAllByPatientIdOrderByProcessedAtDesc(patientPseudonym) + + fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) = + Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym)) + + fun isLastRequestDeletion(patientPseudonym: String) = + Companion.isLastRequestDeletion(allRequestsByPatientPseudonym(patientPseudonym)) + + companion object { + + fun lastMtbFileRequestForPatientPseudonym(allRequests: List) = allRequests + .filter { it.type == RequestType.MTB_FILE } + .sortedByDescending { it.processedAt } + .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } + + fun isLastRequestDeletion(allRequests: List) = allRequests + .maxByOrNull { it.processedAt }?.type == RequestType.DELETE + + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt new file mode 100644 index 0000000..d71e011 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt @@ -0,0 +1,131 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.AbstractTestcontainerTest +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.springframework.transaction.annotation.Transactional +import org.testcontainers.junit.jupiter.Testcontainers +import java.time.Instant +import java.util.* + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +@Transactional +class RequestServiceIntegrationTest : AbstractTestcontainerTest() { + + private lateinit var requestRepository: RequestRepository + + private lateinit var requestService: RequestService + + @BeforeEach + fun setup( + @Autowired requestRepository: RequestRepository + ) { + this.requestRepository = requestRepository + this.requestService = RequestService(requestRepository) + } + + @Test + fun shouldResultInEmptyRequestList() { + val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + assertThat(actual).isEmpty() + } + + private fun setupTestData() { + // Prepare DB + this.requestRepository.saveAll( + listOf( + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + // Should be ignored - wrong patient ID --> + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ), + // <-- + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P2", + fingerprint = "0123456789abcdee1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + ) + ) + } + + @Test + fun shouldResultInSortedRequestList() { + setupTestData() + + val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + assertThat(actual).hasSize(2) + assertThat(actual[0].fingerprint).isEqualTo("0123456789abcdee1") + assertThat(actual[1].fingerprint).isEqualTo("0123456789abcdef1") + } + + @Test + fun shouldReturnDeleteRequestAsLastRequest() { + setupTestData() + + val actual = requestService.isLastRequestDeletion("TEST_12345678901") + + assertThat(actual).isTrue() + } + + @Test + fun shouldReturnLastMtbFileRequest() { + setupTestData() + + val actual = requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901") + + assertThat(actual).isNotNull + assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef1") + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt new file mode 100644 index 0000000..3e0a979 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt @@ -0,0 +1,205 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.Mockito.* +import org.mockito.junit.jupiter.MockitoExtension +import java.time.Instant +import java.util.* + +@ExtendWith(MockitoExtension::class) +class RequestServiceTest { + + private lateinit var requestRepository: RequestRepository + + private lateinit var requestService: RequestService + + private fun anyRequest() = any(Request::class.java) ?: Request( + id = 0L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_dummy", + pid = "PX", + fingerprint = "dummy", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + + @BeforeEach + fun setup( + @Mock requestRepository: RequestRepository + ) { + this.requestRepository = requestRepository + this.requestService = RequestService(requestRepository) + } + + @Test + fun shouldIndicateLastRequestIsDeleteRequest() { + val requests = listOf( + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ), + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ) + ) + + val actual = RequestService.isLastRequestDeletion(requests) + + assertThat(actual).isTrue() + } + + @Test + fun shouldIndicateLastRequestIsNotDeleteRequest() { + val requests = listOf( + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ) + ) + + val actual = RequestService.isLastRequestDeletion(requests) + + assertThat(actual).isFalse() + } + + @Test + fun shouldReturnPatientsLastRequest() { + val requests = listOf( + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ) + ) + + val actual = RequestService.lastMtbFileRequestForPatientPseudonym(requests) + + assertThat(actual).isInstanceOf(Request::class.java) + assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef2") + } + + @Test + fun shouldReturnNullIfNoRequests() { + val requests = listOf() + + val actual = RequestService.lastMtbFileRequestForPatientPseudonym(requests) + + assertThat(actual).isNull() + } + + @Test + fun saveShouldSaveRequestUsingRepository() { + doAnswer { + val obj = it.arguments[0] as Request + obj.copy(id = 1L) + }.`when`(requestRepository).save(anyRequest()) + + val request = Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ) + + requestService.save(request) + + verify(requestRepository, times(1)).save(anyRequest()) + } + + @Test + fun allRequestsByPatientPseudonymShouldRequestAllRequestsForPatientPseudonym() { + requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString()) + } + + @Test + fun lastMtbFileRequestForPatientPseudonymShouldRequestAllRequestsForPatientPseudonym() { + requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901") + + verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString()) + } + + @Test + fun isLastRequestDeletionShouldRequestAllRequestsForPatientPseudonym() { + requestService.isLastRequestDeletion("TEST_12345678901") + + verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString()) + } + +} \ No newline at end of file -- cgit v1.2.3 From 4051b5094ca8daaa844803d2725b4094f3eed096 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 14:58:10 +0200 Subject: Keep database testcontainer alive until all tests are done --- .../kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt index 3bd934f..13b57d0 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt @@ -28,7 +28,7 @@ abstract class AbstractTestcontainerTest { companion object { @Container - val dbContainer = PostgreSQLContainer("postgres:10-alpine") + val dbContainer = CustomPostgreSQLContainer("postgres:10-alpine") .withDatabaseName("test") .withUsername("test") .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!") @@ -42,4 +42,10 @@ abstract class AbstractTestcontainerTest { } } +} + +class CustomPostgreSQLContainer(dockerImageName: String) : PostgreSQLContainer(dockerImageName) { + override fun stop() { + // Keep Testcontainer alive until JVM destroys it + } } \ No newline at end of file -- cgit v1.2.3 From b75328b74d361b7afd6197a5b240f5f76ce2c5e0 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 15:16:58 +0200 Subject: Move integration tests into own source-set --- build.gradle.kts | 24 +++- .../etl/processor/AbstractTestcontainerTest.kt | 51 ++++++++ .../etl/processor/EtlProcessorApplicationTests.kt | 37 ++++++ .../services/RequestServiceIntegrationTest.kt | 131 +++++++++++++++++++++ .../etl/processor/AbstractTestcontainerTest.kt | 51 -------- .../etl/processor/EtlProcessorApplicationTests.kt | 37 ------ .../services/RequestServiceIntegrationTest.kt | 131 --------------------- 7 files changed, 241 insertions(+), 221 deletions(-) create mode 100644 src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt create mode 100644 src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt create mode 100644 src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt delete mode 100644 src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt delete mode 100644 src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt delete mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index b59c028..89df274 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -15,6 +15,18 @@ java { sourceCompatibility = JavaVersion.VERSION_17 } +sourceSets { + create("integrationTest") { + compileClasspath += sourceSets.main.get().output + runtimeClasspath += sourceSets.main.get().output + } +} + +val integrationTestImplementation: Configuration by configurations.getting { + extendsFrom(configurations.testImplementation.get()) + extendsFrom(configurations.runtimeOnly.get()) +} + configurations { compileOnly { extendsFrom(configurations.annotationProcessor.get()) @@ -50,8 +62,8 @@ dependencies { providedRuntime("org.springframework.boot:spring-boot-starter-tomcat") testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("io.projectreactor:reactor-test") - testImplementation("org.testcontainers:junit-jupiter") - testImplementation("org.testcontainers:postgresql") + integrationTestImplementation("org.testcontainers:junit-jupiter") + integrationTestImplementation("org.testcontainers:postgresql") } tasks.withType { @@ -65,3 +77,11 @@ tasks.withType { useJUnitPlatform() } +task("integrationTest") { + description = "Runs integration tests" + + testClassesDirs = sourceSets["integrationTest"].output.classesDirs + classpath = sourceSets["integrationTest"].runtimeClasspath + + shouldRunAfter("test") +} diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt new file mode 100644 index 0000000..13b57d0 --- /dev/null +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt @@ -0,0 +1,51 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.PostgreSQLContainer +import org.testcontainers.junit.jupiter.Container + +abstract class AbstractTestcontainerTest { + + companion object { + @Container + val dbContainer = CustomPostgreSQLContainer("postgres:10-alpine") + .withDatabaseName("test") + .withUsername("test") + .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!") + + @DynamicPropertySource + @JvmStatic + fun registerDynamicProperties(registry: DynamicPropertyRegistry) { + registry.add("spring.datasource.url", dbContainer::getJdbcUrl) + registry.add("spring.datasource.username", dbContainer::getUsername) + registry.add("spring.datasource.password", dbContainer::getPassword) + } + } + +} + +class CustomPostgreSQLContainer(dockerImageName: String) : PostgreSQLContainer(dockerImageName) { + override fun stop() { + // Keep Testcontainer alive until JVM destroys it + } +} \ No newline at end of file diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt new file mode 100644 index 0000000..07a201b --- /dev/null +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt @@ -0,0 +1,37 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.junit.jupiter.Testcontainers + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +class EtlProcessorApplicationTests : AbstractTestcontainerTest() { + + @Test + fun contextLoads() { + } + +} diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt new file mode 100644 index 0000000..d71e011 --- /dev/null +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt @@ -0,0 +1,131 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.AbstractTestcontainerTest +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.springframework.transaction.annotation.Transactional +import org.testcontainers.junit.jupiter.Testcontainers +import java.time.Instant +import java.util.* + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +@Transactional +class RequestServiceIntegrationTest : AbstractTestcontainerTest() { + + private lateinit var requestRepository: RequestRepository + + private lateinit var requestService: RequestService + + @BeforeEach + fun setup( + @Autowired requestRepository: RequestRepository + ) { + this.requestRepository = requestRepository + this.requestService = RequestService(requestRepository) + } + + @Test + fun shouldResultInEmptyRequestList() { + val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + assertThat(actual).isEmpty() + } + + private fun setupTestData() { + // Prepare DB + this.requestRepository.saveAll( + listOf( + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + // Should be ignored - wrong patient ID --> + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ), + // <-- + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P2", + fingerprint = "0123456789abcdee1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + ) + ) + } + + @Test + fun shouldResultInSortedRequestList() { + setupTestData() + + val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + assertThat(actual).hasSize(2) + assertThat(actual[0].fingerprint).isEqualTo("0123456789abcdee1") + assertThat(actual[1].fingerprint).isEqualTo("0123456789abcdef1") + } + + @Test + fun shouldReturnDeleteRequestAsLastRequest() { + setupTestData() + + val actual = requestService.isLastRequestDeletion("TEST_12345678901") + + assertThat(actual).isTrue() + } + + @Test + fun shouldReturnLastMtbFileRequest() { + setupTestData() + + val actual = requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901") + + assertThat(actual).isNotNull + assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef1") + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt deleted file mode 100644 index 13b57d0..0000000 --- a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor - -import org.springframework.test.context.DynamicPropertyRegistry -import org.springframework.test.context.DynamicPropertySource -import org.testcontainers.containers.PostgreSQLContainer -import org.testcontainers.junit.jupiter.Container - -abstract class AbstractTestcontainerTest { - - companion object { - @Container - val dbContainer = CustomPostgreSQLContainer("postgres:10-alpine") - .withDatabaseName("test") - .withUsername("test") - .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!") - - @DynamicPropertySource - @JvmStatic - fun registerDynamicProperties(registry: DynamicPropertyRegistry) { - registry.add("spring.datasource.url", dbContainer::getJdbcUrl) - registry.add("spring.datasource.username", dbContainer::getUsername) - registry.add("spring.datasource.password", dbContainer::getPassword) - } - } - -} - -class CustomPostgreSQLContainer(dockerImageName: String) : PostgreSQLContainer(dockerImageName) { - override fun stop() { - // Keep Testcontainer alive until JVM destroys it - } -} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt deleted file mode 100644 index 07a201b..0000000 --- a/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.ExtendWith -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.junit.jupiter.SpringExtension -import org.testcontainers.junit.jupiter.Testcontainers - -@Testcontainers -@ExtendWith(SpringExtension::class) -@SpringBootTest -class EtlProcessorApplicationTests : AbstractTestcontainerTest() { - - @Test - fun contextLoads() { - } - -} diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt deleted file mode 100644 index d71e011..0000000 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt +++ /dev/null @@ -1,131 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor.services - -import dev.dnpm.etl.processor.AbstractTestcontainerTest -import dev.dnpm.etl.processor.monitoring.Request -import dev.dnpm.etl.processor.monitoring.RequestRepository -import dev.dnpm.etl.processor.monitoring.RequestStatus -import dev.dnpm.etl.processor.monitoring.RequestType -import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.ExtendWith -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.junit.jupiter.SpringExtension -import org.springframework.transaction.annotation.Transactional -import org.testcontainers.junit.jupiter.Testcontainers -import java.time.Instant -import java.util.* - -@Testcontainers -@ExtendWith(SpringExtension::class) -@SpringBootTest -@Transactional -class RequestServiceIntegrationTest : AbstractTestcontainerTest() { - - private lateinit var requestRepository: RequestRepository - - private lateinit var requestService: RequestService - - @BeforeEach - fun setup( - @Autowired requestRepository: RequestRepository - ) { - this.requestRepository = requestRepository - this.requestService = RequestService(requestRepository) - } - - @Test - fun shouldResultInEmptyRequestList() { - val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") - - assertThat(actual).isEmpty() - } - - private fun setupTestData() { - // Prepare DB - this.requestRepository.saveAll( - listOf( - Request( - uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678901", - pid = "P1", - fingerprint = "0123456789abcdef1", - type = RequestType.MTB_FILE, - status = RequestStatus.SUCCESS, - processedAt = Instant.parse("2023-07-07T02:00:00Z") - ), - // Should be ignored - wrong patient ID --> - Request( - uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678902", - pid = "P2", - fingerprint = "0123456789abcdef2", - type = RequestType.MTB_FILE, - status = RequestStatus.WARNING, - processedAt = Instant.parse("2023-08-08T00:00:00Z") - ), - // <-- - Request( - uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678901", - pid = "P2", - fingerprint = "0123456789abcdee1", - type = RequestType.DELETE, - status = RequestStatus.SUCCESS, - processedAt = Instant.parse("2023-08-08T02:00:00Z") - ) - ) - ) - } - - @Test - fun shouldResultInSortedRequestList() { - setupTestData() - - val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") - - assertThat(actual).hasSize(2) - assertThat(actual[0].fingerprint).isEqualTo("0123456789abcdee1") - assertThat(actual[1].fingerprint).isEqualTo("0123456789abcdef1") - } - - @Test - fun shouldReturnDeleteRequestAsLastRequest() { - setupTestData() - - val actual = requestService.isLastRequestDeletion("TEST_12345678901") - - assertThat(actual).isTrue() - } - - @Test - fun shouldReturnLastMtbFileRequest() { - setupTestData() - - val actual = requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901") - - assertThat(actual).isNotNull - assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef1") - } - -} \ No newline at end of file -- cgit v1.2.3 From 422441a3b39806016a952bf7bdff69e0834debca Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 16:46:02 +0200 Subject: Add tests for RequestProcessor --- build.gradle.kts | 1 + .../etl/processor/services/RequestProcessor.kt | 20 +- .../etl/processor/services/RequestProcessorTest.kt | 209 +++++++++++++++++++++ 3 files changed, 221 insertions(+), 9 deletions(-) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index 89df274..78bad77 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -62,6 +62,7 @@ dependencies { providedRuntime("org.springframework.boot:spring-boot-starter-tomcat") testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("io.projectreactor:reactor-test") + testImplementation("org.mockito.kotlin:mockito-kotlin:5.0.0") integrationTestImplementation("org.testcontainers:junit-jupiter") integrationTestImplementation("org.testcontainers:postgresql") } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index e04e568..fcb0863 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -49,15 +49,7 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val allRequests = requestService.allRequestsByPatientPseudonym(pseudonymized.patient.id) - - val lastMtbFileRequestForPatient = allRequests - .filter { it.type == RequestType.MTB_FILE } - .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - - val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE - - if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { + if (isDuplication(pseudonymized)) { requestService.save( Request( patientId = pseudonymized.patient.id, @@ -124,6 +116,16 @@ class RequestProcessor( statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } + private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { + val lastMtbFileRequestForPatient = + requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id) + val isLastRequestDeletion = requestService.isLastRequestDeletion(pseudonymizedMtbFile.patient.id) + + return null != lastMtbFileRequestForPatient + && !isLastRequestDeletion + && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile) + } + fun processDeletion(patientId: String) { val requestId = UUID.randomUUID().toString() diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt new file mode 100644 index 0000000..c165cf0 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -0,0 +1,209 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.output.RestMtbFileSender +import dev.dnpm.etl.processor.pseudonym.PseudonymizeService +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.Mockito.* +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + + +@ExtendWith(MockitoExtension::class) +class RequestProcessorTest { + + private lateinit var pseudonymizeService: PseudonymizeService + private lateinit var sender: MtbFileSender + private lateinit var requestService: RequestService + private lateinit var statisticsUpdateProducer: Sinks.Many + + private lateinit var requestProcessor: RequestProcessor + + @BeforeEach + fun setup( + @Mock pseudonymizeService: PseudonymizeService, + @Mock sender: RestMtbFileSender, + @Mock requestService: RequestService, + ) { + this.pseudonymizeService = pseudonymizeService + this.sender = sender + this.requestService = requestService + this.statisticsUpdateProducer = Sinks.many().multicast().directBestEffort() + + val objectMapper = ObjectMapper() + + requestProcessor = RequestProcessor( + pseudonymizeService, + listOf(sender), + requestService, + objectMapper, + statisticsUpdateProducer + ) + } + + @Test + fun testShouldDetectMtbFileDuplicationAndSaveRequestStatus() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "cwaxsvectyfj4qcw4hiwzx5fwwo7lekyagpzd2ayuf36jlvi6msa", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + it.arguments[0] as MtbFile + }.`when`(pseudonymizeService).pseudonymize(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + } + + @Test + fun testShouldSendMtbFileAndSaveRequestStatus() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "different", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + }.`when`(sender).send(any()) + + doAnswer { + it.arguments[0] as MtbFile + }.`when`(pseudonymizeService).pseudonymize(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + } + + @Test + fun testShouldSendDeleterequestAndSaveRequestStatus() { + doAnswer { + "PSEUDONYM" + }.`when`(pseudonymizeService).patientPseudonym(anyString()) + + doAnswer { + MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + }.`when`(sender).send(any()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + } + +} \ No newline at end of file -- cgit v1.2.3 From 536ecbbd56d2dad166e995256a6793a675dea167 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 18:52:03 +0200 Subject: Add tests for error response status --- .../etl/processor/services/RequestProcessorTest.kt | 93 +++++++++++++++++++++- 1 file changed, 91 insertions(+), 2 deletions(-) diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index c165cf0..12d6e29 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -130,7 +130,7 @@ class RequestProcessorTest { } @Test - fun testShouldSendMtbFileAndSaveRequestStatus() { + fun testShouldSendMtbFileAndSaveSuccessRequestStatus() { doAnswer { Request( id = 1L, @@ -189,7 +189,66 @@ class RequestProcessorTest { } @Test - fun testShouldSendDeleterequestAndSaveRequestStatus() { + fun testShouldSendMtbFileAndSaveErrorRequestStatus() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "different", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + }.`when`(sender).send(any()) + + doAnswer { + it.arguments[0] as MtbFile + }.`when`(pseudonymizeService).pseudonymize(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + } + + @Test + fun testShouldSendDeleteRequestAndSaveSuccessRequestStatus() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) @@ -206,4 +265,34 @@ class RequestProcessorTest { assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) } + @Test + fun testShouldSendDeleteRequestAndSaveErrorRequestStatus() { + doAnswer { + "PSEUDONYM" + }.`when`(pseudonymizeService).patientPseudonym(anyString()) + + doAnswer { + MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + }.`when`(sender).send(any()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + } + + @Test + fun testShouldSendDeleteRequestWithPseudonymErrorAndSaveErrorRequestStatus() { + doThrow(RuntimeException()).`when`(pseudonymizeService).patientPseudonym(anyString()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + } + } \ No newline at end of file -- cgit v1.2.3 From 6ad6ee13a1cae8ed286e80b3a46c458e1052480b Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 19:16:59 +0200 Subject: Ignore unknown properties in DataQualityResponse --- .../dnpm/etl/processor/monitoring/ReportService.kt | 3 + .../etl/processor/services/ReportServiceTest.kt | 70 ++++++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt index 8c31ede..ae36705 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt @@ -19,6 +19,7 @@ package dev.dnpm.etl.processor.monitoring +import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonValue import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.JsonMappingException @@ -46,8 +47,10 @@ class ReportService( } + @JsonIgnoreProperties(ignoreUnknown = true) private data class DataQualityReport(val issues: List) + @JsonIgnoreProperties(ignoreUnknown = true) data class Issue(val severity: Severity, val message: String) enum class Severity(@JsonValue val value: String) { diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt new file mode 100644 index 0000000..70efe2b --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt @@ -0,0 +1,70 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.monitoring.ReportService +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class ReportServiceTest { + + private lateinit var reportService: ReportService + + @BeforeEach + fun setup() { + this.reportService = ReportService(ObjectMapper().registerModule(KotlinModule.Builder().build())) + } + + @Test + fun shouldParseDataQualityReport() { + val json = """ + { + "patient": "4711", + "issues": [ + { "severity": "warning", "message": "Warning Message" }, + { "severity": "error", "message": "Error Message" } + ] + } + """.trimIndent() + + val actual = this.reportService.deserialize(json) + + assertThat(actual).hasSize(2) + assertThat(actual[0].severity).isEqualTo(ReportService.Severity.WARNING) + assertThat(actual[0].message).isEqualTo("Warning Message") + assertThat(actual[1].severity).isEqualTo(ReportService.Severity.ERROR) + assertThat(actual[1].message).isEqualTo("Error Message") + } + + @Test + fun shouldReturnSyntheticDataQualityReportOnParserError() { + val invalidResponse = "Invalid Response Data" + + val actual = this.reportService.deserialize(invalidResponse) + + assertThat(actual).hasSize(1) + assertThat(actual[0].severity).isEqualTo(ReportService.Severity.ERROR) + assertThat(actual[0].message).isEqualTo("Not parsable data quality report '$invalidResponse'") + } + +} \ No newline at end of file -- cgit v1.2.3 From 7739afad1fc82f4ffe0debbebae58874f046d82d Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 08:13:27 +0200 Subject: Handle MTB File with rejected consent as deletion request --- .../dnpm/etl/processor/web/MtbFileController.kt | 49 ------- .../etl/processor/web/MtbFileRestController.kt | 55 ++++++++ .../etl/processor/web/MtbFileRestControllerTest.kt | 150 +++++++++++++++++++++ 3 files changed, 205 insertions(+), 49 deletions(-) delete mode 100644 src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt deleted file mode 100644 index cf0e693..0000000 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor.web - -import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.services.RequestProcessor -import org.slf4j.LoggerFactory -import org.springframework.http.ResponseEntity -import org.springframework.web.bind.annotation.* - -@RestController -class MtbFileController( - private val requestProcessor: RequestProcessor, -) { - - private val logger = LoggerFactory.getLogger(MtbFileController::class.java) - - @PostMapping(path = ["/mtbfile"]) - fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { - logger.debug("Accepted MTB File for processing") - requestProcessor.processMtbFile(mtbFile) - return ResponseEntity.accepted().build() - } - - @DeleteMapping(path = ["/mtbfile/{patientId}"]) - fun deleteData(@PathVariable patientId: String): ResponseEntity { - logger.debug("Accepted patient ID to process deletion") - requestProcessor.processDeletion(patientId) - return ResponseEntity.accepted().build() - } - -} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt new file mode 100644 index 0000000..9b441f6 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt @@ -0,0 +1,55 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.web + +import de.ukw.ccc.bwhc.dto.Consent +import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.services.RequestProcessor +import org.slf4j.LoggerFactory +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.* + +@RestController +class MtbFileRestController( + private val requestProcessor: RequestProcessor, +) { + + private val logger = LoggerFactory.getLogger(MtbFileRestController::class.java) + + @PostMapping(path = ["/mtbfile"]) + fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { + if (mtbFile.consent.status == Consent.Status.ACTIVE) { + logger.debug("Accepted MTB File for processing") + requestProcessor.processMtbFile(mtbFile) + } else { + logger.debug("Accepted MTB File and process deletion") + requestProcessor.processDeletion(mtbFile.patient.id) + } + return ResponseEntity.accepted().build() + } + + @DeleteMapping(path = ["/mtbfile/{patientId}"]) + fun deleteData(@PathVariable patientId: String): ResponseEntity { + logger.debug("Accepted patient ID to process deletion") + requestProcessor.processDeletion(patientId) + return ResponseEntity.accepted().build() + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt new file mode 100644 index 0000000..2fde35a --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt @@ -0,0 +1,150 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.web + +import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.services.RequestProcessor +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.Mockito.times +import org.mockito.Mockito.verify +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.springframework.http.MediaType +import org.springframework.test.web.servlet.MockMvc +import org.springframework.test.web.servlet.delete +import org.springframework.test.web.servlet.post +import org.springframework.test.web.servlet.setup.MockMvcBuilders + +@ExtendWith(MockitoExtension::class) +class MtbFileRestControllerTest { + + private lateinit var mockMvc: MockMvc + + private lateinit var requestProcessor: RequestProcessor + + private val objectMapper = ObjectMapper() + + @BeforeEach + fun setup( + @Mock requestProcessor: RequestProcessor + ) { + this.requestProcessor = requestProcessor + val controller = MtbFileRestController(requestProcessor) + this.mockMvc = MockMvcBuilders.standaloneSetup(controller).build() + } + + @Test + fun shouldProcessMtbFilePostRequest() { + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("TEST_12345678") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("TEST_12345678") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("TEST_12345678") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + mockMvc.post("/mtbfile") { + content = objectMapper.writeValueAsString(mtbFile) + contentType = MediaType.APPLICATION_JSON + }.andExpect { + status { + isAccepted() + } + } + + verify(requestProcessor, times(1)).processMtbFile(any()) + } + + @Test + fun shouldProcessMtbFilePostRequestWithRejectedConsent() { + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("TEST_12345678") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.REJECTED) + .withPatient("TEST_12345678") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("TEST_12345678") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + mockMvc.post("/mtbfile") { + content = objectMapper.writeValueAsString(mtbFile) + contentType = MediaType.APPLICATION_JSON + }.andExpect { + status { + isAccepted() + } + } + + val captor = argumentCaptor() + verify(requestProcessor, times(1)).processDeletion(captor.capture()) + assertThat(captor.firstValue).isEqualTo("TEST_12345678") + } + + @Test + fun shouldProcessMtbFileDeleteRequest() { + mockMvc.delete("/mtbfile/TEST_12345678").andExpect { + status { + isAccepted() + } + } + + val captor = argumentCaptor() + verify(requestProcessor, times(1)).processDeletion(captor.capture()) + assertThat(captor.firstValue).isEqualTo("TEST_12345678") + } + +} \ No newline at end of file -- cgit v1.2.3 From 13bfa0018d6c9b48893ef96945659be9e7eec6c0 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 10:20:20 +0200 Subject: Change endpoint configuration to select single endpoint * If REST endpoint is configured, it will be used * If Kafka endpoint is configured, it will be used * If both endpoints are configured, REST configuration has precedence and will be used --- README.md | 5 +- .../etl/processor/config/AppConfigurationTest.kt | 102 +++++++++++++++++++++ .../dnpm/etl/processor/config/AppConfiguration.kt | 12 +-- .../etl/processor/config/AppKafkaConfiguration.kt | 8 ++ .../etl/processor/config/AppRestConfiguration.kt | 52 +++++++++++ 5 files changed, 167 insertions(+), 12 deletions(-) create mode 100644 src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt diff --git a/README.md b/README.md index be6d6b5..ea0c02b 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,10 @@ Wurde die Verwendung von gPAS konfiguriert, so sind weitere Angaben zu konfiguri ## Mögliche Endpunkte -Für REST-Requests als auch (parallel) zur Nutzung von Kafka-Topics können Endpunkte konfiguriert werden. +Für REST-Requests als auch zur Nutzung von Kafka-Topics können Endpunkte konfiguriert werden. + +Es ist dabei nur die Konfiguration eines Endpunkts zulässig. +Werden sowohl REST als auch Kafka-Endpunkt konfiguriert, wird nur der REST-Endpunkt verwendet. ### REST diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt new file mode 100644 index 0000000..8bdaa60 --- /dev/null +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt @@ -0,0 +1,102 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.output.KafkaMtbFileSender +import dev.dnpm.etl.processor.output.RestMtbFileSender +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.springframework.beans.factory.NoSuchBeanDefinitionException +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean +import org.springframework.boot.test.mock.mockito.MockBeans +import org.springframework.context.ApplicationContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource + +@SpringBootTest +@ContextConfiguration(classes = [KafkaAutoConfiguration::class, AppKafkaConfiguration::class, AppRestConfiguration::class]) +class AppConfigurationTest { + + @Nested + @TestPropertySource( + properties = [ + "app.rest.uri=http://localhost:9000" + ] + ) + inner class AppConfigurationRestTest(private val context: ApplicationContext) { + + @Test + fun shouldUseRestMtbFileSenderNotKafkaMtbFileSender() { + assertThat(context.getBean(RestMtbFileSender::class.java)).isNotNull + assertThrows { context.getBean(KafkaMtbFileSender::class.java) } + } + + } + + @Nested + @TestPropertySource( + properties = [ + "app.kafka.servers=localhost:9092", + "app.kafka.topic=test", + "app.kafka.response-topic=test-response", + "app.kafka.group-id=test" + ] + ) + @MockBeans(value = [ + MockBean(ObjectMapper::class), + MockBean(RequestRepository::class) + ]) + inner class AppConfigurationKafkaTest(private val context: ApplicationContext) { + + @Test + fun shouldUseKafkaMtbFileSenderNotRestMtbFileSender() { + assertThrows { context.getBean(RestMtbFileSender::class.java) } + assertThat(context.getBean(KafkaMtbFileSender::class.java)).isNotNull + } + + } + + @Nested + @TestPropertySource( + properties = [ + "app.rest.uri=http://localhost:9000", + "app.kafka.servers=localhost:9092", + "app.kafka.topic=test", + "app.kafka.response-topic=test-response", + "app.kafka.group-id=test" + ] + ) + inner class AppConfigurationRestInPrecedenceTest(private val context: ApplicationContext) { + + @Test + fun shouldUseRestMtbFileSenderNotKafkaMtbFileSender() { + assertThat(context.getBean(RestMtbFileSender::class.java)).isNotNull + assertThrows { context.getBean(KafkaMtbFileSender::class.java) } + } + + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index cbba1f1..6b15fc0 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -21,8 +21,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.ReportService -import dev.dnpm.etl.processor.output.MtbFileSender -import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator import dev.dnpm.etl.processor.pseudonym.Generator import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator @@ -38,9 +36,7 @@ import reactor.core.publisher.Sinks value = [ AppConfigProperties::class, PseudonymizeConfigProperties::class, - GPasConfigProperties::class, - RestTargetProperties::class, - KafkaTargetProperties::class + GPasConfigProperties::class ] ) class AppConfiguration { @@ -65,12 +61,6 @@ class AppConfiguration { return PseudonymizeService(generator, pseudonymizeConfigProperties) } - @ConditionalOnProperty(value = ["app.rest.uri"]) - @Bean - fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { - return RestMtbFileSender(restTargetProperties) - } - @Bean fun reportService(objectMapper: ObjectMapper): ReportService { return ReportService(objectMapper) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 7adcb02..6d0254e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -24,10 +24,13 @@ import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.core.annotation.Order import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.listener.ContainerProperties @@ -38,14 +41,19 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer value = [KafkaTargetProperties::class] ) @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +@ConditionalOnMissingBean(MtbFileSender::class) +@Order(-5) class AppKafkaConfiguration { + private val logger = LoggerFactory.getLogger(AppKafkaConfiguration::class.java) + @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, kafkaTargetProperties: KafkaTargetProperties, objectMapper: ObjectMapper ): MtbFileSender { + logger.info("Selected 'KafkaMtbFileSender'") return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt new file mode 100644 index 0000000..5e77a4f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -0,0 +1,52 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.output.RestMtbFileSender +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.core.annotation.Order + +@Configuration +@EnableConfigurationProperties( + value = [ + RestTargetProperties::class + ] +) +@ConditionalOnProperty(value = ["app.rest.uri"]) +@ConditionalOnMissingBean(MtbFileSender::class) +@Order(-10) +class AppRestConfiguration { + + private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java) + + @Bean + fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { + logger.info("Selected 'RestMtbFileSender'") + return RestMtbFileSender(restTargetProperties) + } + +} + -- cgit v1.2.3 From 47830ed9f7774c84674e9399cd347d12424f4f42 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 10:34:23 +0200 Subject: Use single MtbFileSender --- .../etl/processor/EtlProcessorApplicationTests.kt | 5 +- .../services/RequestServiceIntegrationTest.kt | 3 + .../etl/processor/services/RequestProcessor.kt | 110 +++++++++------------ .../etl/processor/services/RequestProcessorTest.kt | 2 +- 4 files changed, 57 insertions(+), 63 deletions(-) diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt index 07a201b..6c5b150 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt @@ -19,19 +19,22 @@ package dev.dnpm.etl.processor +import dev.dnpm.etl.processor.output.MtbFileSender import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.context.junit.jupiter.SpringExtension import org.testcontainers.junit.jupiter.Testcontainers @Testcontainers @ExtendWith(SpringExtension::class) @SpringBootTest +@MockBean(MtbFileSender::class) class EtlProcessorApplicationTests : AbstractTestcontainerTest() { @Test - fun contextLoads() { + fun contextLoadsIfMtbFileSenderConfigured() { } } diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt index d71e011..3af218e 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt @@ -24,12 +24,14 @@ import dev.dnpm.etl.processor.monitoring.Request import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.monitoring.RequestType +import dev.dnpm.etl.processor.output.MtbFileSender import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.context.junit.jupiter.SpringExtension import org.springframework.transaction.annotation.Transactional import org.testcontainers.junit.jupiter.Testcontainers @@ -40,6 +42,7 @@ import java.util.* @ExtendWith(SpringExtension::class) @SpringBootTest @Transactional +@MockBean(MtbFileSender::class) class RequestServiceIntegrationTest : AbstractTestcontainerTest() { private lateinit var requestRepository: RequestRepository diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index fcb0863..936c1bf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -37,7 +37,7 @@ import java.util.* @Service class RequestProcessor( private val pseudonymizeService: PseudonymizeService, - private val senders: List, + private val sender: MtbFileSender, private val requestService: RequestService, private val objectMapper: ObjectMapper, private val statisticsUpdateProducer: Sinks.Many @@ -66,32 +66,26 @@ class RequestProcessor( val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) - val responses = senders.map { - val responseStatus = it.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } - responseStatus + val responseStatus = sender.send(request) + if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { + logger.info( + "Sent file for Patient '{}' using '{}'", + pseudonymized.patient.id, + sender.javaClass.simpleName + ) + } else { + logger.error( + "Error sending file for Patient '{}' using '{}'", + pseudonymized.patient.id, + sender.javaClass.simpleName + ) } - val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) { - RequestStatus.WARNING - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN + val requestStatus = when (responseStatus.status) { + MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR + MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING + MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS + else -> RequestStatus.UNKNOWN } requestService.save( @@ -104,9 +98,7 @@ class RequestProcessor( type = RequestType.MTB_FILE, report = when (requestStatus) { RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", - responses.joinToString("\n") { it.reason }) - + RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason) RequestStatus.UNKNOWN -> Report("Keine Informationen") else -> null } @@ -132,42 +124,38 @@ class RequestProcessor( try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responses = senders.map { - val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + when (responseStatus.status) { + MtbFileSender.ResponseStatus.SUCCESS -> { + logger.info( + "Sent delete for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) + } - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + MtbFileSender.ResponseStatus.ERROR -> { + logger.error( + "Error deleting data for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) + } - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + else -> { + logger.error( + "Unknown result on deleting data for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) } - responseStatus } - val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN + val requestStatus = when (responseStatus.status) { + MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR + MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING + MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS + else -> RequestStatus.UNKNOWN } requestService.save( @@ -176,9 +164,9 @@ class RequestProcessor( patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = overallRequestStatus, + status = requestStatus, type = RequestType.DELETE, - report = when (overallRequestStatus) { + report = when (requestStatus) { RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") RequestStatus.UNKNOWN -> Report("Keine Informationen") else -> null diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index 12d6e29..6e97343 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -67,7 +67,7 @@ class RequestProcessorTest { requestProcessor = RequestProcessor( pseudonymizeService, - listOf(sender), + sender, requestService, objectMapper, statisticsUpdateProducer -- cgit v1.2.3 From 7f048e2483138deecc28208af42546097ef929d7 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 12:26:57 +0200 Subject: Do not append custom prefix to gPAS pseudonym --- .../etl/processor/pseudonym/PseudonymizeService.kt | 36 +-------- .../dev/dnpm/etl/processor/pseudonym/extensions.kt | 50 +++++++++++++ .../etl/processor/services/RequestProcessor.kt | 14 ++-- .../processor/pseudonym/PseudonymizeServiceTest.kt | 86 ++++++++++++++++++++++ .../etl/processor/services/RequestProcessorTest.kt | 14 ++-- 5 files changed, 155 insertions(+), 45 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt index 1a79850..ab8ce2f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt @@ -19,7 +19,6 @@ package dev.dnpm.etl.processor.pseudonym -import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties class PseudonymizeService( @@ -27,38 +26,11 @@ class PseudonymizeService( private val configProperties: PseudonymizeConfigProperties ) { - fun pseudonymize(mtbFile: MtbFile): MtbFile { - val patientPseudonym = patientPseudonym(mtbFile.patient.id) - - mtbFile.episode.patient = patientPseudonym - mtbFile.carePlans.forEach { it.patient = patientPseudonym } - mtbFile.patient.id = patientPseudonym - mtbFile.claims.forEach { it.patient = patientPseudonym } - mtbFile.consent.patient = patientPseudonym - mtbFile.claimResponses.forEach { it.patient = patientPseudonym } - mtbFile.diagnoses.forEach { it.patient = patientPseudonym } - mtbFile.ecogStatus.forEach { it.patient = patientPseudonym } - mtbFile.familyMemberDiagnoses.forEach { it.patient = patientPseudonym } - mtbFile.geneticCounsellingRequests.forEach { it.patient = patientPseudonym } - mtbFile.histologyReevaluationRequests.forEach { it.patient = patientPseudonym } - mtbFile.histologyReports.forEach { it.patient = patientPseudonym } - mtbFile.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } - mtbFile.molecularPathologyFindings.forEach { it.patient = patientPseudonym } - mtbFile.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } - mtbFile.ngsReports.forEach { it.patient = patientPseudonym } - mtbFile.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } - mtbFile.rebiopsyRequests.forEach { it.patient = patientPseudonym } - mtbFile.recommendations.forEach { it.patient = patientPseudonym } - mtbFile.recommendations.forEach { it.patient = patientPseudonym } - mtbFile.responses.forEach { it.patient = patientPseudonym } - mtbFile.specimens.forEach { it.patient = patientPseudonym } - mtbFile.specimens.forEach { it.patient = patientPseudonym } - - return mtbFile - } - fun patientPseudonym(patientId: String): String { - return "${configProperties.prefix}_${generator.generate(patientId)}" + return when (generator) { + is GpasPseudonymGenerator -> generator.generate(patientId) + else -> "${configProperties.prefix}_${generator.generate(patientId)}" + } } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt new file mode 100644 index 0000000..580785d --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt @@ -0,0 +1,50 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.pseudonym + +import de.ukw.ccc.bwhc.dto.MtbFile + +infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) { + val patientPseudonym = pseudonymizeService.patientPseudonym(this.patient.id) + + this.episode.patient = patientPseudonym + this.carePlans.forEach { it.patient = patientPseudonym } + this.patient.id = patientPseudonym + this.claims.forEach { it.patient = patientPseudonym } + this.consent.patient = patientPseudonym + this.claimResponses.forEach { it.patient = patientPseudonym } + this.diagnoses.forEach { it.patient = patientPseudonym } + this.ecogStatus.forEach { it.patient = patientPseudonym } + this.familyMemberDiagnoses.forEach { it.patient = patientPseudonym } + this.geneticCounsellingRequests.forEach { it.patient = patientPseudonym } + this.histologyReevaluationRequests.forEach { it.patient = patientPseudonym } + this.histologyReports.forEach { it.patient = patientPseudonym } + this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } + this.molecularPathologyFindings.forEach { it.patient = patientPseudonym } + this.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } + this.ngsReports.forEach { it.patient = patientPseudonym } + this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } + this.rebiopsyRequests.forEach { it.patient = patientPseudonym } + this.recommendations.forEach { it.patient = patientPseudonym } + this.recommendations.forEach { it.patient = patientPseudonym } + this.responses.forEach { it.patient = patientPseudonym } + this.specimens.forEach { it.patient = patientPseudonym } + this.specimens.forEach { it.patient = patientPseudonym } +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 936c1bf..6465e82 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -27,6 +27,7 @@ import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.monitoring.RequestType import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.PseudonymizeService +import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.slf4j.LoggerFactory @@ -47,12 +48,13 @@ class RequestProcessor( fun processMtbFile(mtbFile: MtbFile) { val pid = mtbFile.patient.id - val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - if (isDuplication(pseudonymized)) { + mtbFile pseudonymizeWith pseudonymizeService + + if (isDuplication(mtbFile)) { requestService.save( Request( - patientId = pseudonymized.patient.id, + patientId = mtbFile.patient.id, pid = pid, fingerprint = fingerprint(mtbFile), status = RequestStatus.DUPLICATION, @@ -64,19 +66,19 @@ class RequestProcessor( return } - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) + val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile) val responseStatus = sender.send(request) if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { logger.info( "Sent file for Patient '{}' using '{}'", - pseudonymized.patient.id, + mtbFile.patient.id, sender.javaClass.simpleName ) } else { logger.error( "Error sending file for Patient '{}' using '{}'", - pseudonymized.patient.id, + mtbFile.patient.id, sender.javaClass.simpleName ) } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt new file mode 100644 index 0000000..a30a328 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt @@ -0,0 +1,86 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.pseudonym + +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.doAnswer +import org.mockito.kotlin.whenever + +@ExtendWith(MockitoExtension::class) +class PseudonymizeServiceTest { + + private val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("123") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("123") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + @Test + fun shouldNotUsePseudonymPrefixForGpas(@Mock generator: GpasPseudonymGenerator) { + doAnswer { + it.arguments[0] + }.whenever(generator).generate(anyString()) + + val pseudonymizeService = PseudonymizeService(generator, PseudonymizeConfigProperties()) + + mtbFile.pseudonymizeWith(pseudonymizeService) + + assertThat(mtbFile.patient.id).isEqualTo("123") + } + + @Test + fun shouldUsePseudonymPrefixForBuiltin(@Mock generator: AnonymizingGenerator) { + doAnswer { + it.arguments[0] + }.whenever(generator).generate(anyString()) + + val pseudonymizeService = PseudonymizeService(generator, PseudonymizeConfigProperties()) + + mtbFile.pseudonymizeWith(pseudonymizeService) + + assertThat(mtbFile.patient.id).isEqualTo("UNKNOWN_123") + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index 6e97343..8552bbb 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -82,7 +82,7 @@ class RequestProcessorTest { uuid = UUID.randomUUID().toString(), patientId = "TEST_12345678901", pid = "P1", - fingerprint = "cwaxsvectyfj4qcw4hiwzx5fwwo7lekyagpzd2ayuf36jlvi6msa", + fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a", type = RequestType.MTB_FILE, status = RequestStatus.SUCCESS, processedAt = Instant.parse("2023-08-08T02:00:00Z") @@ -94,8 +94,8 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - it.arguments[0] as MtbFile - }.`when`(pseudonymizeService).pseudonymize(any()) + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) val mtbFile = MtbFile.builder() .withPatient( @@ -153,8 +153,8 @@ class RequestProcessorTest { }.`when`(sender).send(any()) doAnswer { - it.arguments[0] as MtbFile - }.`when`(pseudonymizeService).pseudonymize(any()) + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) val mtbFile = MtbFile.builder() .withPatient( @@ -212,8 +212,8 @@ class RequestProcessorTest { }.`when`(sender).send(any()) doAnswer { - it.arguments[0] as MtbFile - }.`when`(pseudonymizeService).pseudonymize(any()) + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) val mtbFile = MtbFile.builder() .withPatient( -- cgit v1.2.3 From 1a640ff9dff1cc182c4ffc1d00dff370e42a25de Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 18:15:20 +0200 Subject: Decouple request and response processing --- .../etl/processor/config/AppKafkaConfiguration.kt | 6 +- .../etl/processor/output/KafkaMtbFileSender.kt | 13 +- .../dev/dnpm/etl/processor/output/MtbFileSender.kt | 21 ++- .../dnpm/etl/processor/output/RestMtbFileSender.kt | 16 +-- .../etl/processor/services/RequestProcessor.kt | 135 +++++++------------- .../etl/processor/services/ResponseProcessor.kt | 96 ++++++++++++++ .../services/kafka/KafkaResponseProcessor.kt | 85 ++++++------ .../etl/processor/services/RequestProcessorTest.kt | 128 +++++++++++++++---- .../processor/services/ResponseProcessorTest.kt | 142 +++++++++++++++++++++ .../services/kafka/KafkaResponseProcessorTest.kt | 119 +++++++++++++++++ 10 files changed, 573 insertions(+), 188 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 6d0254e..309ff2d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -20,7 +20,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor @@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.ApplicationEventPublisher import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order @@ -70,10 +70,10 @@ class AppKafkaConfiguration { @Bean fun kafkaResponseProcessor( - requestRepository: RequestRepository, + applicationEventPublisher: ApplicationEventPublisher, objectMapper: ObjectMapper ): KafkaResponseProcessor { - return KafkaResponseProcessor(requestRepository, objectMapper) + return KafkaResponseProcessor(applicationEventPublisher, objectMapper) } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index d903745..9448e29 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -43,13 +44,13 @@ class KafkaMtbFileSender( ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } @@ -72,13 +73,13 @@ class KafkaMtbFileSender( if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt index 6914ba5..de0efaa 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt @@ -20,22 +20,31 @@ package dev.dnpm.etl.processor.output import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.springframework.http.HttpStatusCode interface MtbFileSender { fun send(request: MtbFileRequest): Response fun send(request: DeleteRequest): Response - data class Response(val status: ResponseStatus, val reason: String = "") + data class Response(val status: RequestStatus, val body: String = "") data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile) data class DeleteRequest(val requestId: String, val patientId: String) - enum class ResponseStatus { - SUCCESS, - WARNING, - ERROR, - UNKNOWN +} + +fun Int.asRequestStatus(): RequestStatus { + return when (this) { + 200 -> RequestStatus.SUCCESS + 201 -> RequestStatus.WARNING + in 400 .. 999 -> RequestStatus.ERROR + else -> RequestStatus.UNKNOWN } +} + +fun HttpStatusCode.asRequestStatus(): RequestStatus { + return this.value().asRequestStatus() } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 04c73ef..24cdc49 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -20,13 +20,13 @@ package dev.dnpm.etl.processor.output import dev.dnpm.etl.processor.config.RestTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.http.HttpEntity import org.springframework.http.HttpHeaders import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -import org.springframework.web.util.UriComponentsBuilder class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { @@ -46,21 +46,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) ) if (!response.statusCode.is2xxSuccessful) { logger.warn("Error sending to remote system: {}", response.body) - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}") + return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return if (response.body?.contains("warning") == true) { - MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}") - } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) - } + return MtbFileSender.Response(response.statusCode.asRequestStatus()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { @@ -74,14 +70,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) String::class.java ) logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + return MtbFileSender.Response(RequestStatus.SUCCESS) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 6465e82..d2f8619 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -31,8 +31,9 @@ import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service -import reactor.core.publisher.Sinks +import java.time.Instant import java.util.* @Service @@ -41,73 +42,54 @@ class RequestProcessor( private val sender: MtbFileSender, private val requestService: RequestService, private val objectMapper: ObjectMapper, - private val statisticsUpdateProducer: Sinks.Many + private val applicationEventPublisher: ApplicationEventPublisher ) { private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) fun processMtbFile(mtbFile: MtbFile) { + val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id mtbFile pseudonymizeWith pseudonymizeService + val request = MtbFileSender.MtbFileRequest(requestId, mtbFile) + + requestService.save( + Request( + uuid = requestId, + patientId = request.mtbFile.patient.id, + pid = pid, + fingerprint = fingerprint(request.mtbFile), + status = RequestStatus.UNKNOWN, + type = RequestType.MTB_FILE + ) + ) + if (isDuplication(mtbFile)) { - requestService.save( - Request( - patientId = mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(mtbFile), - status = RequestStatus.DUPLICATION, - type = RequestType.MTB_FILE, - report = Report("Duplikat erkannt - keine Daten weitergeleitet") + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + RequestStatus.DUPLICATION ) ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) return } - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile) - val responseStatus = sender.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( - Request( - uuid = request.requestId, - patientId = request.mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(request.mtbFile), - status = requestStatus, - type = RequestType.MTB_FILE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason) - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { @@ -126,55 +108,31 @@ class RequestProcessor( try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( Request( uuid = requestId, patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = requestStatus, - type = RequestType.DELETE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + status = RequestStatus.UNKNOWN, + type = RequestType.DELETE + ) + ) + + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) + } catch (e: Exception) { requestService.save( Request( @@ -188,7 +146,6 @@ class RequestProcessor( ) ) } - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun fingerprint(mtbFile: MtbFile): String { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt new file mode 100644 index 0000000..d7ad86f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -0,0 +1,96 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@Service +class ResponseProcessor( + private val requestRepository: RequestRepository, + private val statisticsUpdateProducer: Sinks.Many, + private val objectMapper: ObjectMapper +) { + + private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) + + @EventListener(classes = [ResponseEvent::class]) + fun handleResponseEvent(event: ResponseEvent) { + requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({ + it.processedAt = event.timestamp + it.status = event.status + + when (event.status) { + RequestStatus.SUCCESS -> { + it.report = Report( + "Keine Probleme erkannt", + ) + } + + RequestStatus.WARNING -> { + it.report = Report( + "Warnungen über mangelhafte Daten", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.ERROR -> { + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.DUPLICATION -> { + it.report = Report( + "Duplikat erkannt" + ) + } + + else -> { + logger.error("Cannot process response: Unknown response code!") + return@ifPresentOrElse + } + } + + requestRepository.save(it) + + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + }, { + logger.error("Response for unknown request '${event.requestUuid}'!") + }) + } + +} + +data class ResponseEvent( + val requestUuid: String, + val timestamp: Instant, + val status: RequestStatus, + val body: Optional = Optional.empty() +) \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 1e9263d..ef880f4 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -22,16 +22,18 @@ package dev.dnpm.etl.processor.services.kafka import com.fasterxml.jackson.annotation.JsonAlias import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.Report -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.output.asRequestStatus +import dev.dnpm.etl.processor.services.ResponseEvent import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.kafka.listener.MessageListener import java.time.Instant +import java.util.* class KafkaResponseProcessor( - private val requestRepository: RequestRepository, + private val eventPublisher: ApplicationEventPublisher, private val objectMapper: ObjectMapper ) : MessageListener { @@ -39,55 +41,44 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord) { try { - val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) - requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { + Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + } catch (e: Exception) { + Optional.empty() + }.ifPresentOrElse({ responseKey -> + val event = try { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } - when (responseBody.statusCode) { - 200 -> { - it.status = RequestStatus.SUCCESS - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - requestRepository.save(it) - } + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } - 201 -> { - it.status = RequestStatus.WARNING - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Warnungen über mangelhafte Daten", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) + else -> { + logger.error("Kafka response: Unknown response code!") + Optional.empty() + } } - - 400, 422 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - in 900..999 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - else -> { - logger.error("Cannot process Kafka response: Unknown response code!") - } - } + ) + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + RequestStatus.ERROR, + Optional.of("Cannot process Kafka response") + ) } - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - } + eventPublisher.publishEvent(event) + }, { + logger.error("No response key in Kafka response") + }) } data class ResponseKey(val requestId: String) diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index 8552bbb..f9d8182 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -37,7 +37,7 @@ import org.mockito.Mockito.* import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor -import reactor.core.publisher.Sinks +import org.springframework.context.ApplicationEventPublisher import java.time.Instant import java.util.* @@ -48,7 +48,7 @@ class RequestProcessorTest { private lateinit var pseudonymizeService: PseudonymizeService private lateinit var sender: MtbFileSender private lateinit var requestService: RequestService - private lateinit var statisticsUpdateProducer: Sinks.Many + private lateinit var applicationEventPublisher: ApplicationEventPublisher private lateinit var requestProcessor: RequestProcessor @@ -57,11 +57,12 @@ class RequestProcessorTest { @Mock pseudonymizeService: PseudonymizeService, @Mock sender: RestMtbFileSender, @Mock requestService: RequestService, + @Mock applicationEventPublisher: ApplicationEventPublisher ) { this.pseudonymizeService = pseudonymizeService this.sender = sender this.requestService = requestService - this.statisticsUpdateProducer = Sinks.many().multicast().directBestEffort() + this.applicationEventPublisher = applicationEventPublisher val objectMapper = ObjectMapper() @@ -70,12 +71,12 @@ class RequestProcessorTest { sender, requestService, objectMapper, - statisticsUpdateProducer + applicationEventPublisher ) } @Test - fun testShouldDetectMtbFileDuplicationAndSaveRequestStatus() { + fun testShouldSendMtbFileDuplicationAndSaveUnknownRequestStatusAtFirst() { doAnswer { Request( id = 1L, @@ -126,11 +127,66 @@ class RequestProcessorTest { val requestCaptor = argumentCaptor() verify(requestService, times(1)).save(requestCaptor.capture()) assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN) } @Test - fun testShouldSendMtbFileAndSaveSuccessRequestStatus() { + fun testShouldDetectMtbFileDuplicationAndSendDuplicationEvent() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + } + + @Test + fun testShouldSendMtbFileAndSendSuccessEvent() { doAnswer { Request( id = 1L, @@ -149,7 +205,7 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(status = RequestStatus.SUCCESS) }.`when`(sender).send(any()) doAnswer { @@ -182,14 +238,14 @@ class RequestProcessorTest { this.requestProcessor.processMtbFile(mtbFile) - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) } @Test - fun testShouldSendMtbFileAndSaveErrorRequestStatus() { + fun testShouldSendMtbFileAndSendErrorEvent() { doAnswer { Request( id = 1L, @@ -208,7 +264,7 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(status = RequestStatus.ERROR) }.`when`(sender).send(any()) doAnswer { @@ -241,20 +297,20 @@ class RequestProcessorTest { this.requestProcessor.processMtbFile(mtbFile) - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) } @Test - fun testShouldSendDeleteRequestAndSaveSuccessRequestStatus() { + fun testShouldSendDeleteRequestAndSaveUnknownRequestStatusAtFirst() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(status = RequestStatus.UNKNOWN) }.`when`(sender).send(any()) this.requestProcessor.processDeletion("TEST_12345678901") @@ -262,25 +318,43 @@ class RequestProcessorTest { val requestCaptor = argumentCaptor() verify(requestService, times(1)).save(requestCaptor.capture()) assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN) } @Test - fun testShouldSendDeleteRequestAndSaveErrorRequestStatus() { + fun testShouldSendDeleteRequestAndSendSuccessEvent() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(status = RequestStatus.SUCCESS) }.`when`(sender).send(any()) this.requestProcessor.processDeletion("TEST_12345678901") - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + } + + @Test + fun testShouldSendDeleteRequestAndSendErrorEvent() { + doAnswer { + "PSEUDONYM" + }.`when`(pseudonymizeService).patientPseudonym(anyString()) + + doAnswer { + MtbFileSender.Response(status = RequestStatus.ERROR) + }.`when`(sender).send(any()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) } @Test diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt new file mode 100644 index 0000000..cfb1111 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt @@ -0,0 +1,142 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.* +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@ExtendWith(MockitoExtension::class) +class ResponseProcessorTest { + + private lateinit var requestRepository: RequestRepository + private lateinit var statisticsUpdateProducer: Sinks.Many + + private lateinit var responseProcessor: ResponseProcessor + + private val testRequest = Request( + 1L, + "TestID1234", + "PSEUDONYM-A", + "1", + "dummyfingerprint", + RequestType.MTB_FILE, + RequestStatus.UNKNOWN + ) + + @BeforeEach + fun setup( + @Mock requestRepository: RequestRepository, + @Mock statisticsUpdateProducer: Sinks.Many + ) { + val objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) + + this.requestRepository = requestRepository + this.statisticsUpdateProducer = statisticsUpdateProducer + + this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer, objectMapper) + } + + @Test + fun shouldNotSaveStatusForUnknownRequest() { + doAnswer { + Optional.empty() + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + RequestStatus.SUCCESS + ) + + this.responseProcessor.handleResponseEvent(event) + + verify(requestRepository, never()).save(any()) + } + + @Test + fun shouldNotSaveStatusWithUnknownState() { + doAnswer { + Optional.of(testRequest) + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + RequestStatus.UNKNOWN + ) + + this.responseProcessor.handleResponseEvent(event) + + verify(requestRepository, never()).save(any()) + } + + @ParameterizedTest + @MethodSource("requestStatusSource") + fun shouldSaveStatusForKnownRequest(requestStatus: RequestStatus) { + doAnswer { + Optional.of(testRequest) + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + requestStatus + ) + + this.responseProcessor.handleResponseEvent(event) + + val captor = argumentCaptor() + verify(requestRepository, times(1)).save(captor.capture()) + assertThat(captor.firstValue).isNotNull + assertThat(captor.firstValue.status).isEqualTo(requestStatus) + } + + companion object { + + @JvmStatic + fun requestStatusSource(): Set { + return setOf( + RequestStatus.SUCCESS, + RequestStatus.WARNING, + RequestStatus.ERROR, + RequestStatus.DUPLICATION + ) + } + + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt new file mode 100644 index 0000000..0f524ca --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -0,0 +1,119 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.services.ResponseEvent +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.never +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import org.springframework.context.ApplicationEventPublisher +import org.springframework.http.HttpStatus + +@ExtendWith(MockitoExtension::class) +class KafkaResponseProcessorTest { + + private lateinit var eventPublisher: ApplicationEventPublisher + private lateinit var objectMapper: ObjectMapper + + private lateinit var kafkaResponseProcessor: KafkaResponseProcessor + + private fun createkafkaRecord( + requestId: String? = null, + statusCode: Int = 200, + statusBody: Map? = mapOf() + ): ConsumerRecord { + return ConsumerRecord( + "test-topic", + 0, + 0, + if (requestId == null) { + null + } else { + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId)) + }, + if (statusBody == null) { + "" + } else { + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody)) + } + ) + } + + @BeforeEach + fun setup( + @Mock eventPublisher: ApplicationEventPublisher + ) { + this.eventPublisher = eventPublisher + this.objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) + + this.kafkaResponseProcessor = KafkaResponseProcessor(eventPublisher, objectMapper) + } + + @Test + fun shouldNotProcessRecordsWithoutValidKey() { + this.kafkaResponseProcessor.onMessage(createkafkaRecord(null, 200)) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @Test + fun shouldNotProcessRecordsWithoutValidBody() { + this.kafkaResponseProcessor.onMessage(createkafkaRecord(requestId = "TestID1234", statusBody = null)) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @ParameterizedTest + @MethodSource("statusCodeSource") + fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) { + this.kafkaResponseProcessor.onMessage(createkafkaRecord("TestID1234", statusCode)) + verify(eventPublisher, times(1)).publishEvent(any()) + } + + companion object { + + @JvmStatic + fun statusCodeSource(): Set { + return setOf( + HttpStatus.OK, + HttpStatus.CREATED, + HttpStatus.BAD_REQUEST, + HttpStatus.NOT_FOUND, + HttpStatus.UNPROCESSABLE_ENTITY, + HttpStatus.INTERNAL_SERVER_ERROR + ) + .map { it.value() } + .toSet() + } + + } + +} \ No newline at end of file -- cgit v1.2.3 From 501762d4513ba6050e99e5e670a67a6cb672020d Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 18:32:03 +0200 Subject: Add test logging --- build.gradle.kts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/build.gradle.kts b/build.gradle.kts index 78bad77..1f1a123 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,3 +1,4 @@ +import org.gradle.api.tasks.testing.logging.TestLogEvent import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { @@ -76,6 +77,9 @@ tasks.withType { tasks.withType { useJUnitPlatform() + testLogging { + events(TestLogEvent.FAILED, TestLogEvent.PASSED, TestLogEvent.SKIPPED) + } } task("integrationTest") { -- cgit v1.2.3 From 2b42a4d262a846feb1f82facbb151be9cabb57b4 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 12:11:39 +0200 Subject: Tests for RestMtbFileSender --- .../etl/processor/config/AppRestConfiguration.kt | 10 +- .../dnpm/etl/processor/output/RestMtbFileSender.kt | 7 +- .../etl/processor/output/RestMtbFileSenderTest.kt | 159 +++++++++++++++++++++ 3 files changed, 171 insertions(+), 5 deletions(-) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt index 5e77a4f..a830597 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -28,6 +28,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order +import org.springframework.web.client.RestTemplate @Configuration @EnableConfigurationProperties( @@ -43,9 +44,14 @@ class AppRestConfiguration { private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java) @Bean - fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { + fun restTemplate(): RestTemplate { + return RestTemplate() + } + + @Bean + fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender { logger.info("Selected 'RestMtbFileSender'") - return RestMtbFileSender(restTargetProperties) + return RestMtbFileSender(restTemplate, restTargetProperties) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 24cdc49..f80ff69 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -28,12 +28,13 @@ import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { +class RestMtbFileSender( + private val restTemplate: RestTemplate, + private val restTargetProperties: RestTargetProperties +) : MtbFileSender { private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java) - private val restTemplate = RestTemplate() - override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { try { val headers = HttpHeaders() diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt new file mode 100644 index 0000000..17d420a --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt @@ -0,0 +1,159 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.output + +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.config.RestTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.springframework.http.HttpMethod +import org.springframework.http.HttpStatus +import org.springframework.test.web.client.MockRestServiceServer +import org.springframework.test.web.client.match.MockRestRequestMatchers.method +import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo +import org.springframework.test.web.client.response.MockRestResponseCreators.withStatus +import org.springframework.web.client.RestTemplate + +class RestMtbFileSenderTest { + + private lateinit var mockRestServiceServer: MockRestServiceServer + + private lateinit var restMtbFileSender: RestMtbFileSender + + @BeforeEach + fun setup() { + val restTemplate = RestTemplate() + val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile") + + this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate) + + this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties) + } + + @ParameterizedTest + @MethodSource("deleteRequestWithResponseSource") + fun shouldReturnExpectedResponseForDelete(requestWithResponse: RequestWithResponse) { + this.mockRestServiceServer.expect { + method(HttpMethod.DELETE) + requestTo("/mtbfile") + }.andRespond { + withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it) + } + + val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID")) + assertThat(response.status).isEqualTo(requestWithResponse.requestStatus) + } + + @ParameterizedTest + @MethodSource("mtbFileRequestWithResponseSource") + fun shouldReturnExpectedResponseForMtbFilePost(requestWithResponse: RequestWithResponse) { + this.mockRestServiceServer.expect { + method(HttpMethod.POST) + requestTo("/mtbfile") + }.andRespond { + withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it) + } + + val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile)) + assertThat(response.status).isEqualTo(requestWithResponse.requestStatus) + } + + companion object { + data class RequestWithResponse(val httpStatus: HttpStatus, val body: String, val requestStatus: RequestStatus) + + private val warningBody = """ + { + "patient_id": "PID", + "issues": [ + { "severity": "warning", "message": "Something is not right" } + ] + } + """.trimIndent() + + private val errorBody = """ + { + "patient_id": "PID", + "issues": [ + { "severity": "error", "message": "Something is very bad" } + ] + } + """.trimIndent() + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("PID") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("PID") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("PID") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + /** + * Synthetic http responses with related request status + * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API + */ + @JvmStatic + fun mtbFileRequestWithResponseSource(): Set { + return setOf( + RequestWithResponse(HttpStatus.OK, "{}", RequestStatus.SUCCESS), + RequestWithResponse(HttpStatus.CREATED, warningBody, RequestStatus.WARNING), + RequestWithResponse(HttpStatus.BAD_REQUEST, "??", RequestStatus.ERROR), + RequestWithResponse(HttpStatus.UNPROCESSABLE_ENTITY, errorBody, RequestStatus.ERROR), + // Some more errors not mentioned in documentation + RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR), + RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR) + ) + } + + /** + * Synthetic http responses with related request status + * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API + */ + @JvmStatic + fun deleteRequestWithResponseSource(): Set { + return setOf( + RequestWithResponse(HttpStatus.OK, "", RequestStatus.SUCCESS), + // Some more errors not mentioned in documentation + RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR), + RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR) + ) + } + } + + +} \ No newline at end of file -- cgit v1.2.3 From 002b0618cf813d48bbff2d287e16f607a4c73d73 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 13:35:35 +0200 Subject: Add tests for KafkaMtbFileSender --- .../etl/processor/output/KafkaMtbFileSender.kt | 4 +- .../etl/processor/output/KafkaMtbFileSenderTest.kt | 169 +++++++++++++++++++++ 2 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 9448e29..e7f9769 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -50,7 +50,7 @@ class KafkaMtbFileSender( } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(RequestStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.ERROR) } } @@ -79,7 +79,7 @@ class KafkaMtbFileSender( } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(RequestStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.ERROR) } } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt new file mode 100644 index 0000000..14bdd5d --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -0,0 +1,169 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.output + +import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.* +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.SendResult +import java.util.concurrent.CompletableFuture.completedFuture +import java.util.concurrent.ExecutionException + +@ExtendWith(MockitoExtension::class) +class KafkaMtbFileSenderTest { + + private lateinit var kafkaTemplate: KafkaTemplate + + private lateinit var kafkaMtbFileSender: KafkaMtbFileSender + + private lateinit var objectMapper: ObjectMapper + + @BeforeEach + fun setup( + @Mock kafkaTemplate: KafkaTemplate + ) { + val kafkaTargetProperties = KafkaTargetProperties("testtopic") + this.objectMapper = ObjectMapper() + this.kafkaTemplate = kafkaTemplate + + this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) + } + + @ParameterizedTest + @MethodSource("requestWithResponseSource") + fun shouldSendMtbFileRequestAndReturnExpectedState(testData: TestData) { + doAnswer { + if (null != testData.exception) { + throw testData.exception + } + completedFuture(SendResult(null, null)) + }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString()) + + val response = kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE))) + assertThat(response.status).isEqualTo(testData.requestStatus) + } + + @ParameterizedTest + @MethodSource("requestWithResponseSource") + fun shouldSendDeleteRequestAndReturnExpectedState(testData: TestData) { + doAnswer { + if (null != testData.exception) { + throw testData.exception + } + completedFuture(SendResult(null, null)) + }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString()) + + val response = kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID")) + assertThat(response.status).isEqualTo(testData.requestStatus) + } + + @Test + fun shouldSendMtbFileRequestWithCorrectKeyAndBody() { + doAnswer { + completedFuture(SendResult(null, null)) + }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString()) + + kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE))) + + val captor = argumentCaptor() + verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) + assertThat(captor.firstValue).isNotNull + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}") + assertThat(captor.secondValue).isNotNull + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE))) + } + + @Test + fun shouldSendDeleteRequestWithCorrectKeyAndBody() { + doAnswer { + completedFuture(SendResult(null, null)) + }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString()) + + kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID")) + + val captor = argumentCaptor() + verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) + assertThat(captor.firstValue).isNotNull + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}") + assertThat(captor.secondValue).isNotNull + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED))) + } + + companion object { + fun mtbFile(consentStatus: Consent.Status): MtbFile { + return if (consentStatus == Consent.Status.ACTIVE) { + MtbFile.builder() + .withPatient( + Patient.builder() + .withId("PID") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(consentStatus) + .withPatient("PID") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("PID") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + } else { + MtbFile.builder() + .withConsent( + Consent.builder() + .withStatus(consentStatus) + .withPatient("PID") + .build() + ) + }.build() + } + + data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null) + + @JvmStatic + fun requestWithResponseSource(): Set { + return setOf( + TestData(RequestStatus.UNKNOWN), + TestData(RequestStatus.ERROR, InterruptedException("Test interrupted")), + TestData(RequestStatus.ERROR, ExecutionException(RuntimeException("Test execution aborted"))) + ) + } + } + +} \ No newline at end of file -- cgit v1.2.3 From b956eba6c746e511028e257ae102164867c70766 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 16:44:26 +0200 Subject: Add workflow to run tests on push or pull request --- .github/workflows/test.yml | 28 ++++++++++++++++++++++++++++ build.gradle.kts | 11 +++++++++++ 2 files changed, 39 insertions(+) create mode 100644 .github/workflows/test.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..5ff199b --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,28 @@ +name: "Run Tests" + +on: + push: + branches: + - 'master' + pull-request: + branches: + - '*' + +jobs: + docker: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + + - name: Setup Gradle + uses: gradle/gradle-build-action@v2 + + - name: Execute tests + run: ./gradlew test + + - name: Execute integration tests + run: ./gradlew integrationTest \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 1f1a123..7650575 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,5 +1,6 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent import org.jetbrains.kotlin.gradle.tasks.KotlinCompile +import org.springframework.boot.gradle.tasks.bundling.BootBuildImage plugins { war @@ -90,3 +91,13 @@ task("integrationTest") { shouldRunAfter("test") } + +tasks.named("bootBuildImage") { + imageName.set("ghcr.io/ccc-mf/etl-processor") + + environment.set(environment.get() + mapOf( + "BP_OCI_SOURCE" to "https://github.com/CCC-MF/etl-processor", + "BP_OCI_LICENSES" to "AGPLv3", + "BP_OCI_DESCRIPTION" to "ETL Processor for bwHC MTB files" + )) +} -- cgit v1.2.3 From 1e29ecc891fdfa9750af49e5a3506b6fe5ad8f46 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 17:32:02 +0200 Subject: Fix event name in workflow file --- .github/workflows/test.yml | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5ff199b..d7d3e39 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,11 +2,9 @@ name: "Run Tests" on: push: - branches: - - 'master' - pull-request: - branches: - - '*' + branches: [ 'master' ] + pull_request: + branches: [ '*' ] jobs: docker: -- cgit v1.2.3 From d24d9a7fd0ddd725bb81da29e40b80a644c66249 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 17:46:11 +0200 Subject: Add docker deploy workflow --- .github/workflows/docker.yml | 30 ++++++++++++++++++++++++++++++ .github/workflows/test.yml | 5 +++-- 2 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/docker.yml diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml new file mode 100644 index 0000000..b74eaac --- /dev/null +++ b/.github/workflows/docker.yml @@ -0,0 +1,30 @@ +name: "Run docker build and deploy" + +on: + push: + tags: + - '*' + +jobs: + call-test-workflow: + uses: ./.github/workflows/test.yml + + docker: + runs-on: ubuntu-latest + steps: + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Login to Docker Hub + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Execute image build and push + run: | + ./gradlew bootBuildImage + docker tag ghcr.io/ccc-mf/etl-processor ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }} + docker push ghcr.io/ccc-mf/etl-processor + docker push ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }} \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d7d3e39..98067f1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,13 +1,14 @@ name: "Run Tests" on: + workflow_call: push: branches: [ 'master' ] pull_request: branches: [ '*' ] jobs: - docker: + tests: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -17,7 +18,7 @@ jobs: distribution: 'temurin' - name: Setup Gradle - uses: gradle/gradle-build-action@v2 + uses: gradle/gradle-build-action@v2.4.2 - name: Execute tests run: ./gradlew test -- cgit v1.2.3 From 2ec5e27a402f36d87d3cf9a6369e77028200c3b2 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 18:07:25 +0200 Subject: Full setup for docker build --- .github/workflows/docker.yml | 14 +++++++++++--- .github/workflows/test.yml | 1 + 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index b74eaac..175250c 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -2,16 +2,24 @@ name: "Run docker build and deploy" on: push: - tags: - - '*' + tags: [ '*' ] jobs: - call-test-workflow: + test: uses: ./.github/workflows/test.yml docker: runs-on: ubuntu-latest steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + + - name: Setup Gradle + uses: gradle/gradle-build-action@v2.4.2 + - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 98067f1..fa00f69 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,6 +4,7 @@ on: workflow_call: push: branches: [ 'master' ] + tags-ignore: [ '*' ] pull_request: branches: [ '*' ] -- cgit v1.2.3 From 2264d85bd10cf18c8a6e7e0c7fb8f79bf7c24b87 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 18:29:25 +0200 Subject: Run docker workflow after test workflow --- .github/workflows/docker.yml | 9 ++++----- .github/workflows/test.yml | 16 +++++++++++++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 175250c..bf4f6a0 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -1,14 +1,13 @@ name: "Run docker build and deploy" on: - push: - tags: [ '*' ] + workflow_run: + workflows: [ 'Run Tests' ] + types: [ 'completed' ] jobs: - test: - uses: ./.github/workflows/test.yml - docker: + if: ${{ github.event.workflow_run.conclusion == 'success' && contains(github.event.ref, '/tags/') }} runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fa00f69..e7ee0d3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,10 +1,8 @@ -name: "Run Tests" +name: 'Run Tests' on: - workflow_call: push: branches: [ 'master' ] - tags-ignore: [ '*' ] pull_request: branches: [ '*' ] @@ -24,5 +22,17 @@ jobs: - name: Execute tests run: ./gradlew test + integrationTests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + + - name: Setup Gradle + uses: gradle/gradle-build-action@v2.4.2 + - name: Execute integration tests run: ./gradlew integrationTest \ No newline at end of file -- cgit v1.2.3 From 25ec557c25378774194627775b460af290bfccee Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 18:44:33 +0200 Subject: Change condition when to run docker job --- .github/workflows/docker.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index bf4f6a0..d6f3e3e 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -7,7 +7,7 @@ on: jobs: docker: - if: ${{ github.event.workflow_run.conclusion == 'success' && contains(github.event.ref, '/tags/') }} + if: ${{ github.event.workflow_run.conclusion == 'success' && github.ref_type == 'tag' }} runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 -- cgit v1.2.3 From 55153d805048a91c10543969e583c12b23e10c78 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 18:55:33 +0200 Subject: Add information about docker image --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index ea0c02b..33148df 100644 --- a/README.md +++ b/README.md @@ -53,4 +53,8 @@ Wird keine Rückantwort über Apache Kafka empfangen und es gibt keine weitere M Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden. Lässt sich keine Verbindung zu dem bwHC-Backend aufbauen, wird eine Rückantwort mit Status-Code `900` erwartet, welchen es -für HTTP nicht gibt. \ No newline at end of file +für HTTP nicht gibt. + +## Docker-Images + +Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor \ No newline at end of file -- cgit v1.2.3 From 4dde13e79a0d7d35648c498f7a98b62aea05b9ec Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 18:59:04 +0200 Subject: Run tests on each tag --- .github/workflows/test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e7ee0d3..f7c37f3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,6 +3,7 @@ name: 'Run Tests' on: push: branches: [ 'master' ] + tags: [ '*' ] pull_request: branches: [ '*' ] -- cgit v1.2.3 From 044d01534b1183449bfe7d2a783481b81feac455 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 20:17:10 +0200 Subject: Build and deploy docker image on new release --- .github/workflows/deploy.yml | 35 +++++++++++++++++++++++++++++++++++ .github/workflows/docker.yml | 37 ------------------------------------- build.gradle.kts | 2 +- 3 files changed, 36 insertions(+), 38 deletions(-) create mode 100644 .github/workflows/deploy.yml delete mode 100644 .github/workflows/docker.yml diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000..6d15376 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,35 @@ +name: "Run build and deploy" + +on: + release: + types: [ 'published' ] + +jobs: + docker: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-java@v3 + with: + java-version: '17' + distribution: 'temurin' + + - name: Setup Gradle + uses: gradle/gradle-build-action@v2.4.2 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Login to Docker Hub + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Execute image build and push + run: | + ./gradlew bootBuildImage + docker tag ghcr.io/ccc-mf/etl-processor ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }} + docker push ghcr.io/ccc-mf/etl-processor + docker push ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }} \ No newline at end of file diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml deleted file mode 100644 index d6f3e3e..0000000 --- a/.github/workflows/docker.yml +++ /dev/null @@ -1,37 +0,0 @@ -name: "Run docker build and deploy" - -on: - workflow_run: - workflows: [ 'Run Tests' ] - types: [ 'completed' ] - -jobs: - docker: - if: ${{ github.event.workflow_run.conclusion == 'success' && github.ref_type == 'tag' }} - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: actions/setup-java@v3 - with: - java-version: '17' - distribution: 'temurin' - - - name: Setup Gradle - uses: gradle/gradle-build-action@v2.4.2 - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 - - - name: Login to Docker Hub - uses: docker/login-action@v2 - with: - registry: ghcr.io - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Execute image build and push - run: | - ./gradlew bootBuildImage - docker tag ghcr.io/ccc-mf/etl-processor ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }} - docker push ghcr.io/ccc-mf/etl-processor - docker push ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }} \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 7650575..ed8056d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,7 +11,7 @@ plugins { } group = "de.ukw.ccc" -version = "0.1.0-SNAPSHOT" +version = "0.1.0" java { sourceCompatibility = JavaVersion.VERSION_17 -- cgit v1.2.3 From cb9c5904729c90b86357d0668604b74f4f4e61f7 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 11 Aug 2023 09:13:45 +0200 Subject: Issue #2: Do not serialize JSON string as custom string (#4) In addition to that, if REST request did not contain a response body, use empty string as data quality report string.--- .../dnpm/etl/processor/output/RestMtbFileSender.kt | 2 +- .../etl/processor/services/ResponseProcessor.kt | 4 +- .../etl/processor/output/RestMtbFileSenderTest.kt | 60 +++++++++++++++++----- 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index f80ff69..1c59f5c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -50,7 +50,7 @@ class RestMtbFileSender( return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(response.statusCode.asRequestStatus()) + return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index d7ad86f..f2e9e2e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -55,14 +55,14 @@ class ResponseProcessor( RequestStatus.WARNING -> { it.report = Report( "Warnungen über mangelhafte Daten", - objectMapper.writeValueAsString(event.body) + event.body.orElse("") ) } RequestStatus.ERROR -> { it.report = Report( "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - objectMapper.writeValueAsString(event.body) + event.body.orElse("") ) } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt index 17d420a..78b5a45 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt @@ -61,7 +61,8 @@ class RestMtbFileSenderTest { } val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID")) - assertThat(response.status).isEqualTo(requestWithResponse.requestStatus) + assertThat(response.status).isEqualTo(requestWithResponse.response.status) + assertThat(response.body).isEqualTo(requestWithResponse.response.body) } @ParameterizedTest @@ -75,11 +76,16 @@ class RestMtbFileSenderTest { } val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile)) - assertThat(response.status).isEqualTo(requestWithResponse.requestStatus) + assertThat(response.status).isEqualTo(requestWithResponse.response.status) + assertThat(response.body).isEqualTo(requestWithResponse.response.body) } companion object { - data class RequestWithResponse(val httpStatus: HttpStatus, val body: String, val requestStatus: RequestStatus) + data class RequestWithResponse( + val httpStatus: HttpStatus, + val body: String, + val response: MtbFileSender.Response + ) private val warningBody = """ { @@ -123,6 +129,8 @@ class RestMtbFileSenderTest { ) .build() + private val errorResponseBody = "Sonstiger Fehler bei der Übertragung" + /** * Synthetic http responses with related request status * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API @@ -130,13 +138,33 @@ class RestMtbFileSenderTest { @JvmStatic fun mtbFileRequestWithResponseSource(): Set { return setOf( - RequestWithResponse(HttpStatus.OK, "{}", RequestStatus.SUCCESS), - RequestWithResponse(HttpStatus.CREATED, warningBody, RequestStatus.WARNING), - RequestWithResponse(HttpStatus.BAD_REQUEST, "??", RequestStatus.ERROR), - RequestWithResponse(HttpStatus.UNPROCESSABLE_ENTITY, errorBody, RequestStatus.ERROR), + RequestWithResponse(HttpStatus.OK, "{}", MtbFileSender.Response(RequestStatus.SUCCESS, "{}")), + RequestWithResponse( + HttpStatus.CREATED, + warningBody, + MtbFileSender.Response(RequestStatus.WARNING, warningBody) + ), + RequestWithResponse( + HttpStatus.BAD_REQUEST, + "??", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ), + RequestWithResponse( + HttpStatus.UNPROCESSABLE_ENTITY, + errorBody, + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ), // Some more errors not mentioned in documentation - RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR), - RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR) + RequestWithResponse( + HttpStatus.NOT_FOUND, + "what????", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ), + RequestWithResponse( + HttpStatus.INTERNAL_SERVER_ERROR, + "what????", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ) ) } @@ -147,10 +175,18 @@ class RestMtbFileSenderTest { @JvmStatic fun deleteRequestWithResponseSource(): Set { return setOf( - RequestWithResponse(HttpStatus.OK, "", RequestStatus.SUCCESS), + RequestWithResponse(HttpStatus.OK, "", MtbFileSender.Response(RequestStatus.SUCCESS)), // Some more errors not mentioned in documentation - RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR), - RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR) + RequestWithResponse( + HttpStatus.NOT_FOUND, + "what????", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ), + RequestWithResponse( + HttpStatus.INTERNAL_SERVER_ERROR, + "what????", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ) ) } } -- cgit v1.2.3 From 6ecb439007b4fa6dec9af1e0334b89fd235a97be Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 11 Aug 2023 09:22:54 +0200 Subject: Issue #3: Detect the request type of request with last known status (#5) --- .../services/RequestServiceIntegrationTest.kt | 2 +- .../etl/processor/services/RequestProcessor.kt | 2 +- .../dnpm/etl/processor/services/RequestService.kt | 7 +-- .../etl/processor/services/RequestProcessorTest.kt | 8 +-- .../etl/processor/services/RequestServiceTest.kt | 58 +++++++++++++++------- 5 files changed, 49 insertions(+), 28 deletions(-) diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt index 3af218e..ff85296 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt @@ -116,7 +116,7 @@ class RequestServiceIntegrationTest : AbstractTestcontainerTest() { fun shouldReturnDeleteRequestAsLastRequest() { setupTestData() - val actual = requestService.isLastRequestDeletion("TEST_12345678901") + val actual = requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901") assertThat(actual).isTrue() } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index d2f8619..34156f7 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -95,7 +95,7 @@ class RequestProcessor( private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { val lastMtbFileRequestForPatient = requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id) - val isLastRequestDeletion = requestService.isLastRequestDeletion(pseudonymizedMtbFile.patient.id) + val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id) return null != lastMtbFileRequestForPatient && !isLastRequestDeletion diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt index 0f69910..e0043d2 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt @@ -38,8 +38,8 @@ class RequestService( fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) = Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym)) - fun isLastRequestDeletion(patientPseudonym: String) = - Companion.isLastRequestDeletion(allRequestsByPatientPseudonym(patientPseudonym)) + fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) = + Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym)) companion object { @@ -48,7 +48,8 @@ class RequestService( .sortedByDescending { it.processedAt } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - fun isLastRequestDeletion(allRequests: List) = allRequests + fun isLastRequestWithKnownStatusDeletion(allRequests: List) = allRequests + .filter { it.status != RequestStatus.UNKNOWN } .maxByOrNull { it.processedAt }?.type == RequestType.DELETE } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index f9d8182..7856833 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -92,7 +92,7 @@ class RequestProcessorTest { doAnswer { false - }.`when`(requestService).isLastRequestDeletion(anyString()) + }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString()) doAnswer { it.arguments[0] as String @@ -147,7 +147,7 @@ class RequestProcessorTest { doAnswer { false - }.`when`(requestService).isLastRequestDeletion(anyString()) + }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString()) doAnswer { it.arguments[0] as String @@ -202,7 +202,7 @@ class RequestProcessorTest { doAnswer { false - }.`when`(requestService).isLastRequestDeletion(anyString()) + }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString()) doAnswer { MtbFileSender.Response(status = RequestStatus.SUCCESS) @@ -261,7 +261,7 @@ class RequestProcessorTest { doAnswer { false - }.`when`(requestService).isLastRequestDeletion(anyString()) + }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString()) doAnswer { MtbFileSender.Response(status = RequestStatus.ERROR) diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt index 3e0a979..3cf8804 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt @@ -68,23 +68,33 @@ class RequestServiceTest { patientId = "TEST_12345678901", pid = "P1", fingerprint = "0123456789abcdef1", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-07-07T00:00:00Z") + ), + Request( + id = 2L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdefd", type = RequestType.DELETE, - status = RequestStatus.SUCCESS, - processedAt = Instant.parse("2023-08-08T02:00:00Z") + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-07-07T02:00:00Z") ), Request( - id = 1L, + id = 3L, uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678902", - pid = "P2", - fingerprint = "0123456789abcdef2", + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", type = RequestType.MTB_FILE, - status = RequestStatus.WARNING, - processedAt = Instant.parse("2023-08-08T00:00:00Z") + status = RequestStatus.UNKNOWN, + processedAt = Instant.parse("2023-08-11T00:00:00Z") ) ) - val actual = RequestService.isLastRequestDeletion(requests) + val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests) assertThat(actual).isTrue() } @@ -98,23 +108,33 @@ class RequestServiceTest { patientId = "TEST_12345678901", pid = "P1", fingerprint = "0123456789abcdef1", - type = RequestType.DELETE, - status = RequestStatus.SUCCESS, - processedAt = Instant.parse("2023-07-07T02:00:00Z") + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-07-07T00:00:00Z") ), Request( - id = 1L, + id = 2L, uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678902", - pid = "P2", - fingerprint = "0123456789abcdef2", + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", type = RequestType.MTB_FILE, status = RequestStatus.WARNING, - processedAt = Instant.parse("2023-08-08T00:00:00Z") + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + Request( + id = 3L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.MTB_FILE, + status = RequestStatus.UNKNOWN, + processedAt = Instant.parse("2023-08-11T00:00:00Z") ) ) - val actual = RequestService.isLastRequestDeletion(requests) + val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests) assertThat(actual).isFalse() } @@ -197,7 +217,7 @@ class RequestServiceTest { @Test fun isLastRequestDeletionShouldRequestAllRequestsForPatientPseudonym() { - requestService.isLastRequestDeletion("TEST_12345678901") + requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901") verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString()) } -- cgit v1.2.3 From 0e1034d964639fe295726c2f3c8bc801a1ff7017 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 11 Aug 2023 09:47:20 +0200 Subject: New version and add status badge to README.md --- README.md | 2 +- build.gradle.kts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 33148df..43ed3c4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# ETL-Processor for bwHC data +# ETL-Processor for bwHC data [![Run Tests](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml/badge.svg)](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml) Diese Anwendung versendet ein bwHC-MTB-File an das bwHC-Backend und pseudonymisiert die Patienten-ID. diff --git a/build.gradle.kts b/build.gradle.kts index ed8056d..37fe4e1 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,7 +11,7 @@ plugins { } group = "de.ukw.ccc" -version = "0.1.0" +version = "0.1.1" java { sourceCompatibility = JavaVersion.VERSION_17 -- cgit v1.2.3 From bc48a7217eb98e9ec95e5c8b0908b2a1d8a6b27c Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 11 Aug 2023 14:37:48 +0200 Subject: Add more information about usage in an ETl process --- README.md | 22 ++++++++++++++++++++++ docs/etl.png | Bin 0 -> 76404 bytes 2 files changed, 22 insertions(+) create mode 100644 docs/etl.png diff --git a/README.md b/README.md index 43ed3c4..a547ab5 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,28 @@ Diese Anwendung versendet ein bwHC-MTB-File an das bwHC-Backend und pseudonymisiert die Patienten-ID. +### Einordnung innerhalb einer DNPM-ETL-Strecke + +Diese Anwendung erlaubt das Entgegennehmen HTTP/REST-Anfragen aus dem Onkostar-Plugin **onkostar-pligin-dnpmexport**. + +Der Inhalt einer Anfrage, wenn ein bwHC-MTBFile, wird pseudonymisiert und auf Duplikate geprüft. +Duplikate werden verworfen, Änderungen werden weitergeleitet. + +Löschanfragen werden immer als Löschanfrage an das bwHC-backend weitergeleitet. + +![Modell DNPM-ETL-Strecke](docs/etl.png) + +#### HTTP/REST-Konfiguration + +Anfragen werden, wenn nicht als Duplikat behandelt, nach der Pseudonymisierung direkt an das bwHC-Backend gesendet. + +#### Konfiguration für Apache Kafka + +Anfragen werden, wenn nicht als Duplikat behandelt, nach der Pseudonymisierung an Apache Kafka übergeben. +Eine Antwort wird dabei ebenfalls mithilfe von Apache Kafka übermittelt und nach der Entgegennahme verarbeitet. + +Siehe hierzu auch: https://github.com/CCC-MF/kafka-to-bwhc + ## Pseudonymisierung der Patienten-ID Wenn eine URI zu einer gPAS-Instanz angegeben ist, wird diese verwendet. diff --git a/docs/etl.png b/docs/etl.png new file mode 100644 index 0000000..0ca5def Binary files /dev/null and b/docs/etl.png differ -- cgit v1.2.3 From 72295202ec37a76b90a919e39ae094bb7e56d202 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 12 Aug 2023 22:19:29 +0200 Subject: Code cleanup --- .../dnpm/etl/processor/EtlProcessorApplicationTests.kt | 7 ++++++- .../etl/processor/pseudonym/GpasPseudonymGenerator.java | 4 +--- .../dev/dnpm/etl/processor/config/AppConfigProperties.kt | 2 +- .../dev/dnpm/etl/processor/pseudonym/extensions.kt | 2 +- .../dev/dnpm/etl/processor/services/RequestProcessor.kt | 3 --- .../dev/dnpm/etl/processor/services/ResponseProcessor.kt | 4 +--- .../dnpm/etl/processor/web/StatisticsRestController.kt | 6 +++--- .../dnpm/etl/processor/output/RestMtbFileSenderTest.kt | 16 ++++++++-------- .../dnpm/etl/processor/services/ResponseProcessorTest.kt | 6 +----- .../services/kafka/KafkaResponseProcessorTest.kt | 8 ++++---- 10 files changed, 26 insertions(+), 32 deletions(-) diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt index 6c5b150..c5a20bb 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt @@ -20,10 +20,13 @@ package dev.dnpm.etl.processor import dev.dnpm.etl.processor.output.MtbFileSender +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.mock.mockito.MockBean +import org.springframework.context.ApplicationContext import org.springframework.test.context.junit.jupiter.SpringExtension import org.testcontainers.junit.jupiter.Testcontainers @@ -34,7 +37,9 @@ import org.testcontainers.junit.jupiter.Testcontainers class EtlProcessorApplicationTests : AbstractTestcontainerTest() { @Test - fun contextLoadsIfMtbFileSenderConfigured() { + fun contextLoadsIfMtbFileSenderConfigured(@Autowired context: ApplicationContext) { + // Simply check bean configuration + assertThat(context).isNotNull } } diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java index f13a034..732a770 100644 --- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java @@ -69,13 +69,11 @@ import java.util.HashMap; public class GpasPseudonymGenerator implements Generator { + private final static FhirContext r4Context = FhirContext.forR4(); private final String gPasUrl; private final String psnTargetDomain; - private static FhirContext r4Context = FhirContext.forR4(); private final HttpHeaders httpHeader; - private final RetryTemplate retryTemplate = defaultTemplate(); - private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class); private SSLContext customSslContext; diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 6502a1b..06e730b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties(AppConfigProperties.NAME) data class AppConfigProperties( - var bwhc_uri: String?, + var bwhcUri: String?, var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN ) { companion object { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt index 580785d..c0050a4 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt @@ -38,7 +38,7 @@ infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) { this.histologyReports.forEach { it.patient = patientPseudonym } this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } this.molecularPathologyFindings.forEach { it.patient = patientPseudonym } - this.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } + this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } } this.ngsReports.forEach { it.patient = patientPseudonym } this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } this.rebiopsyRequests.forEach { it.patient = patientPseudonym } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 34156f7..3cd912c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -30,7 +30,6 @@ import dev.dnpm.etl.processor.pseudonym.PseudonymizeService import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils -import org.slf4j.LoggerFactory import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service import java.time.Instant @@ -45,8 +44,6 @@ class RequestProcessor( private val applicationEventPublisher: ApplicationEventPublisher ) { - private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) - fun processMtbFile(mtbFile: MtbFile) { val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index f2e9e2e..677443a 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -19,7 +19,6 @@ package dev.dnpm.etl.processor.services -import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.Report import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus @@ -33,8 +32,7 @@ import java.util.* @Service class ResponseProcessor( private val requestRepository: RequestRepository, - private val statisticsUpdateProducer: Sinks.Many, - private val objectMapper: ObjectMapper + private val statisticsUpdateProducer: Sinks.Many ) { private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt index a418772..6f0e820 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt @@ -83,9 +83,9 @@ class StatisticsRestController( .groupBy { formatter.format(it.processedAt) } .map { val requestList = it.value - .groupBy { it.status } - .map { - Pair(it.key, it.value.size) + .groupBy { request -> request.status } + .map { request -> + Pair(request.key, request.value.size) } .toMap() Pair( diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt index 78b5a45..0cad285 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt @@ -105,7 +105,7 @@ class RestMtbFileSenderTest { } """.trimIndent() - val mtbFile = MtbFile.builder() + val mtbFile: MtbFile = MtbFile.builder() .withPatient( Patient.builder() .withId("PID") @@ -129,7 +129,7 @@ class RestMtbFileSenderTest { ) .build() - private val errorResponseBody = "Sonstiger Fehler bei der Übertragung" + private const val ERROR_RESPONSE_BODY = "Sonstiger Fehler bei der Übertragung" /** * Synthetic http responses with related request status @@ -147,23 +147,23 @@ class RestMtbFileSenderTest { RequestWithResponse( HttpStatus.BAD_REQUEST, "??", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ), RequestWithResponse( HttpStatus.UNPROCESSABLE_ENTITY, errorBody, - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ), // Some more errors not mentioned in documentation RequestWithResponse( HttpStatus.NOT_FOUND, "what????", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ), RequestWithResponse( HttpStatus.INTERNAL_SERVER_ERROR, "what????", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ) ) } @@ -180,12 +180,12 @@ class RestMtbFileSenderTest { RequestWithResponse( HttpStatus.NOT_FOUND, "what????", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ), RequestWithResponse( HttpStatus.INTERNAL_SERVER_ERROR, "what????", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ) ) } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt index cfb1111..b9e4b7f 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt @@ -19,8 +19,6 @@ package dev.dnpm.etl.processor.services -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.KotlinModule import dev.dnpm.etl.processor.monitoring.Request import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus @@ -62,12 +60,10 @@ class ResponseProcessorTest { @Mock requestRepository: RequestRepository, @Mock statisticsUpdateProducer: Sinks.Many ) { - val objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) - this.requestRepository = requestRepository this.statisticsUpdateProducer = statisticsUpdateProducer - this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer, objectMapper) + this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer) } @Test diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt index 0f524ca..6d83146 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -45,7 +45,7 @@ class KafkaResponseProcessorTest { private lateinit var kafkaResponseProcessor: KafkaResponseProcessor - private fun createkafkaRecord( + private fun createKafkaRecord( requestId: String? = null, statusCode: Int = 200, statusBody: Map? = mapOf() @@ -79,14 +79,14 @@ class KafkaResponseProcessorTest { @Test fun shouldNotProcessRecordsWithoutValidKey() { - this.kafkaResponseProcessor.onMessage(createkafkaRecord(null, 200)) + this.kafkaResponseProcessor.onMessage(createKafkaRecord(null, 200)) verify(eventPublisher, never()).publishEvent(any()) } @Test fun shouldNotProcessRecordsWithoutValidBody() { - this.kafkaResponseProcessor.onMessage(createkafkaRecord(requestId = "TestID1234", statusBody = null)) + this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null)) verify(eventPublisher, never()).publishEvent(any()) } @@ -94,7 +94,7 @@ class KafkaResponseProcessorTest { @ParameterizedTest @MethodSource("statusCodeSource") fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) { - this.kafkaResponseProcessor.onMessage(createkafkaRecord("TestID1234", statusCode)) + this.kafkaResponseProcessor.onMessage(createKafkaRecord("TestID1234", statusCode)) verify(eventPublisher, times(1)).publishEvent(any()) } -- cgit v1.2.3 From 7186a45f6c9e9d6a4919027236450113e4b666b0 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 12 Aug 2023 22:27:20 +0200 Subject: Add link to onkostar-plugin-dnpmexport --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a547ab5..48dc27c 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Diese Anwendung versendet ein bwHC-MTB-File an das bwHC-Backend und pseudonymisi ### Einordnung innerhalb einer DNPM-ETL-Strecke -Diese Anwendung erlaubt das Entgegennehmen HTTP/REST-Anfragen aus dem Onkostar-Plugin **onkostar-pligin-dnpmexport**. +Diese Anwendung erlaubt das Entgegennehmen HTTP/REST-Anfragen aus dem Onkostar-Plugin **[onkostar-plugin-dnpmexport](https://github.com/CCC-MF/onkostar-plugin-dnpmexport)**. Der Inhalt einer Anfrage, wenn ein bwHC-MTBFile, wird pseudonymisiert und auf Duplikate geprüft. Duplikate werden verworfen, Änderungen werden weitergeleitet. -- cgit v1.2.3 From 2e7ef25a4936ba0ea188cfd9a237ad6be0c6bffe Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 12 Aug 2023 23:16:17 +0200 Subject: Update project version and versions in gradle file --- build.gradle.kts | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 37fe4e1..8eee6d0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -5,13 +5,20 @@ import org.springframework.boot.gradle.tasks.bundling.BootBuildImage plugins { war id("org.springframework.boot") version "3.1.2" - id("io.spring.dependency-management") version "1.1.0" + id("io.spring.dependency-management") version "1.1.3" kotlin("jvm") version "1.9.0" kotlin("plugin.spring") version "1.9.0" } group = "de.ukw.ccc" -version = "0.1.1" +version = "0.2.0-SNAPSHOT" + +var versions = mapOf( + "bwhc-dto-java" to "0.2.0", + "hapi-fhir" to "6.6.2", + "httpclient5" to "5.2.1", + "mockito-kotlin" to "5.1.0" +) java { sourceCompatibility = JavaVersion.VERSION_17 @@ -52,10 +59,10 @@ dependencies { implementation("org.flywaydb:flyway-mysql") implementation("commons-codec:commons-codec") implementation("io.projectreactor.kotlin:reactor-kotlin-extensions") - implementation("de.ukw.ccc:bwhc-dto-java:0.2.0") - implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2") - implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2") - implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1") + implementation("de.ukw.ccc:bwhc-dto-java:${versions["bwhc-dto-java"]}") + implementation("ca.uhn.hapi.fhir:hapi-fhir-base:${versions["hapi-fhir"]}") + implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:${versions["hapi-fhir"]}") + implementation("org.apache.httpcomponents.client5:httpclient5:${versions["httpclient5"]}") runtimeOnly("org.mariadb.jdbc:mariadb-java-client") runtimeOnly("org.postgresql:postgresql") developmentOnly("org.springframework.boot:spring-boot-devtools") @@ -64,7 +71,7 @@ dependencies { providedRuntime("org.springframework.boot:spring-boot-starter-tomcat") testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("io.projectreactor:reactor-test") - testImplementation("org.mockito.kotlin:mockito-kotlin:5.0.0") + testImplementation("org.mockito.kotlin:mockito-kotlin:${versions["mockito-kotlin"]}") integrationTestImplementation("org.testcontainers:junit-jupiter") integrationTestImplementation("org.testcontainers:postgresql") } -- cgit v1.2.3 From 64b8636145291a3cd28b4354af9ce20e052d672a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 15 Aug 2023 00:49:01 +0200 Subject: Update Apache Kafka service config for KRaft mode --- dev-compose.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dev-compose.yml b/dev-compose.yml index 9b25794..5012bab 100644 --- a/dev-compose.yml +++ b/dev-compose.yml @@ -6,6 +6,12 @@ services: - "9092:9092" environment: ALLOW_PLAINTEXT_LISTENER: "yes" + KAFKA_CFG_NODE_ID: "0" + KAFKA_CFG_PROCESS_ROLES: "controller,broker" + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093 + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER mariadb: image: mariadb:10 -- cgit v1.2.3 From 66dc96680da5e263550413493578ebe936dde149 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 15 Aug 2023 01:09:22 +0200 Subject: Update dev config and added related information into README.md --- README.md | 22 +++++++++++++++++++++- dev-compose.yml | 3 +++ src/main/resources/application-dev.yml | 7 +++++-- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 48dc27c..805514f 100644 --- a/README.md +++ b/README.md @@ -79,4 +79,24 @@ für HTTP nicht gibt. ## Docker-Images -Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor \ No newline at end of file +Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor + +## Entwicklungssetup + +Zum Starten einer lokalen Entwicklungs- und Testumgebung kann die beiliegende Datei `dev-compose.yml` verwendet werden. +Diese kann zur Nutzung der Datenbanken **MariaDB** als auch **PostgreSQL** angepasst werden. + +Zur Nutzung von Apache Kafka muss dazu ein Eintrag im hosts-File vorgenommen werden und der Hostname `kafka` auf die lokale +IP-Adresse verweisen. Ohne diese Einstellung ist eine Nutzung von Apache Kafka außerhalb der Docker-Umgebung nicht möglich. + +Beim Start der Anwendung mit dem Profil `dev` wird die in `dev-compose.yml` definierte Umgebung beim Start der +Anwendung mit gestartet: + +``` +SPRING_PROFILES_ACTIVE=dev ./gradlew bootRun +``` + +Die Datei `application-dev.yml` enthält hierzu die Konfiguration für das Profil `dev`. + +Beim Ausführen der Integrationstests wird eine Testdatenbank in einem Docker-Container gestartet. +Siehe hier auch die Klasse `AbstractTestcontainerTest` unter `src/integrationTest`. \ No newline at end of file diff --git a/dev-compose.yml b/dev-compose.yml index 5012bab..8f0780f 100644 --- a/dev-compose.yml +++ b/dev-compose.yml @@ -1,4 +1,7 @@ services: + + # Note: Make sure, hostname "kafka" points to 127.0.0.1 + # otherwise connection will not be available kafka: image: bitnami/kafka hostname: kafka diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 551f3f8..b1cc2fc 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -4,8 +4,11 @@ spring: file: ./dev-compose.yml app: - rest: - uri: http://localhost:9000/bwhc/etl/api + #rest: + # uri: http://localhost:9000/bwhc/etl/api + + # Note: Make sure, hostname "kafka" points to 127.0.0.1 + # otherwise connection will not be available kafka: topic: test response-topic: test-response -- cgit v1.2.3 From 78b228716396d6e761d08a11a846deb83bdc2e50 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 15 Aug 2023 08:51:23 +0200 Subject: Add information about Kafka retention time --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index 805514f..58092ba 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,19 @@ Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert Lässt sich keine Verbindung zu dem bwHC-Backend aufbauen, wird eine Rückantwort mit Status-Code `900` erwartet, welchen es für HTTP nicht gibt. +#### Retention Time + +Generell werden in Apache Kafka alle Records entsprechend der Konfiguration vorgehalten. +So wird ohne spezielle Konfiguration ein Record für 7 Tage in Apache Kafka gespeichert. +Es sind innerhalb dieses Zeitraums auch alte Informationen weiterhin enthalten, wenn der Consent später abgelehnt wurde. + +Durch eine entsprechende Konfiguration des Topics kann dies verhindert werden. + +Beispiel - auszuführen innerhalb des Kafka-Containers: Löschen alter Records nach einem Tag +``` +kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=86400000 +``` + ## Docker-Images Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor -- cgit v1.2.3 From 2eb5cc61b9809523d51d1fa7af7a1afc1fdb7f0c Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 15 Aug 2023 10:58:17 +0200 Subject: Change Kafka response body JSON alias --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index ef880f4..68e3611 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -84,7 +84,7 @@ class KafkaResponseProcessor( data class ResponseKey(val requestId: String) data class ResponseBody( - @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, - @JsonProperty("status_body") val statusBody: Map + @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, + @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map ) } \ No newline at end of file -- cgit v1.2.3 From 8dc82225a4cd45a315fac3efe4d76513e6d536fc Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 16 Aug 2023 15:25:46 +0200 Subject: Issue #7: Send and expect requestId in record body, not in record key (#8) --- .../etl/processor/output/KafkaMtbFileSender.kt | 12 ++--- .../etl/processor/services/ResponseProcessor.kt | 2 +- .../services/kafka/KafkaResponseProcessor.kt | 54 +++++++++------------- src/main/resources/application-dev.yml | 2 +- .../etl/processor/output/KafkaMtbFileSenderTest.kt | 12 +++-- .../services/kafka/KafkaResponseProcessorTest.kt | 54 +++++++++++++++++----- 6 files changed, 80 insertions(+), 56 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index e7f9769..5772faf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -40,7 +40,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(request.mtbFile) + objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") @@ -68,7 +68,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(dummyMtbFile) + objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) ) if (result.get() != null) { @@ -85,12 +85,12 @@ class KafkaMtbFileSender( private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + - "\"eid\": \"${request.mtbFile.episode.id}\", " + - "\"requestId\": \"${request.requestId}\"}" + "\"eid\": \"${request.mtbFile.episode.id}\"}" } private fun key(request: MtbFileSender.DeleteRequest): String { - return "{\"pid\": \"${request.patientId}\", " + - "\"requestId\": \"${request.requestId}\"}" + return "{\"pid\": \"${request.patientId}\"}" } + + data class Data(val requestId: String, val content: MtbFile) } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index 677443a..4048348 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -71,7 +71,7 @@ class ResponseProcessor( } else -> { - logger.error("Cannot process response: Unknown response code!") + logger.error("Cannot process response: Unknown response!") return@ifPresentOrElse } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 68e3611..a29010f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -41,50 +41,40 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord) { try { - Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java)) } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) Optional.empty() - }.ifPresentOrElse({ responseKey -> - val event = try { - val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - responseBody.statusCode.asRequestStatus(), - when (responseBody.statusCode.asRequestStatus()) { - RequestStatus.SUCCESS -> { - Optional.empty() - } + }.ifPresentOrElse({ responseBody -> + val event = ResponseEvent( + responseBody.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } - RequestStatus.WARNING, RequestStatus.ERROR -> { - Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) - } + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } - else -> { - logger.error("Kafka response: Unknown response code!") - Optional.empty() - } + else -> { + logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode) + Optional.empty() } - ) - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - RequestStatus.ERROR, - Optional.of("Cannot process Kafka response") - ) - } + } + ) eventPublisher.publishEvent(event) }, { - logger.error("No response key in Kafka response") + logger.error("No requestId in Kafka response") }) } - data class ResponseKey(val requestId: String) - data class ResponseBody( + @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String, @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map ) + } \ No newline at end of file diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index b1cc2fc..a60cd8a 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -11,7 +11,7 @@ app: # otherwise connection will not be available kafka: topic: test - response-topic: test-response + response-topic: test_response servers: kafka:9092 server: diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt index 14bdd5d..3ec9757 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -97,9 +97,9 @@ class KafkaMtbFileSenderTest { val captor = argumentCaptor() verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) assertThat(captor.firstValue).isNotNull - assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}") + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}") assertThat(captor.secondValue).isNotNull - assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE))) + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE))) } @Test @@ -113,9 +113,9 @@ class KafkaMtbFileSenderTest { val captor = argumentCaptor() verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) assertThat(captor.firstValue).isNotNull - assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}") + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}") assertThat(captor.secondValue).isNotNull - assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED))) + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED))) } companion object { @@ -154,6 +154,10 @@ class KafkaMtbFileSenderTest { }.build() } + fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data { + return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus)) + } + data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null) @JvmStatic diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt index 6d83146..95bf41b 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -46,7 +46,7 @@ class KafkaResponseProcessorTest { private lateinit var kafkaResponseProcessor: KafkaResponseProcessor private fun createKafkaRecord( - requestId: String? = null, + requestId: String, statusCode: Int = 200, statusBody: Map? = mapOf() ): ConsumerRecord { @@ -54,15 +54,11 @@ class KafkaResponseProcessorTest { "test-topic", 0, 0, - if (requestId == null) { - null - } else { - this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId)) - }, + null, if (statusBody == null) { "" } else { - this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody)) + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody)) } ) } @@ -78,17 +74,51 @@ class KafkaResponseProcessorTest { } @Test - fun shouldNotProcessRecordsWithoutValidKey() { - this.kafkaResponseProcessor.onMessage(createKafkaRecord(null, 200)) + fun shouldNotProcessRecordsWithoutRequestIdInBody() { + val record = ConsumerRecord( + "test-topic", + 0, + 0, + null, + """ + { + "statusCode": 200, + "statusBody": {} + } + """.trimIndent() + ) + + this.kafkaResponseProcessor.onMessage(record) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @Test + fun shouldProcessRecordsWithAliasNames() { + val record = ConsumerRecord( + "test-topic", + 0, + 0, + null, + """ + { + "request_id": "test0123456789", + "status_code": 200, + "status_body": {} + } + """.trimIndent() + ) - verify(eventPublisher, never()).publishEvent(any()) + this.kafkaResponseProcessor.onMessage(record) + + verify(eventPublisher, times(1)).publishEvent(any()) } @Test - fun shouldNotProcessRecordsWithoutValidBody() { + fun shouldNotProcessRecordsWithoutValidStatusBody() { this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null)) - verify(eventPublisher, never()).publishEvent(any()) + verify(eventPublisher, never()).publishEvent(any()) } @ParameterizedTest -- cgit v1.2.3 From 5bd26b894c3cd08ce8aee75c778083e20abefee9 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 18 Aug 2023 22:15:10 +0200 Subject: Add information about key based retention config --- README.md | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 58092ba..12acf52 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,32 @@ Beispiel - auszuführen innerhalb des Kafka-Containers: Löschen alter Records n kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=86400000 ``` +#### Key based Retention + +Möchten Sie hingegen immer nur die letzte Meldung für einen Patienten und eine Erkrankung in Apache Kafka vorhalten, +so ist die nachfolgend genannte Konfiguration der Kafka-Topics hilfreich. + + +* `retention.ms`: Möglichst kurze Zeit in der alte Records noch erhalten bleiben, z.B. 10 Sekunden 10000 +* `cleanup.policy`: Löschen alter Records und Beibehalten des letzten Records zu einem Key [delete,compact] + +Beispiele für ein Topic `test`, hier bitte an die verwendeten Topics anpassen. + +``` +kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=10000 +kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config cleanup.policy=[delete,compact] +``` + +Da als Key eines Records die (pseudonymisierte) Patienten-ID und die (anonymisierte) Erkrankungs-ID verwendet wird, +stehen mit obiger Konfiguration der Kafka-Topics nach 10 Sekunden nur noch der jeweils letzte Eintrag für den entsprechenden +Key zur Verfügung. + +Da der Key sowohl für die Records in Richtung bwHC-Backend für die Rückantwort identisch aufgebaut ist, lassen sich so +auch im Falle eines Consent-Widerspruchs die enthaltenen Daten als auch die Offenlegung durch Verifikationsdaten in der +Antwort effektiv verhindern, da diese nach 10 Sekunden gelöscht werden. +Es steht dann nur noch die jeweils letzten Information zur Verfügung, dass für einen Patienten/eine Erkrankung +ein Consent-Widerspruch erfolgte. + ## Docker-Images Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor @@ -112,4 +138,4 @@ SPRING_PROFILES_ACTIVE=dev ./gradlew bootRun Die Datei `application-dev.yml` enthält hierzu die Konfiguration für das Profil `dev`. Beim Ausführen der Integrationstests wird eine Testdatenbank in einem Docker-Container gestartet. -Siehe hier auch die Klasse `AbstractTestcontainerTest` unter `src/integrationTest`. \ No newline at end of file +Siehe hier auch die Klasse `AbstractTestcontainerTest` unter `src/integrationTest`. -- cgit v1.2.3 From 9921e1e684cbc236ac645d5172a2385fa69e5bbc Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 19 Aug 2023 11:45:21 +0200 Subject: Throw PseudonymRequestFailed exception with error message This will throw an exception with error message describing what the error is instead of having a more generic NoSuchElementException to be thrown if Optional.get() has no value after calling findFirst() on an empty stream. --- .../etl/processor/pseudonym/GpasPseudonymGenerator.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java index 732a770..91e465b 100644 --- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java @@ -108,12 +108,19 @@ public class GpasPseudonymGenerator implements Generator { @NotNull public static String unwrapPseudonym(Parameters gPasPseudonymResult) { - Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst() - .get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst() - .orElseGet(ParametersParameterComponent::new).getValue(); + final var parameters = gPasPseudonymResult.getParameter().stream().findFirst(); + + if (parameters.isEmpty()) { + throw new PseudonymRequestFailed("Empty HL7 parameters, cannot find first one"); + } + + final var identifier = (Identifier) parameters.get().getPart().stream() + .filter(a -> a.getName().equals("pseudonym")) + .findFirst() + .orElseGet(ParametersParameterComponent::new).getValue(); // pseudonym - return pseudonym.getSystem() + "|" + pseudonym.getValue(); + return identifier.getSystem() + "|" + identifier.getValue(); } -- cgit v1.2.3