From 51cf7a7917d7376d1e7c685b9c0e56d8929ad9e1 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 11:52:11 +0200 Subject: Add processor to handle responses from Kafka topic --- .../dnpm/etl/processor/EtlProcessorApplication.kt | 1 - .../etl/processor/config/AppConfigProperties.kt | 4 +- .../dnpm/etl/processor/config/AppConfiguration.kt | 16 +--- .../etl/processor/config/AppKafkaConfiguration.kt | 70 +++++++++++++++++ .../dev/dnpm/etl/processor/monitoring/Request.kt | 4 +- .../services/kafka/KafkaResponseProcessor.kt | 87 ++++++++++++++++++++++ src/main/resources/application-dev.yml | 9 ++- 7 files changed, 171 insertions(+), 20 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt index 0c4ab68..5d28c97 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt @@ -28,4 +28,3 @@ class EtlProcessorApplication fun main(args: Array) { runApplication(*args) } - diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 64be70d..6502a1b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -48,7 +48,7 @@ data class GPasConfigProperties( val password: String?, val sslCaLocation: String?, -) { + ) { companion object { const val NAME = "app.pseudonymize.gpas" } @@ -66,6 +66,8 @@ data class RestTargetProperties( @ConfigurationProperties(KafkaTargetProperties.NAME) data class KafkaTargetProperties( val topic: String = "etl-processor", + val responseTopic: String = "${topic}_response", + val groupId: String = "${topic}_group", val servers: String = "" ) { companion object { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index c677f2b..cbba1f1 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -21,7 +21,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.ReportService -import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator @@ -32,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.KafkaTemplate import reactor.core.publisher.Sinks @Configuration @@ -60,7 +58,10 @@ class AppConfiguration { } @Bean - fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService { + fun pseudonymizeService( + generator: Generator, + pseudonymizeConfigProperties: PseudonymizeConfigProperties + ): PseudonymizeService { return PseudonymizeService(generator, pseudonymizeConfigProperties) } @@ -70,15 +71,6 @@ class AppConfiguration { return RestMtbFileSender(restTargetProperties) } - @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) - @Bean - fun kafkaMtbFileSender( - kafkaTemplate: KafkaTemplate, - objectMapper: ObjectMapper - ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) - } - @Bean fun reportService(objectMapper: ObjectMapper): ReportService { return ReportService(objectMapper) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt new file mode 100644 index 0000000..f81d3fb --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -0,0 +1,70 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.output.KafkaMtbFileSender +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.listener.KafkaMessageListenerContainer + +@Configuration +@EnableConfigurationProperties( + value = [KafkaTargetProperties::class] +) +@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +class AppKafkaConfiguration { + + @Bean + fun kafkaMtbFileSender( + kafkaTemplate: KafkaTemplate, + objectMapper: ObjectMapper + ): MtbFileSender { + return KafkaMtbFileSender(kafkaTemplate, objectMapper) + } + + @Bean + fun kafkaListenerContainer( + consumerFactory: ConsumerFactory, + kafkaTargetProperties: KafkaTargetProperties, + kafkaResponseProcessor: KafkaResponseProcessor + ): KafkaMessageListenerContainer { + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic); + containerProperties.messageListener = kafkaResponseProcessor + return KafkaMessageListenerContainer(consumerFactory, containerProperties) + } + + @Bean + fun kafkaResponseProcessor( + requestRepository: RequestRepository, + objectMapper: ObjectMapper + ): KafkaResponseProcessor { + return KafkaResponseProcessor(requestRepository, objectMapper) + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt index ecd8219..c1d4d43 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt @@ -36,9 +36,9 @@ data class Request( val patientId: String, val pid: String, val fingerprint: String, - val status: RequestStatus, val type: RequestType, - val processedAt: Instant = Instant.now(), + var status: RequestStatus, + var processedAt: Instant = Instant.now(), @Embedded.Nullable var report: Report? = null ) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt new file mode 100644 index 0000000..f0c91cb --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -0,0 +1,87 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.slf4j.LoggerFactory +import org.springframework.kafka.listener.MessageListener +import java.time.Instant + +class KafkaResponseProcessor( + private val requestRepository: RequestRepository, + private val objectMapper: ObjectMapper +) : MessageListener { + + private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java) + + override fun onMessage(data: ConsumerRecord) { + try { + val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) + requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { + val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + when (responseBody.statusCode) { + 200 -> { + it.status = RequestStatus.SUCCESS + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + requestRepository.save(it) + } + + 201 -> { + it.status = RequestStatus.WARNING + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Warnungen über mangelhafte Daten", + responseBody.statusBody + ) + requestRepository.save(it) + } + + 400, 422 -> { + it.status = RequestStatus.ERROR + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + responseBody.statusBody + ) + requestRepository.save(it) + } + + else -> { + logger.error("Cannot process Kafka response: Unknown response code!") + } + } + } + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + } + } + + data class ResponseKey(val requestId: String) + + data class ResponseBody( + @JsonProperty("status code") val statusCode: Int, + @JsonProperty("status_body") val statusBody: String + ) +} \ No newline at end of file diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 99e4bbf..551f3f8 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -5,10 +5,11 @@ spring: app: rest: - uri: http://localhost:9000/bwhc/etl/api/MTBFile - #kafka: - # topic: test - # servers: kafka:9092 + uri: http://localhost:9000/bwhc/etl/api + kafka: + topic: test + response-topic: test-response + servers: kafka:9092 server: port: 8000 -- cgit v1.2.3 From 35cb258b13543b37ce061f78eef4427e542ca72a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:10:18 +0200 Subject: Do not return specific status code based on remote status code --- .../dnpm/etl/processor/services/RequestProcessor.kt | 17 ++++------------- .../dev/dnpm/etl/processor/web/MtbFileController.kt | 21 ++++++--------------- src/main/resources/application.yml | 2 ++ 3 files changed, 12 insertions(+), 28 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 8588ebe..7d110b1 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -42,7 +42,7 @@ class RequestProcessor( private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) - fun processMtbFile(mtbFile: MtbFile): RequestStatus { + fun processMtbFile(mtbFile: MtbFile) { val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) @@ -62,7 +62,7 @@ class RequestProcessor( ) ) statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - return RequestStatus.DUPLICATION + return } val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) @@ -115,11 +115,9 @@ class RequestProcessor( ) statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return requestStatus } - fun processDeletion(patientId: String): RequestStatus { + fun processDeletion(patientId: String) { val requestId = UUID.randomUUID().toString() try { @@ -178,10 +176,6 @@ class RequestProcessor( } ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return overallRequestStatus } catch (e: Exception) { requestRepository.save( Request( @@ -194,11 +188,8 @@ class RequestProcessor( report = Report("Fehler bei der Pseudonymisierung") ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - - return RequestStatus.ERROR } + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun fingerprint(mtbFile: MtbFile): String { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt index a2cc953..cf0e693 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt @@ -20,7 +20,6 @@ package dev.dnpm.etl.processor.web import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.services.RequestProcessor import org.slf4j.LoggerFactory import org.springframework.http.ResponseEntity @@ -35,24 +34,16 @@ class MtbFileController( @PostMapping(path = ["/mtbfile"]) fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { - val requestStatus = requestProcessor.processMtbFile(mtbFile) - - return if (requestStatus == RequestStatus.ERROR) { - ResponseEntity.unprocessableEntity().build() - } else { - ResponseEntity.noContent().build() - } + logger.debug("Accepted MTB File for processing") + requestProcessor.processMtbFile(mtbFile) + return ResponseEntity.accepted().build() } @DeleteMapping(path = ["/mtbfile/{patientId}"]) fun deleteData(@PathVariable patientId: String): ResponseEntity { - val requestStatus = requestProcessor.processDeletion(patientId) - - return if (requestStatus == RequestStatus.ERROR) { - ResponseEntity.unprocessableEntity().build() - } else { - ResponseEntity.noContent().build() - } + logger.debug("Accepted patient ID to process deletion") + requestProcessor.processDeletion(patientId) + return ResponseEntity.accepted().build() } } \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 39acb37..72edde6 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,5 +3,7 @@ spring: bootstrap-servers: ${app.kafka.servers} template: default-topic: ${app.kafka.topic} + consumer: + group-id: ${app.kafka.group-id} flyway: locations: "classpath:db/migration/{vendor}" \ No newline at end of file -- cgit v1.2.3 From 70d4fa2f0ff4b38757cabc967b3f38a63674ed47 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:10:53 +0200 Subject: Use duplication fingerprinting based on MTB file requests only --- src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 7d110b1..afac40b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -48,6 +48,7 @@ class RequestProcessor( val lastRequestForPatient = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + .filter { it.type == RequestType.MTB_FILE } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { -- cgit v1.2.3 From 7f8b21efd2273bb7b4ee0d93ff4988bade2fa610 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:23:16 +0200 Subject: Handle not parsable data quality reports --- .../dev/dnpm/etl/processor/monitoring/ReportService.kt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt index 6ee8ae9..8c31ede 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.monitoring import com.fasterxml.jackson.annotation.JsonValue +import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.JsonMappingException import com.fasterxml.jackson.databind.ObjectMapper @@ -33,9 +34,14 @@ class ReportService( } return try { objectMapper.readValue(dataQualityReport, DataQualityReport::class.java).issues - } catch (e: JsonMappingException) { - e.printStackTrace() - listOf() + } catch (e: Exception) { + val otherIssue = + Issue(Severity.ERROR, "Not parsable data quality report '$dataQualityReport'") + return when (e) { + is JsonMappingException -> listOf(otherIssue) + is JsonParseException -> listOf(otherIssue) + else -> throw e + } } } -- cgit v1.2.3 From 577509e6f2d502d4dbbf1a1b526c43497fde56b3 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 2 Aug 2023 16:53:09 +0200 Subject: Map 'status_code' and 'status code' to same data value --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index f0c91cb..d6c4da6 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -19,6 +19,7 @@ package dev.dnpm.etl.processor.services.kafka +import com.fasterxml.jackson.annotation.JsonAlias import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.Report @@ -81,7 +82,7 @@ class KafkaResponseProcessor( data class ResponseKey(val requestId: String) data class ResponseBody( - @JsonProperty("status code") val statusCode: Int, + @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, @JsonProperty("status_body") val statusBody: String ) } \ No newline at end of file -- cgit v1.2.3 From ac91620651daa2f9aa09709eaa0bb5a8f7222e71 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 12:59:53 +0200 Subject: Use Map as status body since it contains JSON --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index d6c4da6..547833c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -54,7 +54,7 @@ class KafkaResponseProcessor( it.processedAt = Instant.ofEpochMilli(data.timestamp()) it.report = Report( "Warnungen über mangelhafte Daten", - responseBody.statusBody + objectMapper.writeValueAsString(responseBody.statusBody) ) requestRepository.save(it) } @@ -64,7 +64,7 @@ class KafkaResponseProcessor( it.processedAt = Instant.ofEpochMilli(data.timestamp()) it.report = Report( "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - responseBody.statusBody + objectMapper.writeValueAsString(responseBody.statusBody) ) requestRepository.save(it) } @@ -83,6 +83,6 @@ class KafkaResponseProcessor( data class ResponseBody( @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, - @JsonProperty("status_body") val statusBody: String + @JsonProperty("status_body") val statusBody: Map ) } \ No newline at end of file -- cgit v1.2.3 From 3dcee41569b462a4b938380b5ac3b208728fb358 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 15:14:49 +0200 Subject: Implement delete request using Apache Kafka This is implemented using a fake MTB file containing a rejected consent state and will be mapped to HTTP DELETE on kafka-to-bwhc consumer. --- .../etl/processor/output/KafkaMtbFileSender.kt | 35 ++++++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 55503cf..da25576 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -20,6 +20,8 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.Consent +import de.ukw.ccc.bwhc.dto.MtbFile import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -42,16 +44,38 @@ class KafkaMtbFileSender( } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } - } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } } - // TODO not yet implemented override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { - return MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + val dummyMtbFile = MtbFile.builder() + .withConsent( + Consent.builder() + .withPatient(request.patientId) + .withStatus(Consent.Status.REJECTED) + .build() + ) + .build() + + return try { + val result = kafkaTemplate.sendDefault( + header(request), + objectMapper.writeValueAsString(dummyMtbFile) + ) + + if (result.get() != null) { + logger.debug("Sent deletion request via KafkaMtbFileSender") + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + } else { + MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + } + } catch (e: Exception) { + logger.error("An error occurred sending to kafka", e) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + } } private fun header(request: MtbFileSender.MtbFileRequest): String { @@ -59,4 +83,9 @@ class KafkaMtbFileSender( "\"eid\": \"${request.mtbFile.episode.id}\", " + "\"requestId\": \"${request.requestId}\"}" } + + private fun header(request: MtbFileSender.DeleteRequest): String { + return "{\"pid\": \"${request.patientId}\", " + + "\"requestId\": \"${request.requestId}\"}" + } } \ No newline at end of file -- cgit v1.2.3 From ec76c775d9ab7c863cd0c55e6ba1232181f6775c Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 16:04:31 +0200 Subject: Explicit producer topic configuration --- .../etl/processor/config/AppKafkaConfiguration.kt | 5 +++-- .../dnpm/etl/processor/output/KafkaMtbFileSender.kt | 20 ++++++++++++-------- .../services/kafka/KafkaResponseProcessor.kt | 13 +++++++++++++ src/main/resources/application.yml | 2 -- 4 files changed, 28 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index f81d3fb..7adcb02 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -43,9 +43,10 @@ class AppKafkaConfiguration { @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, + kafkaTargetProperties: KafkaTargetProperties, objectMapper: ObjectMapper ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) + return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) } @Bean @@ -54,7 +55,7 @@ class AppKafkaConfiguration { kafkaTargetProperties: KafkaTargetProperties, kafkaResponseProcessor: KafkaResponseProcessor ): KafkaMessageListenerContainer { - val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic); + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic) containerProperties.messageListener = kafkaResponseProcessor return KafkaMessageListenerContainer(consumerFactory, containerProperties) } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index da25576..d903745 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -22,11 +22,13 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.config.KafkaTargetProperties import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, + private val kafkaTargetProperties: KafkaTargetProperties, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -34,13 +36,14 @@ class KafkaMtbFileSender( override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { return try { - val result = kafkaTemplate.sendDefault( - header(request), + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), objectMapper.writeValueAsString(request.mtbFile) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } @@ -61,14 +64,15 @@ class KafkaMtbFileSender( .build() return try { - val result = kafkaTemplate.sendDefault( - header(request), + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), objectMapper.writeValueAsString(dummyMtbFile) ) if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } @@ -78,13 +82,13 @@ class KafkaMtbFileSender( } } - private fun header(request: MtbFileSender.MtbFileRequest): String { + private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + "\"eid\": \"${request.mtbFile.episode.id}\", " + "\"requestId\": \"${request.requestId}\"}" } - private fun header(request: MtbFileSender.DeleteRequest): String { + private fun key(request: MtbFileSender.DeleteRequest): String { return "{\"pid\": \"${request.patientId}\", " + "\"requestId\": \"${request.requestId}\"}" } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 547833c..fd047d0 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -42,6 +42,9 @@ class KafkaResponseProcessor( val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + + println("${responseBody.statusCode}") + when (responseBody.statusCode) { 200 -> { it.status = RequestStatus.SUCCESS @@ -69,6 +72,16 @@ class KafkaResponseProcessor( requestRepository.save(it) } + in 900..999 -> { + it.status = RequestStatus.ERROR + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", + objectMapper.writeValueAsString(responseBody.statusBody) + ) + requestRepository.save(it) + } + else -> { logger.error("Cannot process Kafka response: Unknown response code!") } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 72edde6..5cd47c0 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,8 +1,6 @@ spring: kafka: bootstrap-servers: ${app.kafka.servers} - template: - default-topic: ${app.kafka.topic} consumer: group-id: ${app.kafka.group-id} flyway: -- cgit v1.2.3 From b14f2c1794fe41f9ec5e9e400d51c0fbf991953a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 3 Aug 2023 16:18:16 +0200 Subject: Add information about 'no connection' responses --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 2 -- 1 file changed, 2 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index fd047d0..1e9263d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -43,8 +43,6 @@ class KafkaResponseProcessor( requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - println("${responseBody.statusCode}") - when (responseBody.statusCode) { 200 -> { it.status = RequestStatus.SUCCESS -- cgit v1.2.3 From 459ad59c1d988a5b3ecc60d844f4fa6c9bce11f5 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 4 Aug 2023 11:43:23 +0200 Subject: Do not detect duplicates after deletion request --- .../kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index afac40b..bdf2827 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -46,12 +46,15 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val lastRequestForPatient = - requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + val allRequests = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + + val lastMtbFileRequestForPatient = allRequests .filter { it.type == RequestType.MTB_FILE } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { + val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE + + if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { requestRepository.save( Request( patientId = pseudonymized.patient.id, -- cgit v1.2.3 From 3039b4b2a7eb7963d0952a28ca5fd26328640223 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 13:23:18 +0200 Subject: Add basic Testcontainers test setup --- .../etl/processor/AbstractTestcontainerTest.kt | 45 ++++++++++++++++++++++ .../etl/processor/BwhcMapperApplicationTests.kt | 37 ++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt (limited to 'src') diff --git a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt new file mode 100644 index 0000000..3bd934f --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt @@ -0,0 +1,45 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.PostgreSQLContainer +import org.testcontainers.junit.jupiter.Container + +abstract class AbstractTestcontainerTest { + + companion object { + @Container + val dbContainer = PostgreSQLContainer("postgres:10-alpine") + .withDatabaseName("test") + .withUsername("test") + .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!") + + @DynamicPropertySource + @JvmStatic + fun registerDynamicProperties(registry: DynamicPropertyRegistry) { + registry.add("spring.datasource.url", dbContainer::getJdbcUrl) + registry.add("spring.datasource.username", dbContainer::getUsername) + registry.add("spring.datasource.password", dbContainer::getPassword) + } + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt b/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt new file mode 100644 index 0000000..efa6a66 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt @@ -0,0 +1,37 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.junit.jupiter.Testcontainers + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +class BwhcMapperApplicationTests : AbstractTestcontainerTest() { + + @Test + fun contextLoads() { + } + +} -- cgit v1.2.3 From 1fc09d691ea01415a21f1192cc7b1cf25bc0ac14 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 13:34:53 +0200 Subject: Rename test class to match applications main class name --- .../etl/processor/BwhcMapperApplicationTests.kt | 37 ---------------------- .../etl/processor/EtlProcessorApplicationTests.kt | 37 ++++++++++++++++++++++ 2 files changed, 37 insertions(+), 37 deletions(-) delete mode 100644 src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt (limited to 'src') diff --git a/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt b/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt deleted file mode 100644 index efa6a66..0000000 --- a/src/test/kotlin/dev/dnpm/etl/processor/BwhcMapperApplicationTests.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.ExtendWith -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.junit.jupiter.SpringExtension -import org.testcontainers.junit.jupiter.Testcontainers - -@Testcontainers -@ExtendWith(SpringExtension::class) -@SpringBootTest -class BwhcMapperApplicationTests : AbstractTestcontainerTest() { - - @Test - fun contextLoads() { - } - -} diff --git a/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt new file mode 100644 index 0000000..07a201b --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt @@ -0,0 +1,37 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.junit.jupiter.Testcontainers + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +class EtlProcessorApplicationTests : AbstractTestcontainerTest() { + + @Test + fun contextLoads() { + } + +} -- cgit v1.2.3 From bcc23f6b14436ba6f4585a583da6c236df68e25a Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 14:50:12 +0200 Subject: Add RequestService to handle access to requests --- .../etl/processor/services/RequestProcessor.kt | 17 +- .../dnpm/etl/processor/services/RequestService.kt | 56 ++++++ .../services/RequestServiceIntegrationTest.kt | 131 +++++++++++++ .../etl/processor/services/RequestServiceTest.kt | 205 +++++++++++++++++++++ 4 files changed, 402 insertions(+), 7 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index bdf2827..e04e568 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -21,7 +21,10 @@ package dev.dnpm.etl.processor.services import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.* +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.PseudonymizeService import org.apache.commons.codec.binary.Base32 @@ -35,7 +38,7 @@ import java.util.* class RequestProcessor( private val pseudonymizeService: PseudonymizeService, private val senders: List, - private val requestRepository: RequestRepository, + private val requestService: RequestService, private val objectMapper: ObjectMapper, private val statisticsUpdateProducer: Sinks.Many ) { @@ -46,7 +49,7 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val allRequests = requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + val allRequests = requestService.allRequestsByPatientPseudonym(pseudonymized.patient.id) val lastMtbFileRequestForPatient = allRequests .filter { it.type == RequestType.MTB_FILE } @@ -55,7 +58,7 @@ class RequestProcessor( val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { - requestRepository.save( + requestService.save( Request( patientId = pseudonymized.patient.id, pid = pid, @@ -99,7 +102,7 @@ class RequestProcessor( RequestStatus.UNKNOWN } - requestRepository.save( + requestService.save( Request( uuid = request.requestId, patientId = request.mtbFile.patient.id, @@ -165,7 +168,7 @@ class RequestProcessor( RequestStatus.UNKNOWN } - requestRepository.save( + requestService.save( Request( uuid = requestId, patientId = patientPseudonym, @@ -181,7 +184,7 @@ class RequestProcessor( ) ) } catch (e: Exception) { - requestRepository.save( + requestService.save( Request( uuid = requestId, patientId = "???", diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt new file mode 100644 index 0000000..0f69910 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt @@ -0,0 +1,56 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.springframework.stereotype.Service + +@Service +class RequestService( + private val requestRepository: RequestRepository +) { + + fun save(request: Request) = requestRepository.save(request) + + fun allRequestsByPatientPseudonym(patientPseudonym: String) = requestRepository + .findAllByPatientIdOrderByProcessedAtDesc(patientPseudonym) + + fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) = + Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym)) + + fun isLastRequestDeletion(patientPseudonym: String) = + Companion.isLastRequestDeletion(allRequestsByPatientPseudonym(patientPseudonym)) + + companion object { + + fun lastMtbFileRequestForPatientPseudonym(allRequests: List) = allRequests + .filter { it.type == RequestType.MTB_FILE } + .sortedByDescending { it.processedAt } + .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } + + fun isLastRequestDeletion(allRequests: List) = allRequests + .maxByOrNull { it.processedAt }?.type == RequestType.DELETE + + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt new file mode 100644 index 0000000..d71e011 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt @@ -0,0 +1,131 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.AbstractTestcontainerTest +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.springframework.transaction.annotation.Transactional +import org.testcontainers.junit.jupiter.Testcontainers +import java.time.Instant +import java.util.* + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +@Transactional +class RequestServiceIntegrationTest : AbstractTestcontainerTest() { + + private lateinit var requestRepository: RequestRepository + + private lateinit var requestService: RequestService + + @BeforeEach + fun setup( + @Autowired requestRepository: RequestRepository + ) { + this.requestRepository = requestRepository + this.requestService = RequestService(requestRepository) + } + + @Test + fun shouldResultInEmptyRequestList() { + val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + assertThat(actual).isEmpty() + } + + private fun setupTestData() { + // Prepare DB + this.requestRepository.saveAll( + listOf( + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + // Should be ignored - wrong patient ID --> + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ), + // <-- + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P2", + fingerprint = "0123456789abcdee1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + ) + ) + } + + @Test + fun shouldResultInSortedRequestList() { + setupTestData() + + val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + assertThat(actual).hasSize(2) + assertThat(actual[0].fingerprint).isEqualTo("0123456789abcdee1") + assertThat(actual[1].fingerprint).isEqualTo("0123456789abcdef1") + } + + @Test + fun shouldReturnDeleteRequestAsLastRequest() { + setupTestData() + + val actual = requestService.isLastRequestDeletion("TEST_12345678901") + + assertThat(actual).isTrue() + } + + @Test + fun shouldReturnLastMtbFileRequest() { + setupTestData() + + val actual = requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901") + + assertThat(actual).isNotNull + assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef1") + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt new file mode 100644 index 0000000..3e0a979 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt @@ -0,0 +1,205 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.Mockito.* +import org.mockito.junit.jupiter.MockitoExtension +import java.time.Instant +import java.util.* + +@ExtendWith(MockitoExtension::class) +class RequestServiceTest { + + private lateinit var requestRepository: RequestRepository + + private lateinit var requestService: RequestService + + private fun anyRequest() = any(Request::class.java) ?: Request( + id = 0L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_dummy", + pid = "PX", + fingerprint = "dummy", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + + @BeforeEach + fun setup( + @Mock requestRepository: RequestRepository + ) { + this.requestRepository = requestRepository + this.requestService = RequestService(requestRepository) + } + + @Test + fun shouldIndicateLastRequestIsDeleteRequest() { + val requests = listOf( + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ), + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ) + ) + + val actual = RequestService.isLastRequestDeletion(requests) + + assertThat(actual).isTrue() + } + + @Test + fun shouldIndicateLastRequestIsNotDeleteRequest() { + val requests = listOf( + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ) + ) + + val actual = RequestService.isLastRequestDeletion(requests) + + assertThat(actual).isFalse() + } + + @Test + fun shouldReturnPatientsLastRequest() { + val requests = listOf( + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ) + ) + + val actual = RequestService.lastMtbFileRequestForPatientPseudonym(requests) + + assertThat(actual).isInstanceOf(Request::class.java) + assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef2") + } + + @Test + fun shouldReturnNullIfNoRequests() { + val requests = listOf() + + val actual = RequestService.lastMtbFileRequestForPatientPseudonym(requests) + + assertThat(actual).isNull() + } + + @Test + fun saveShouldSaveRequestUsingRepository() { + doAnswer { + val obj = it.arguments[0] as Request + obj.copy(id = 1L) + }.`when`(requestRepository).save(anyRequest()) + + val request = Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ) + + requestService.save(request) + + verify(requestRepository, times(1)).save(anyRequest()) + } + + @Test + fun allRequestsByPatientPseudonymShouldRequestAllRequestsForPatientPseudonym() { + requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString()) + } + + @Test + fun lastMtbFileRequestForPatientPseudonymShouldRequestAllRequestsForPatientPseudonym() { + requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901") + + verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString()) + } + + @Test + fun isLastRequestDeletionShouldRequestAllRequestsForPatientPseudonym() { + requestService.isLastRequestDeletion("TEST_12345678901") + + verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString()) + } + +} \ No newline at end of file -- cgit v1.2.3 From 4051b5094ca8daaa844803d2725b4094f3eed096 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 14:58:10 +0200 Subject: Keep database testcontainer alive until all tests are done --- .../kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt index 3bd934f..13b57d0 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt @@ -28,7 +28,7 @@ abstract class AbstractTestcontainerTest { companion object { @Container - val dbContainer = PostgreSQLContainer("postgres:10-alpine") + val dbContainer = CustomPostgreSQLContainer("postgres:10-alpine") .withDatabaseName("test") .withUsername("test") .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!") @@ -42,4 +42,10 @@ abstract class AbstractTestcontainerTest { } } +} + +class CustomPostgreSQLContainer(dockerImageName: String) : PostgreSQLContainer(dockerImageName) { + override fun stop() { + // Keep Testcontainer alive until JVM destroys it + } } \ No newline at end of file -- cgit v1.2.3 From b75328b74d361b7afd6197a5b240f5f76ce2c5e0 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 15:16:58 +0200 Subject: Move integration tests into own source-set --- .../etl/processor/AbstractTestcontainerTest.kt | 51 ++++++++ .../etl/processor/EtlProcessorApplicationTests.kt | 37 ++++++ .../services/RequestServiceIntegrationTest.kt | 131 +++++++++++++++++++++ .../etl/processor/AbstractTestcontainerTest.kt | 51 -------- .../etl/processor/EtlProcessorApplicationTests.kt | 37 ------ .../services/RequestServiceIntegrationTest.kt | 131 --------------------- 6 files changed, 219 insertions(+), 219 deletions(-) create mode 100644 src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt create mode 100644 src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt create mode 100644 src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt delete mode 100644 src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt delete mode 100644 src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt delete mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt (limited to 'src') diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt new file mode 100644 index 0000000..13b57d0 --- /dev/null +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt @@ -0,0 +1,51 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.PostgreSQLContainer +import org.testcontainers.junit.jupiter.Container + +abstract class AbstractTestcontainerTest { + + companion object { + @Container + val dbContainer = CustomPostgreSQLContainer("postgres:10-alpine") + .withDatabaseName("test") + .withUsername("test") + .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!") + + @DynamicPropertySource + @JvmStatic + fun registerDynamicProperties(registry: DynamicPropertyRegistry) { + registry.add("spring.datasource.url", dbContainer::getJdbcUrl) + registry.add("spring.datasource.username", dbContainer::getUsername) + registry.add("spring.datasource.password", dbContainer::getPassword) + } + } + +} + +class CustomPostgreSQLContainer(dockerImageName: String) : PostgreSQLContainer(dockerImageName) { + override fun stop() { + // Keep Testcontainer alive until JVM destroys it + } +} \ No newline at end of file diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt new file mode 100644 index 0000000..07a201b --- /dev/null +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt @@ -0,0 +1,37 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor + +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.junit.jupiter.Testcontainers + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +class EtlProcessorApplicationTests : AbstractTestcontainerTest() { + + @Test + fun contextLoads() { + } + +} diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt new file mode 100644 index 0000000..d71e011 --- /dev/null +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt @@ -0,0 +1,131 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import dev.dnpm.etl.processor.AbstractTestcontainerTest +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.springframework.transaction.annotation.Transactional +import org.testcontainers.junit.jupiter.Testcontainers +import java.time.Instant +import java.util.* + +@Testcontainers +@ExtendWith(SpringExtension::class) +@SpringBootTest +@Transactional +class RequestServiceIntegrationTest : AbstractTestcontainerTest() { + + private lateinit var requestRepository: RequestRepository + + private lateinit var requestService: RequestService + + @BeforeEach + fun setup( + @Autowired requestRepository: RequestRepository + ) { + this.requestRepository = requestRepository + this.requestService = RequestService(requestRepository) + } + + @Test + fun shouldResultInEmptyRequestList() { + val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + assertThat(actual).isEmpty() + } + + private fun setupTestData() { + // Prepare DB + this.requestRepository.saveAll( + listOf( + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + // Should be ignored - wrong patient ID --> + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678902", + pid = "P2", + fingerprint = "0123456789abcdef2", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-08-08T00:00:00Z") + ), + // <-- + Request( + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P2", + fingerprint = "0123456789abcdee1", + type = RequestType.DELETE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + ) + ) + } + + @Test + fun shouldResultInSortedRequestList() { + setupTestData() + + val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") + + assertThat(actual).hasSize(2) + assertThat(actual[0].fingerprint).isEqualTo("0123456789abcdee1") + assertThat(actual[1].fingerprint).isEqualTo("0123456789abcdef1") + } + + @Test + fun shouldReturnDeleteRequestAsLastRequest() { + setupTestData() + + val actual = requestService.isLastRequestDeletion("TEST_12345678901") + + assertThat(actual).isTrue() + } + + @Test + fun shouldReturnLastMtbFileRequest() { + setupTestData() + + val actual = requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901") + + assertThat(actual).isNotNull + assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef1") + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt deleted file mode 100644 index 13b57d0..0000000 --- a/src/test/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor - -import org.springframework.test.context.DynamicPropertyRegistry -import org.springframework.test.context.DynamicPropertySource -import org.testcontainers.containers.PostgreSQLContainer -import org.testcontainers.junit.jupiter.Container - -abstract class AbstractTestcontainerTest { - - companion object { - @Container - val dbContainer = CustomPostgreSQLContainer("postgres:10-alpine") - .withDatabaseName("test") - .withUsername("test") - .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!") - - @DynamicPropertySource - @JvmStatic - fun registerDynamicProperties(registry: DynamicPropertyRegistry) { - registry.add("spring.datasource.url", dbContainer::getJdbcUrl) - registry.add("spring.datasource.username", dbContainer::getUsername) - registry.add("spring.datasource.password", dbContainer::getPassword) - } - } - -} - -class CustomPostgreSQLContainer(dockerImageName: String) : PostgreSQLContainer(dockerImageName) { - override fun stop() { - // Keep Testcontainer alive until JVM destroys it - } -} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt deleted file mode 100644 index 07a201b..0000000 --- a/src/test/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor - -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.ExtendWith -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.junit.jupiter.SpringExtension -import org.testcontainers.junit.jupiter.Testcontainers - -@Testcontainers -@ExtendWith(SpringExtension::class) -@SpringBootTest -class EtlProcessorApplicationTests : AbstractTestcontainerTest() { - - @Test - fun contextLoads() { - } - -} diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt deleted file mode 100644 index d71e011..0000000 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt +++ /dev/null @@ -1,131 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor.services - -import dev.dnpm.etl.processor.AbstractTestcontainerTest -import dev.dnpm.etl.processor.monitoring.Request -import dev.dnpm.etl.processor.monitoring.RequestRepository -import dev.dnpm.etl.processor.monitoring.RequestStatus -import dev.dnpm.etl.processor.monitoring.RequestType -import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.extension.ExtendWith -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.junit.jupiter.SpringExtension -import org.springframework.transaction.annotation.Transactional -import org.testcontainers.junit.jupiter.Testcontainers -import java.time.Instant -import java.util.* - -@Testcontainers -@ExtendWith(SpringExtension::class) -@SpringBootTest -@Transactional -class RequestServiceIntegrationTest : AbstractTestcontainerTest() { - - private lateinit var requestRepository: RequestRepository - - private lateinit var requestService: RequestService - - @BeforeEach - fun setup( - @Autowired requestRepository: RequestRepository - ) { - this.requestRepository = requestRepository - this.requestService = RequestService(requestRepository) - } - - @Test - fun shouldResultInEmptyRequestList() { - val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") - - assertThat(actual).isEmpty() - } - - private fun setupTestData() { - // Prepare DB - this.requestRepository.saveAll( - listOf( - Request( - uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678901", - pid = "P1", - fingerprint = "0123456789abcdef1", - type = RequestType.MTB_FILE, - status = RequestStatus.SUCCESS, - processedAt = Instant.parse("2023-07-07T02:00:00Z") - ), - // Should be ignored - wrong patient ID --> - Request( - uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678902", - pid = "P2", - fingerprint = "0123456789abcdef2", - type = RequestType.MTB_FILE, - status = RequestStatus.WARNING, - processedAt = Instant.parse("2023-08-08T00:00:00Z") - ), - // <-- - Request( - uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678901", - pid = "P2", - fingerprint = "0123456789abcdee1", - type = RequestType.DELETE, - status = RequestStatus.SUCCESS, - processedAt = Instant.parse("2023-08-08T02:00:00Z") - ) - ) - ) - } - - @Test - fun shouldResultInSortedRequestList() { - setupTestData() - - val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901") - - assertThat(actual).hasSize(2) - assertThat(actual[0].fingerprint).isEqualTo("0123456789abcdee1") - assertThat(actual[1].fingerprint).isEqualTo("0123456789abcdef1") - } - - @Test - fun shouldReturnDeleteRequestAsLastRequest() { - setupTestData() - - val actual = requestService.isLastRequestDeletion("TEST_12345678901") - - assertThat(actual).isTrue() - } - - @Test - fun shouldReturnLastMtbFileRequest() { - setupTestData() - - val actual = requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901") - - assertThat(actual).isNotNull - assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef1") - } - -} \ No newline at end of file -- cgit v1.2.3 From 422441a3b39806016a952bf7bdff69e0834debca Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 16:46:02 +0200 Subject: Add tests for RequestProcessor --- .../etl/processor/services/RequestProcessor.kt | 20 +- .../etl/processor/services/RequestProcessorTest.kt | 209 +++++++++++++++++++++ 2 files changed, 220 insertions(+), 9 deletions(-) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index e04e568..fcb0863 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -49,15 +49,7 @@ class RequestProcessor( val pid = mtbFile.patient.id val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - val allRequests = requestService.allRequestsByPatientPseudonym(pseudonymized.patient.id) - - val lastMtbFileRequestForPatient = allRequests - .filter { it.type == RequestType.MTB_FILE } - .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - - val isLastRequestDeletion = allRequests.firstOrNull()?.type == RequestType.DELETE - - if (null != lastMtbFileRequestForPatient && lastMtbFileRequestForPatient.fingerprint == fingerprint(mtbFile) && !isLastRequestDeletion) { + if (isDuplication(pseudonymized)) { requestService.save( Request( patientId = pseudonymized.patient.id, @@ -124,6 +116,16 @@ class RequestProcessor( statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } + private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { + val lastMtbFileRequestForPatient = + requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id) + val isLastRequestDeletion = requestService.isLastRequestDeletion(pseudonymizedMtbFile.patient.id) + + return null != lastMtbFileRequestForPatient + && !isLastRequestDeletion + && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile) + } + fun processDeletion(patientId: String) { val requestId = UUID.randomUUID().toString() diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt new file mode 100644 index 0000000..c165cf0 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -0,0 +1,209 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.output.RestMtbFileSender +import dev.dnpm.etl.processor.pseudonym.PseudonymizeService +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.Mockito.* +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + + +@ExtendWith(MockitoExtension::class) +class RequestProcessorTest { + + private lateinit var pseudonymizeService: PseudonymizeService + private lateinit var sender: MtbFileSender + private lateinit var requestService: RequestService + private lateinit var statisticsUpdateProducer: Sinks.Many + + private lateinit var requestProcessor: RequestProcessor + + @BeforeEach + fun setup( + @Mock pseudonymizeService: PseudonymizeService, + @Mock sender: RestMtbFileSender, + @Mock requestService: RequestService, + ) { + this.pseudonymizeService = pseudonymizeService + this.sender = sender + this.requestService = requestService + this.statisticsUpdateProducer = Sinks.many().multicast().directBestEffort() + + val objectMapper = ObjectMapper() + + requestProcessor = RequestProcessor( + pseudonymizeService, + listOf(sender), + requestService, + objectMapper, + statisticsUpdateProducer + ) + } + + @Test + fun testShouldDetectMtbFileDuplicationAndSaveRequestStatus() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "cwaxsvectyfj4qcw4hiwzx5fwwo7lekyagpzd2ayuf36jlvi6msa", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + it.arguments[0] as MtbFile + }.`when`(pseudonymizeService).pseudonymize(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + } + + @Test + fun testShouldSendMtbFileAndSaveRequestStatus() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "different", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + }.`when`(sender).send(any()) + + doAnswer { + it.arguments[0] as MtbFile + }.`when`(pseudonymizeService).pseudonymize(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + } + + @Test + fun testShouldSendDeleterequestAndSaveRequestStatus() { + doAnswer { + "PSEUDONYM" + }.`when`(pseudonymizeService).patientPseudonym(anyString()) + + doAnswer { + MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + }.`when`(sender).send(any()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + } + +} \ No newline at end of file -- cgit v1.2.3 From 536ecbbd56d2dad166e995256a6793a675dea167 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 18:52:03 +0200 Subject: Add tests for error response status --- .../etl/processor/services/RequestProcessorTest.kt | 93 +++++++++++++++++++++- 1 file changed, 91 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index c165cf0..12d6e29 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -130,7 +130,7 @@ class RequestProcessorTest { } @Test - fun testShouldSendMtbFileAndSaveRequestStatus() { + fun testShouldSendMtbFileAndSaveSuccessRequestStatus() { doAnswer { Request( id = 1L, @@ -189,7 +189,66 @@ class RequestProcessorTest { } @Test - fun testShouldSendDeleterequestAndSaveRequestStatus() { + fun testShouldSendMtbFileAndSaveErrorRequestStatus() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "different", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + }.`when`(sender).send(any()) + + doAnswer { + it.arguments[0] as MtbFile + }.`when`(pseudonymizeService).pseudonymize(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + } + + @Test + fun testShouldSendDeleteRequestAndSaveSuccessRequestStatus() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) @@ -206,4 +265,34 @@ class RequestProcessorTest { assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) } + @Test + fun testShouldSendDeleteRequestAndSaveErrorRequestStatus() { + doAnswer { + "PSEUDONYM" + }.`when`(pseudonymizeService).patientPseudonym(anyString()) + + doAnswer { + MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + }.`when`(sender).send(any()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + } + + @Test + fun testShouldSendDeleteRequestWithPseudonymErrorAndSaveErrorRequestStatus() { + doThrow(RuntimeException()).`when`(pseudonymizeService).patientPseudonym(anyString()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val requestCaptor = argumentCaptor() + verify(requestService, times(1)).save(requestCaptor.capture()) + assertThat(requestCaptor.firstValue).isNotNull + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + } + } \ No newline at end of file -- cgit v1.2.3 From 6ad6ee13a1cae8ed286e80b3a46c458e1052480b Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 8 Aug 2023 19:16:59 +0200 Subject: Ignore unknown properties in DataQualityResponse --- .../dnpm/etl/processor/monitoring/ReportService.kt | 3 + .../etl/processor/services/ReportServiceTest.kt | 70 ++++++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt index 8c31ede..ae36705 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt @@ -19,6 +19,7 @@ package dev.dnpm.etl.processor.monitoring +import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonValue import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.JsonMappingException @@ -46,8 +47,10 @@ class ReportService( } + @JsonIgnoreProperties(ignoreUnknown = true) private data class DataQualityReport(val issues: List) + @JsonIgnoreProperties(ignoreUnknown = true) data class Issue(val severity: Severity, val message: String) enum class Severity(@JsonValue val value: String) { diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt new file mode 100644 index 0000000..70efe2b --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt @@ -0,0 +1,70 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.monitoring.ReportService +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class ReportServiceTest { + + private lateinit var reportService: ReportService + + @BeforeEach + fun setup() { + this.reportService = ReportService(ObjectMapper().registerModule(KotlinModule.Builder().build())) + } + + @Test + fun shouldParseDataQualityReport() { + val json = """ + { + "patient": "4711", + "issues": [ + { "severity": "warning", "message": "Warning Message" }, + { "severity": "error", "message": "Error Message" } + ] + } + """.trimIndent() + + val actual = this.reportService.deserialize(json) + + assertThat(actual).hasSize(2) + assertThat(actual[0].severity).isEqualTo(ReportService.Severity.WARNING) + assertThat(actual[0].message).isEqualTo("Warning Message") + assertThat(actual[1].severity).isEqualTo(ReportService.Severity.ERROR) + assertThat(actual[1].message).isEqualTo("Error Message") + } + + @Test + fun shouldReturnSyntheticDataQualityReportOnParserError() { + val invalidResponse = "Invalid Response Data" + + val actual = this.reportService.deserialize(invalidResponse) + + assertThat(actual).hasSize(1) + assertThat(actual[0].severity).isEqualTo(ReportService.Severity.ERROR) + assertThat(actual[0].message).isEqualTo("Not parsable data quality report '$invalidResponse'") + } + +} \ No newline at end of file -- cgit v1.2.3 From 7739afad1fc82f4ffe0debbebae58874f046d82d Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 08:13:27 +0200 Subject: Handle MTB File with rejected consent as deletion request --- .../dnpm/etl/processor/web/MtbFileController.kt | 49 ------- .../etl/processor/web/MtbFileRestController.kt | 55 ++++++++ .../etl/processor/web/MtbFileRestControllerTest.kt | 150 +++++++++++++++++++++ 3 files changed, 205 insertions(+), 49 deletions(-) delete mode 100644 src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt deleted file mode 100644 index cf0e693..0000000 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt +++ /dev/null @@ -1,49 +0,0 @@ -/* - * This file is part of ETL-Processor - * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package dev.dnpm.etl.processor.web - -import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.services.RequestProcessor -import org.slf4j.LoggerFactory -import org.springframework.http.ResponseEntity -import org.springframework.web.bind.annotation.* - -@RestController -class MtbFileController( - private val requestProcessor: RequestProcessor, -) { - - private val logger = LoggerFactory.getLogger(MtbFileController::class.java) - - @PostMapping(path = ["/mtbfile"]) - fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { - logger.debug("Accepted MTB File for processing") - requestProcessor.processMtbFile(mtbFile) - return ResponseEntity.accepted().build() - } - - @DeleteMapping(path = ["/mtbfile/{patientId}"]) - fun deleteData(@PathVariable patientId: String): ResponseEntity { - logger.debug("Accepted patient ID to process deletion") - requestProcessor.processDeletion(patientId) - return ResponseEntity.accepted().build() - } - -} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt new file mode 100644 index 0000000..9b441f6 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt @@ -0,0 +1,55 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.web + +import de.ukw.ccc.bwhc.dto.Consent +import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.services.RequestProcessor +import org.slf4j.LoggerFactory +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.* + +@RestController +class MtbFileRestController( + private val requestProcessor: RequestProcessor, +) { + + private val logger = LoggerFactory.getLogger(MtbFileRestController::class.java) + + @PostMapping(path = ["/mtbfile"]) + fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { + if (mtbFile.consent.status == Consent.Status.ACTIVE) { + logger.debug("Accepted MTB File for processing") + requestProcessor.processMtbFile(mtbFile) + } else { + logger.debug("Accepted MTB File and process deletion") + requestProcessor.processDeletion(mtbFile.patient.id) + } + return ResponseEntity.accepted().build() + } + + @DeleteMapping(path = ["/mtbfile/{patientId}"]) + fun deleteData(@PathVariable patientId: String): ResponseEntity { + logger.debug("Accepted patient ID to process deletion") + requestProcessor.processDeletion(patientId) + return ResponseEntity.accepted().build() + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt new file mode 100644 index 0000000..2fde35a --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt @@ -0,0 +1,150 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.web + +import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.services.RequestProcessor +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.Mockito.times +import org.mockito.Mockito.verify +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.springframework.http.MediaType +import org.springframework.test.web.servlet.MockMvc +import org.springframework.test.web.servlet.delete +import org.springframework.test.web.servlet.post +import org.springframework.test.web.servlet.setup.MockMvcBuilders + +@ExtendWith(MockitoExtension::class) +class MtbFileRestControllerTest { + + private lateinit var mockMvc: MockMvc + + private lateinit var requestProcessor: RequestProcessor + + private val objectMapper = ObjectMapper() + + @BeforeEach + fun setup( + @Mock requestProcessor: RequestProcessor + ) { + this.requestProcessor = requestProcessor + val controller = MtbFileRestController(requestProcessor) + this.mockMvc = MockMvcBuilders.standaloneSetup(controller).build() + } + + @Test + fun shouldProcessMtbFilePostRequest() { + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("TEST_12345678") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("TEST_12345678") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("TEST_12345678") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + mockMvc.post("/mtbfile") { + content = objectMapper.writeValueAsString(mtbFile) + contentType = MediaType.APPLICATION_JSON + }.andExpect { + status { + isAccepted() + } + } + + verify(requestProcessor, times(1)).processMtbFile(any()) + } + + @Test + fun shouldProcessMtbFilePostRequestWithRejectedConsent() { + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("TEST_12345678") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.REJECTED) + .withPatient("TEST_12345678") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("TEST_12345678") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + mockMvc.post("/mtbfile") { + content = objectMapper.writeValueAsString(mtbFile) + contentType = MediaType.APPLICATION_JSON + }.andExpect { + status { + isAccepted() + } + } + + val captor = argumentCaptor() + verify(requestProcessor, times(1)).processDeletion(captor.capture()) + assertThat(captor.firstValue).isEqualTo("TEST_12345678") + } + + @Test + fun shouldProcessMtbFileDeleteRequest() { + mockMvc.delete("/mtbfile/TEST_12345678").andExpect { + status { + isAccepted() + } + } + + val captor = argumentCaptor() + verify(requestProcessor, times(1)).processDeletion(captor.capture()) + assertThat(captor.firstValue).isEqualTo("TEST_12345678") + } + +} \ No newline at end of file -- cgit v1.2.3 From 13bfa0018d6c9b48893ef96945659be9e7eec6c0 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 10:20:20 +0200 Subject: Change endpoint configuration to select single endpoint * If REST endpoint is configured, it will be used * If Kafka endpoint is configured, it will be used * If both endpoints are configured, REST configuration has precedence and will be used --- .../etl/processor/config/AppConfigurationTest.kt | 102 +++++++++++++++++++++ .../dnpm/etl/processor/config/AppConfiguration.kt | 12 +-- .../etl/processor/config/AppKafkaConfiguration.kt | 8 ++ .../etl/processor/config/AppRestConfiguration.kt | 52 +++++++++++ 4 files changed, 163 insertions(+), 11 deletions(-) create mode 100644 src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt (limited to 'src') diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt new file mode 100644 index 0000000..8bdaa60 --- /dev/null +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt @@ -0,0 +1,102 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.output.KafkaMtbFileSender +import dev.dnpm.etl.processor.output.RestMtbFileSender +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import org.springframework.beans.factory.NoSuchBeanDefinitionException +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean +import org.springframework.boot.test.mock.mockito.MockBeans +import org.springframework.context.ApplicationContext +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource + +@SpringBootTest +@ContextConfiguration(classes = [KafkaAutoConfiguration::class, AppKafkaConfiguration::class, AppRestConfiguration::class]) +class AppConfigurationTest { + + @Nested + @TestPropertySource( + properties = [ + "app.rest.uri=http://localhost:9000" + ] + ) + inner class AppConfigurationRestTest(private val context: ApplicationContext) { + + @Test + fun shouldUseRestMtbFileSenderNotKafkaMtbFileSender() { + assertThat(context.getBean(RestMtbFileSender::class.java)).isNotNull + assertThrows { context.getBean(KafkaMtbFileSender::class.java) } + } + + } + + @Nested + @TestPropertySource( + properties = [ + "app.kafka.servers=localhost:9092", + "app.kafka.topic=test", + "app.kafka.response-topic=test-response", + "app.kafka.group-id=test" + ] + ) + @MockBeans(value = [ + MockBean(ObjectMapper::class), + MockBean(RequestRepository::class) + ]) + inner class AppConfigurationKafkaTest(private val context: ApplicationContext) { + + @Test + fun shouldUseKafkaMtbFileSenderNotRestMtbFileSender() { + assertThrows { context.getBean(RestMtbFileSender::class.java) } + assertThat(context.getBean(KafkaMtbFileSender::class.java)).isNotNull + } + + } + + @Nested + @TestPropertySource( + properties = [ + "app.rest.uri=http://localhost:9000", + "app.kafka.servers=localhost:9092", + "app.kafka.topic=test", + "app.kafka.response-topic=test-response", + "app.kafka.group-id=test" + ] + ) + inner class AppConfigurationRestInPrecedenceTest(private val context: ApplicationContext) { + + @Test + fun shouldUseRestMtbFileSenderNotKafkaMtbFileSender() { + assertThat(context.getBean(RestMtbFileSender::class.java)).isNotNull + assertThrows { context.getBean(KafkaMtbFileSender::class.java) } + } + + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index cbba1f1..6b15fc0 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -21,8 +21,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.ReportService -import dev.dnpm.etl.processor.output.MtbFileSender -import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator import dev.dnpm.etl.processor.pseudonym.Generator import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator @@ -38,9 +36,7 @@ import reactor.core.publisher.Sinks value = [ AppConfigProperties::class, PseudonymizeConfigProperties::class, - GPasConfigProperties::class, - RestTargetProperties::class, - KafkaTargetProperties::class + GPasConfigProperties::class ] ) class AppConfiguration { @@ -65,12 +61,6 @@ class AppConfiguration { return PseudonymizeService(generator, pseudonymizeConfigProperties) } - @ConditionalOnProperty(value = ["app.rest.uri"]) - @Bean - fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { - return RestMtbFileSender(restTargetProperties) - } - @Bean fun reportService(objectMapper: ObjectMapper): ReportService { return ReportService(objectMapper) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 7adcb02..6d0254e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -24,10 +24,13 @@ import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.core.annotation.Order import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.listener.ContainerProperties @@ -38,14 +41,19 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer value = [KafkaTargetProperties::class] ) @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +@ConditionalOnMissingBean(MtbFileSender::class) +@Order(-5) class AppKafkaConfiguration { + private val logger = LoggerFactory.getLogger(AppKafkaConfiguration::class.java) + @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, kafkaTargetProperties: KafkaTargetProperties, objectMapper: ObjectMapper ): MtbFileSender { + logger.info("Selected 'KafkaMtbFileSender'") return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt new file mode 100644 index 0000000..5e77a4f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -0,0 +1,52 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.output.RestMtbFileSender +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.core.annotation.Order + +@Configuration +@EnableConfigurationProperties( + value = [ + RestTargetProperties::class + ] +) +@ConditionalOnProperty(value = ["app.rest.uri"]) +@ConditionalOnMissingBean(MtbFileSender::class) +@Order(-10) +class AppRestConfiguration { + + private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java) + + @Bean + fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { + logger.info("Selected 'RestMtbFileSender'") + return RestMtbFileSender(restTargetProperties) + } + +} + -- cgit v1.2.3 From 47830ed9f7774c84674e9399cd347d12424f4f42 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 10:34:23 +0200 Subject: Use single MtbFileSender --- .../etl/processor/EtlProcessorApplicationTests.kt | 5 +- .../services/RequestServiceIntegrationTest.kt | 3 + .../etl/processor/services/RequestProcessor.kt | 110 +++++++++------------ .../etl/processor/services/RequestProcessorTest.kt | 2 +- 4 files changed, 57 insertions(+), 63 deletions(-) (limited to 'src') diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt index 07a201b..6c5b150 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt @@ -19,19 +19,22 @@ package dev.dnpm.etl.processor +import dev.dnpm.etl.processor.output.MtbFileSender import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.context.junit.jupiter.SpringExtension import org.testcontainers.junit.jupiter.Testcontainers @Testcontainers @ExtendWith(SpringExtension::class) @SpringBootTest +@MockBean(MtbFileSender::class) class EtlProcessorApplicationTests : AbstractTestcontainerTest() { @Test - fun contextLoads() { + fun contextLoadsIfMtbFileSenderConfigured() { } } diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt index d71e011..3af218e 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt @@ -24,12 +24,14 @@ import dev.dnpm.etl.processor.monitoring.Request import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.monitoring.RequestType +import dev.dnpm.etl.processor.output.MtbFileSender import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.test.context.junit.jupiter.SpringExtension import org.springframework.transaction.annotation.Transactional import org.testcontainers.junit.jupiter.Testcontainers @@ -40,6 +42,7 @@ import java.util.* @ExtendWith(SpringExtension::class) @SpringBootTest @Transactional +@MockBean(MtbFileSender::class) class RequestServiceIntegrationTest : AbstractTestcontainerTest() { private lateinit var requestRepository: RequestRepository diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index fcb0863..936c1bf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -37,7 +37,7 @@ import java.util.* @Service class RequestProcessor( private val pseudonymizeService: PseudonymizeService, - private val senders: List, + private val sender: MtbFileSender, private val requestService: RequestService, private val objectMapper: ObjectMapper, private val statisticsUpdateProducer: Sinks.Many @@ -66,32 +66,26 @@ class RequestProcessor( val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) - val responses = senders.map { - val responseStatus = it.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } - responseStatus + val responseStatus = sender.send(request) + if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { + logger.info( + "Sent file for Patient '{}' using '{}'", + pseudonymized.patient.id, + sender.javaClass.simpleName + ) + } else { + logger.error( + "Error sending file for Patient '{}' using '{}'", + pseudonymized.patient.id, + sender.javaClass.simpleName + ) } - val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) { - RequestStatus.WARNING - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN + val requestStatus = when (responseStatus.status) { + MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR + MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING + MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS + else -> RequestStatus.UNKNOWN } requestService.save( @@ -104,9 +98,7 @@ class RequestProcessor( type = RequestType.MTB_FILE, report = when (requestStatus) { RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", - responses.joinToString("\n") { it.reason }) - + RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason) RequestStatus.UNKNOWN -> Report("Keine Informationen") else -> null } @@ -132,42 +124,38 @@ class RequestProcessor( try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responses = senders.map { - val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + when (responseStatus.status) { + MtbFileSender.ResponseStatus.SUCCESS -> { + logger.info( + "Sent delete for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) + } - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + MtbFileSender.ResponseStatus.ERROR -> { + logger.error( + "Error deleting data for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) + } - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } + else -> { + logger.error( + "Unknown result on deleting data for Patient '{}' using '{}'", + patientPseudonym, + sender.javaClass.simpleName + ) } - responseStatus } - val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN + val requestStatus = when (responseStatus.status) { + MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR + MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING + MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS + else -> RequestStatus.UNKNOWN } requestService.save( @@ -176,9 +164,9 @@ class RequestProcessor( patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = overallRequestStatus, + status = requestStatus, type = RequestType.DELETE, - report = when (overallRequestStatus) { + report = when (requestStatus) { RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") RequestStatus.UNKNOWN -> Report("Keine Informationen") else -> null diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index 12d6e29..6e97343 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -67,7 +67,7 @@ class RequestProcessorTest { requestProcessor = RequestProcessor( pseudonymizeService, - listOf(sender), + sender, requestService, objectMapper, statisticsUpdateProducer -- cgit v1.2.3 From 7f048e2483138deecc28208af42546097ef929d7 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 12:26:57 +0200 Subject: Do not append custom prefix to gPAS pseudonym --- .../etl/processor/pseudonym/PseudonymizeService.kt | 36 +-------- .../dev/dnpm/etl/processor/pseudonym/extensions.kt | 50 +++++++++++++ .../etl/processor/services/RequestProcessor.kt | 14 ++-- .../processor/pseudonym/PseudonymizeServiceTest.kt | 86 ++++++++++++++++++++++ .../etl/processor/services/RequestProcessorTest.kt | 14 ++-- 5 files changed, 155 insertions(+), 45 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt index 1a79850..ab8ce2f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt @@ -19,7 +19,6 @@ package dev.dnpm.etl.processor.pseudonym -import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties class PseudonymizeService( @@ -27,38 +26,11 @@ class PseudonymizeService( private val configProperties: PseudonymizeConfigProperties ) { - fun pseudonymize(mtbFile: MtbFile): MtbFile { - val patientPseudonym = patientPseudonym(mtbFile.patient.id) - - mtbFile.episode.patient = patientPseudonym - mtbFile.carePlans.forEach { it.patient = patientPseudonym } - mtbFile.patient.id = patientPseudonym - mtbFile.claims.forEach { it.patient = patientPseudonym } - mtbFile.consent.patient = patientPseudonym - mtbFile.claimResponses.forEach { it.patient = patientPseudonym } - mtbFile.diagnoses.forEach { it.patient = patientPseudonym } - mtbFile.ecogStatus.forEach { it.patient = patientPseudonym } - mtbFile.familyMemberDiagnoses.forEach { it.patient = patientPseudonym } - mtbFile.geneticCounsellingRequests.forEach { it.patient = patientPseudonym } - mtbFile.histologyReevaluationRequests.forEach { it.patient = patientPseudonym } - mtbFile.histologyReports.forEach { it.patient = patientPseudonym } - mtbFile.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } - mtbFile.molecularPathologyFindings.forEach { it.patient = patientPseudonym } - mtbFile.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } - mtbFile.ngsReports.forEach { it.patient = patientPseudonym } - mtbFile.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } - mtbFile.rebiopsyRequests.forEach { it.patient = patientPseudonym } - mtbFile.recommendations.forEach { it.patient = patientPseudonym } - mtbFile.recommendations.forEach { it.patient = patientPseudonym } - mtbFile.responses.forEach { it.patient = patientPseudonym } - mtbFile.specimens.forEach { it.patient = patientPseudonym } - mtbFile.specimens.forEach { it.patient = patientPseudonym } - - return mtbFile - } - fun patientPseudonym(patientId: String): String { - return "${configProperties.prefix}_${generator.generate(patientId)}" + return when (generator) { + is GpasPseudonymGenerator -> generator.generate(patientId) + else -> "${configProperties.prefix}_${generator.generate(patientId)}" + } } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt new file mode 100644 index 0000000..580785d --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt @@ -0,0 +1,50 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.pseudonym + +import de.ukw.ccc.bwhc.dto.MtbFile + +infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) { + val patientPseudonym = pseudonymizeService.patientPseudonym(this.patient.id) + + this.episode.patient = patientPseudonym + this.carePlans.forEach { it.patient = patientPseudonym } + this.patient.id = patientPseudonym + this.claims.forEach { it.patient = patientPseudonym } + this.consent.patient = patientPseudonym + this.claimResponses.forEach { it.patient = patientPseudonym } + this.diagnoses.forEach { it.patient = patientPseudonym } + this.ecogStatus.forEach { it.patient = patientPseudonym } + this.familyMemberDiagnoses.forEach { it.patient = patientPseudonym } + this.geneticCounsellingRequests.forEach { it.patient = patientPseudonym } + this.histologyReevaluationRequests.forEach { it.patient = patientPseudonym } + this.histologyReports.forEach { it.patient = patientPseudonym } + this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } + this.molecularPathologyFindings.forEach { it.patient = patientPseudonym } + this.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } + this.ngsReports.forEach { it.patient = patientPseudonym } + this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } + this.rebiopsyRequests.forEach { it.patient = patientPseudonym } + this.recommendations.forEach { it.patient = patientPseudonym } + this.recommendations.forEach { it.patient = patientPseudonym } + this.responses.forEach { it.patient = patientPseudonym } + this.specimens.forEach { it.patient = patientPseudonym } + this.specimens.forEach { it.patient = patientPseudonym } +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 936c1bf..6465e82 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -27,6 +27,7 @@ import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.monitoring.RequestType import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.PseudonymizeService +import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.slf4j.LoggerFactory @@ -47,12 +48,13 @@ class RequestProcessor( fun processMtbFile(mtbFile: MtbFile) { val pid = mtbFile.patient.id - val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - if (isDuplication(pseudonymized)) { + mtbFile pseudonymizeWith pseudonymizeService + + if (isDuplication(mtbFile)) { requestService.save( Request( - patientId = pseudonymized.patient.id, + patientId = mtbFile.patient.id, pid = pid, fingerprint = fingerprint(mtbFile), status = RequestStatus.DUPLICATION, @@ -64,19 +66,19 @@ class RequestProcessor( return } - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) + val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile) val responseStatus = sender.send(request) if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { logger.info( "Sent file for Patient '{}' using '{}'", - pseudonymized.patient.id, + mtbFile.patient.id, sender.javaClass.simpleName ) } else { logger.error( "Error sending file for Patient '{}' using '{}'", - pseudonymized.patient.id, + mtbFile.patient.id, sender.javaClass.simpleName ) } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt new file mode 100644 index 0000000..a30a328 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt @@ -0,0 +1,86 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.pseudonym + +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.doAnswer +import org.mockito.kotlin.whenever + +@ExtendWith(MockitoExtension::class) +class PseudonymizeServiceTest { + + private val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("123") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("123") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + @Test + fun shouldNotUsePseudonymPrefixForGpas(@Mock generator: GpasPseudonymGenerator) { + doAnswer { + it.arguments[0] + }.whenever(generator).generate(anyString()) + + val pseudonymizeService = PseudonymizeService(generator, PseudonymizeConfigProperties()) + + mtbFile.pseudonymizeWith(pseudonymizeService) + + assertThat(mtbFile.patient.id).isEqualTo("123") + } + + @Test + fun shouldUsePseudonymPrefixForBuiltin(@Mock generator: AnonymizingGenerator) { + doAnswer { + it.arguments[0] + }.whenever(generator).generate(anyString()) + + val pseudonymizeService = PseudonymizeService(generator, PseudonymizeConfigProperties()) + + mtbFile.pseudonymizeWith(pseudonymizeService) + + assertThat(mtbFile.patient.id).isEqualTo("UNKNOWN_123") + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index 6e97343..8552bbb 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -82,7 +82,7 @@ class RequestProcessorTest { uuid = UUID.randomUUID().toString(), patientId = "TEST_12345678901", pid = "P1", - fingerprint = "cwaxsvectyfj4qcw4hiwzx5fwwo7lekyagpzd2ayuf36jlvi6msa", + fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a", type = RequestType.MTB_FILE, status = RequestStatus.SUCCESS, processedAt = Instant.parse("2023-08-08T02:00:00Z") @@ -94,8 +94,8 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - it.arguments[0] as MtbFile - }.`when`(pseudonymizeService).pseudonymize(any()) + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) val mtbFile = MtbFile.builder() .withPatient( @@ -153,8 +153,8 @@ class RequestProcessorTest { }.`when`(sender).send(any()) doAnswer { - it.arguments[0] as MtbFile - }.`when`(pseudonymizeService).pseudonymize(any()) + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) val mtbFile = MtbFile.builder() .withPatient( @@ -212,8 +212,8 @@ class RequestProcessorTest { }.`when`(sender).send(any()) doAnswer { - it.arguments[0] as MtbFile - }.`when`(pseudonymizeService).pseudonymize(any()) + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) val mtbFile = MtbFile.builder() .withPatient( -- cgit v1.2.3 From 1a640ff9dff1cc182c4ffc1d00dff370e42a25de Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 18:15:20 +0200 Subject: Decouple request and response processing --- .../etl/processor/config/AppKafkaConfiguration.kt | 6 +- .../etl/processor/output/KafkaMtbFileSender.kt | 13 +- .../dev/dnpm/etl/processor/output/MtbFileSender.kt | 21 ++- .../dnpm/etl/processor/output/RestMtbFileSender.kt | 16 +-- .../etl/processor/services/RequestProcessor.kt | 135 +++++++------------- .../etl/processor/services/ResponseProcessor.kt | 96 ++++++++++++++ .../services/kafka/KafkaResponseProcessor.kt | 85 ++++++------ .../etl/processor/services/RequestProcessorTest.kt | 128 +++++++++++++++---- .../processor/services/ResponseProcessorTest.kt | 142 +++++++++++++++++++++ .../services/kafka/KafkaResponseProcessorTest.kt | 119 +++++++++++++++++ 10 files changed, 573 insertions(+), 188 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 6d0254e..309ff2d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -20,7 +20,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor @@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.ApplicationEventPublisher import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order @@ -70,10 +70,10 @@ class AppKafkaConfiguration { @Bean fun kafkaResponseProcessor( - requestRepository: RequestRepository, + applicationEventPublisher: ApplicationEventPublisher, objectMapper: ObjectMapper ): KafkaResponseProcessor { - return KafkaResponseProcessor(requestRepository, objectMapper) + return KafkaResponseProcessor(applicationEventPublisher, objectMapper) } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index d903745..9448e29 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -43,13 +44,13 @@ class KafkaMtbFileSender( ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } @@ -72,13 +73,13 @@ class KafkaMtbFileSender( if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt index 6914ba5..de0efaa 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt @@ -20,22 +20,31 @@ package dev.dnpm.etl.processor.output import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.springframework.http.HttpStatusCode interface MtbFileSender { fun send(request: MtbFileRequest): Response fun send(request: DeleteRequest): Response - data class Response(val status: ResponseStatus, val reason: String = "") + data class Response(val status: RequestStatus, val body: String = "") data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile) data class DeleteRequest(val requestId: String, val patientId: String) - enum class ResponseStatus { - SUCCESS, - WARNING, - ERROR, - UNKNOWN +} + +fun Int.asRequestStatus(): RequestStatus { + return when (this) { + 200 -> RequestStatus.SUCCESS + 201 -> RequestStatus.WARNING + in 400 .. 999 -> RequestStatus.ERROR + else -> RequestStatus.UNKNOWN } +} + +fun HttpStatusCode.asRequestStatus(): RequestStatus { + return this.value().asRequestStatus() } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 04c73ef..24cdc49 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -20,13 +20,13 @@ package dev.dnpm.etl.processor.output import dev.dnpm.etl.processor.config.RestTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.http.HttpEntity import org.springframework.http.HttpHeaders import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -import org.springframework.web.util.UriComponentsBuilder class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { @@ -46,21 +46,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) ) if (!response.statusCode.is2xxSuccessful) { logger.warn("Error sending to remote system: {}", response.body) - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}") + return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return if (response.body?.contains("warning") == true) { - MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}") - } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) - } + return MtbFileSender.Response(response.statusCode.asRequestStatus()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { @@ -74,14 +70,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) String::class.java ) logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + return MtbFileSender.Response(RequestStatus.SUCCESS) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 6465e82..d2f8619 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -31,8 +31,9 @@ import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service -import reactor.core.publisher.Sinks +import java.time.Instant import java.util.* @Service @@ -41,73 +42,54 @@ class RequestProcessor( private val sender: MtbFileSender, private val requestService: RequestService, private val objectMapper: ObjectMapper, - private val statisticsUpdateProducer: Sinks.Many + private val applicationEventPublisher: ApplicationEventPublisher ) { private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) fun processMtbFile(mtbFile: MtbFile) { + val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id mtbFile pseudonymizeWith pseudonymizeService + val request = MtbFileSender.MtbFileRequest(requestId, mtbFile) + + requestService.save( + Request( + uuid = requestId, + patientId = request.mtbFile.patient.id, + pid = pid, + fingerprint = fingerprint(request.mtbFile), + status = RequestStatus.UNKNOWN, + type = RequestType.MTB_FILE + ) + ) + if (isDuplication(mtbFile)) { - requestService.save( - Request( - patientId = mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(mtbFile), - status = RequestStatus.DUPLICATION, - type = RequestType.MTB_FILE, - report = Report("Duplikat erkannt - keine Daten weitergeleitet") + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + RequestStatus.DUPLICATION ) ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) return } - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile) - val responseStatus = sender.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( - Request( - uuid = request.requestId, - patientId = request.mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(request.mtbFile), - status = requestStatus, - type = RequestType.MTB_FILE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason) - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { @@ -126,55 +108,31 @@ class RequestProcessor( try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( Request( uuid = requestId, patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = requestStatus, - type = RequestType.DELETE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + status = RequestStatus.UNKNOWN, + type = RequestType.DELETE + ) + ) + + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) + } catch (e: Exception) { requestService.save( Request( @@ -188,7 +146,6 @@ class RequestProcessor( ) ) } - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun fingerprint(mtbFile: MtbFile): String { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt new file mode 100644 index 0000000..d7ad86f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -0,0 +1,96 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@Service +class ResponseProcessor( + private val requestRepository: RequestRepository, + private val statisticsUpdateProducer: Sinks.Many, + private val objectMapper: ObjectMapper +) { + + private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) + + @EventListener(classes = [ResponseEvent::class]) + fun handleResponseEvent(event: ResponseEvent) { + requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({ + it.processedAt = event.timestamp + it.status = event.status + + when (event.status) { + RequestStatus.SUCCESS -> { + it.report = Report( + "Keine Probleme erkannt", + ) + } + + RequestStatus.WARNING -> { + it.report = Report( + "Warnungen über mangelhafte Daten", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.ERROR -> { + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.DUPLICATION -> { + it.report = Report( + "Duplikat erkannt" + ) + } + + else -> { + logger.error("Cannot process response: Unknown response code!") + return@ifPresentOrElse + } + } + + requestRepository.save(it) + + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + }, { + logger.error("Response for unknown request '${event.requestUuid}'!") + }) + } + +} + +data class ResponseEvent( + val requestUuid: String, + val timestamp: Instant, + val status: RequestStatus, + val body: Optional = Optional.empty() +) \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 1e9263d..ef880f4 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -22,16 +22,18 @@ package dev.dnpm.etl.processor.services.kafka import com.fasterxml.jackson.annotation.JsonAlias import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.Report -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.output.asRequestStatus +import dev.dnpm.etl.processor.services.ResponseEvent import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.kafka.listener.MessageListener import java.time.Instant +import java.util.* class KafkaResponseProcessor( - private val requestRepository: RequestRepository, + private val eventPublisher: ApplicationEventPublisher, private val objectMapper: ObjectMapper ) : MessageListener { @@ -39,55 +41,44 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord) { try { - val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) - requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { + Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + } catch (e: Exception) { + Optional.empty() + }.ifPresentOrElse({ responseKey -> + val event = try { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } - when (responseBody.statusCode) { - 200 -> { - it.status = RequestStatus.SUCCESS - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - requestRepository.save(it) - } + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } - 201 -> { - it.status = RequestStatus.WARNING - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Warnungen über mangelhafte Daten", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) + else -> { + logger.error("Kafka response: Unknown response code!") + Optional.empty() + } } - - 400, 422 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - in 900..999 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - else -> { - logger.error("Cannot process Kafka response: Unknown response code!") - } - } + ) + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + RequestStatus.ERROR, + Optional.of("Cannot process Kafka response") + ) } - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - } + eventPublisher.publishEvent(event) + }, { + logger.error("No response key in Kafka response") + }) } data class ResponseKey(val requestId: String) diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index 8552bbb..f9d8182 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -37,7 +37,7 @@ import org.mockito.Mockito.* import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor -import reactor.core.publisher.Sinks +import org.springframework.context.ApplicationEventPublisher import java.time.Instant import java.util.* @@ -48,7 +48,7 @@ class RequestProcessorTest { private lateinit var pseudonymizeService: PseudonymizeService private lateinit var sender: MtbFileSender private lateinit var requestService: RequestService - private lateinit var statisticsUpdateProducer: Sinks.Many + private lateinit var applicationEventPublisher: ApplicationEventPublisher private lateinit var requestProcessor: RequestProcessor @@ -57,11 +57,12 @@ class RequestProcessorTest { @Mock pseudonymizeService: PseudonymizeService, @Mock sender: RestMtbFileSender, @Mock requestService: RequestService, + @Mock applicationEventPublisher: ApplicationEventPublisher ) { this.pseudonymizeService = pseudonymizeService this.sender = sender this.requestService = requestService - this.statisticsUpdateProducer = Sinks.many().multicast().directBestEffort() + this.applicationEventPublisher = applicationEventPublisher val objectMapper = ObjectMapper() @@ -70,12 +71,12 @@ class RequestProcessorTest { sender, requestService, objectMapper, - statisticsUpdateProducer + applicationEventPublisher ) } @Test - fun testShouldDetectMtbFileDuplicationAndSaveRequestStatus() { + fun testShouldSendMtbFileDuplicationAndSaveUnknownRequestStatusAtFirst() { doAnswer { Request( id = 1L, @@ -126,11 +127,66 @@ class RequestProcessorTest { val requestCaptor = argumentCaptor() verify(requestService, times(1)).save(requestCaptor.capture()) assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN) } @Test - fun testShouldSendMtbFileAndSaveSuccessRequestStatus() { + fun testShouldDetectMtbFileDuplicationAndSendDuplicationEvent() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + } + + @Test + fun testShouldSendMtbFileAndSendSuccessEvent() { doAnswer { Request( id = 1L, @@ -149,7 +205,7 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(status = RequestStatus.SUCCESS) }.`when`(sender).send(any()) doAnswer { @@ -182,14 +238,14 @@ class RequestProcessorTest { this.requestProcessor.processMtbFile(mtbFile) - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) } @Test - fun testShouldSendMtbFileAndSaveErrorRequestStatus() { + fun testShouldSendMtbFileAndSendErrorEvent() { doAnswer { Request( id = 1L, @@ -208,7 +264,7 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(status = RequestStatus.ERROR) }.`when`(sender).send(any()) doAnswer { @@ -241,20 +297,20 @@ class RequestProcessorTest { this.requestProcessor.processMtbFile(mtbFile) - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) } @Test - fun testShouldSendDeleteRequestAndSaveSuccessRequestStatus() { + fun testShouldSendDeleteRequestAndSaveUnknownRequestStatusAtFirst() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(status = RequestStatus.UNKNOWN) }.`when`(sender).send(any()) this.requestProcessor.processDeletion("TEST_12345678901") @@ -262,25 +318,43 @@ class RequestProcessorTest { val requestCaptor = argumentCaptor() verify(requestService, times(1)).save(requestCaptor.capture()) assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN) } @Test - fun testShouldSendDeleteRequestAndSaveErrorRequestStatus() { + fun testShouldSendDeleteRequestAndSendSuccessEvent() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(status = RequestStatus.SUCCESS) }.`when`(sender).send(any()) this.requestProcessor.processDeletion("TEST_12345678901") - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + } + + @Test + fun testShouldSendDeleteRequestAndSendErrorEvent() { + doAnswer { + "PSEUDONYM" + }.`when`(pseudonymizeService).patientPseudonym(anyString()) + + doAnswer { + MtbFileSender.Response(status = RequestStatus.ERROR) + }.`when`(sender).send(any()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) } @Test diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt new file mode 100644 index 0000000..cfb1111 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt @@ -0,0 +1,142 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.* +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@ExtendWith(MockitoExtension::class) +class ResponseProcessorTest { + + private lateinit var requestRepository: RequestRepository + private lateinit var statisticsUpdateProducer: Sinks.Many + + private lateinit var responseProcessor: ResponseProcessor + + private val testRequest = Request( + 1L, + "TestID1234", + "PSEUDONYM-A", + "1", + "dummyfingerprint", + RequestType.MTB_FILE, + RequestStatus.UNKNOWN + ) + + @BeforeEach + fun setup( + @Mock requestRepository: RequestRepository, + @Mock statisticsUpdateProducer: Sinks.Many + ) { + val objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) + + this.requestRepository = requestRepository + this.statisticsUpdateProducer = statisticsUpdateProducer + + this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer, objectMapper) + } + + @Test + fun shouldNotSaveStatusForUnknownRequest() { + doAnswer { + Optional.empty() + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + RequestStatus.SUCCESS + ) + + this.responseProcessor.handleResponseEvent(event) + + verify(requestRepository, never()).save(any()) + } + + @Test + fun shouldNotSaveStatusWithUnknownState() { + doAnswer { + Optional.of(testRequest) + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + RequestStatus.UNKNOWN + ) + + this.responseProcessor.handleResponseEvent(event) + + verify(requestRepository, never()).save(any()) + } + + @ParameterizedTest + @MethodSource("requestStatusSource") + fun shouldSaveStatusForKnownRequest(requestStatus: RequestStatus) { + doAnswer { + Optional.of(testRequest) + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + requestStatus + ) + + this.responseProcessor.handleResponseEvent(event) + + val captor = argumentCaptor() + verify(requestRepository, times(1)).save(captor.capture()) + assertThat(captor.firstValue).isNotNull + assertThat(captor.firstValue.status).isEqualTo(requestStatus) + } + + companion object { + + @JvmStatic + fun requestStatusSource(): Set { + return setOf( + RequestStatus.SUCCESS, + RequestStatus.WARNING, + RequestStatus.ERROR, + RequestStatus.DUPLICATION + ) + } + + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt new file mode 100644 index 0000000..0f524ca --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -0,0 +1,119 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.services.ResponseEvent +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.never +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import org.springframework.context.ApplicationEventPublisher +import org.springframework.http.HttpStatus + +@ExtendWith(MockitoExtension::class) +class KafkaResponseProcessorTest { + + private lateinit var eventPublisher: ApplicationEventPublisher + private lateinit var objectMapper: ObjectMapper + + private lateinit var kafkaResponseProcessor: KafkaResponseProcessor + + private fun createkafkaRecord( + requestId: String? = null, + statusCode: Int = 200, + statusBody: Map? = mapOf() + ): ConsumerRecord { + return ConsumerRecord( + "test-topic", + 0, + 0, + if (requestId == null) { + null + } else { + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId)) + }, + if (statusBody == null) { + "" + } else { + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody)) + } + ) + } + + @BeforeEach + fun setup( + @Mock eventPublisher: ApplicationEventPublisher + ) { + this.eventPublisher = eventPublisher + this.objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) + + this.kafkaResponseProcessor = KafkaResponseProcessor(eventPublisher, objectMapper) + } + + @Test + fun shouldNotProcessRecordsWithoutValidKey() { + this.kafkaResponseProcessor.onMessage(createkafkaRecord(null, 200)) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @Test + fun shouldNotProcessRecordsWithoutValidBody() { + this.kafkaResponseProcessor.onMessage(createkafkaRecord(requestId = "TestID1234", statusBody = null)) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @ParameterizedTest + @MethodSource("statusCodeSource") + fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) { + this.kafkaResponseProcessor.onMessage(createkafkaRecord("TestID1234", statusCode)) + verify(eventPublisher, times(1)).publishEvent(any()) + } + + companion object { + + @JvmStatic + fun statusCodeSource(): Set { + return setOf( + HttpStatus.OK, + HttpStatus.CREATED, + HttpStatus.BAD_REQUEST, + HttpStatus.NOT_FOUND, + HttpStatus.UNPROCESSABLE_ENTITY, + HttpStatus.INTERNAL_SERVER_ERROR + ) + .map { it.value() } + .toSet() + } + + } + +} \ No newline at end of file -- cgit v1.2.3 From 2b42a4d262a846feb1f82facbb151be9cabb57b4 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 12:11:39 +0200 Subject: Tests for RestMtbFileSender --- .../etl/processor/config/AppRestConfiguration.kt | 10 +- .../dnpm/etl/processor/output/RestMtbFileSender.kt | 7 +- .../etl/processor/output/RestMtbFileSenderTest.kt | 159 +++++++++++++++++++++ 3 files changed, 171 insertions(+), 5 deletions(-) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt index 5e77a4f..a830597 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -28,6 +28,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order +import org.springframework.web.client.RestTemplate @Configuration @EnableConfigurationProperties( @@ -43,9 +44,14 @@ class AppRestConfiguration { private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java) @Bean - fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender { + fun restTemplate(): RestTemplate { + return RestTemplate() + } + + @Bean + fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender { logger.info("Selected 'RestMtbFileSender'") - return RestMtbFileSender(restTargetProperties) + return RestMtbFileSender(restTemplate, restTargetProperties) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 24cdc49..f80ff69 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -28,12 +28,13 @@ import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { +class RestMtbFileSender( + private val restTemplate: RestTemplate, + private val restTargetProperties: RestTargetProperties +) : MtbFileSender { private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java) - private val restTemplate = RestTemplate() - override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { try { val headers = HttpHeaders() diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt new file mode 100644 index 0000000..17d420a --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt @@ -0,0 +1,159 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.output + +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.config.RestTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.springframework.http.HttpMethod +import org.springframework.http.HttpStatus +import org.springframework.test.web.client.MockRestServiceServer +import org.springframework.test.web.client.match.MockRestRequestMatchers.method +import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo +import org.springframework.test.web.client.response.MockRestResponseCreators.withStatus +import org.springframework.web.client.RestTemplate + +class RestMtbFileSenderTest { + + private lateinit var mockRestServiceServer: MockRestServiceServer + + private lateinit var restMtbFileSender: RestMtbFileSender + + @BeforeEach + fun setup() { + val restTemplate = RestTemplate() + val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile") + + this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate) + + this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties) + } + + @ParameterizedTest + @MethodSource("deleteRequestWithResponseSource") + fun shouldReturnExpectedResponseForDelete(requestWithResponse: RequestWithResponse) { + this.mockRestServiceServer.expect { + method(HttpMethod.DELETE) + requestTo("/mtbfile") + }.andRespond { + withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it) + } + + val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID")) + assertThat(response.status).isEqualTo(requestWithResponse.requestStatus) + } + + @ParameterizedTest + @MethodSource("mtbFileRequestWithResponseSource") + fun shouldReturnExpectedResponseForMtbFilePost(requestWithResponse: RequestWithResponse) { + this.mockRestServiceServer.expect { + method(HttpMethod.POST) + requestTo("/mtbfile") + }.andRespond { + withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it) + } + + val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile)) + assertThat(response.status).isEqualTo(requestWithResponse.requestStatus) + } + + companion object { + data class RequestWithResponse(val httpStatus: HttpStatus, val body: String, val requestStatus: RequestStatus) + + private val warningBody = """ + { + "patient_id": "PID", + "issues": [ + { "severity": "warning", "message": "Something is not right" } + ] + } + """.trimIndent() + + private val errorBody = """ + { + "patient_id": "PID", + "issues": [ + { "severity": "error", "message": "Something is very bad" } + ] + } + """.trimIndent() + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("PID") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("PID") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("PID") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + /** + * Synthetic http responses with related request status + * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API + */ + @JvmStatic + fun mtbFileRequestWithResponseSource(): Set { + return setOf( + RequestWithResponse(HttpStatus.OK, "{}", RequestStatus.SUCCESS), + RequestWithResponse(HttpStatus.CREATED, warningBody, RequestStatus.WARNING), + RequestWithResponse(HttpStatus.BAD_REQUEST, "??", RequestStatus.ERROR), + RequestWithResponse(HttpStatus.UNPROCESSABLE_ENTITY, errorBody, RequestStatus.ERROR), + // Some more errors not mentioned in documentation + RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR), + RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR) + ) + } + + /** + * Synthetic http responses with related request status + * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API + */ + @JvmStatic + fun deleteRequestWithResponseSource(): Set { + return setOf( + RequestWithResponse(HttpStatus.OK, "", RequestStatus.SUCCESS), + // Some more errors not mentioned in documentation + RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR), + RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR) + ) + } + } + + +} \ No newline at end of file -- cgit v1.2.3 From 002b0618cf813d48bbff2d287e16f607a4c73d73 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 10 Aug 2023 13:35:35 +0200 Subject: Add tests for KafkaMtbFileSender --- .../etl/processor/output/KafkaMtbFileSender.kt | 4 +- .../etl/processor/output/KafkaMtbFileSenderTest.kt | 169 +++++++++++++++++++++ 2 files changed, 171 insertions(+), 2 deletions(-) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 9448e29..e7f9769 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -50,7 +50,7 @@ class KafkaMtbFileSender( } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(RequestStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.ERROR) } } @@ -79,7 +79,7 @@ class KafkaMtbFileSender( } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(RequestStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.ERROR) } } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt new file mode 100644 index 0000000..14bdd5d --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -0,0 +1,169 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.output + +import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.* +import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.* +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.SendResult +import java.util.concurrent.CompletableFuture.completedFuture +import java.util.concurrent.ExecutionException + +@ExtendWith(MockitoExtension::class) +class KafkaMtbFileSenderTest { + + private lateinit var kafkaTemplate: KafkaTemplate + + private lateinit var kafkaMtbFileSender: KafkaMtbFileSender + + private lateinit var objectMapper: ObjectMapper + + @BeforeEach + fun setup( + @Mock kafkaTemplate: KafkaTemplate + ) { + val kafkaTargetProperties = KafkaTargetProperties("testtopic") + this.objectMapper = ObjectMapper() + this.kafkaTemplate = kafkaTemplate + + this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) + } + + @ParameterizedTest + @MethodSource("requestWithResponseSource") + fun shouldSendMtbFileRequestAndReturnExpectedState(testData: TestData) { + doAnswer { + if (null != testData.exception) { + throw testData.exception + } + completedFuture(SendResult(null, null)) + }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString()) + + val response = kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE))) + assertThat(response.status).isEqualTo(testData.requestStatus) + } + + @ParameterizedTest + @MethodSource("requestWithResponseSource") + fun shouldSendDeleteRequestAndReturnExpectedState(testData: TestData) { + doAnswer { + if (null != testData.exception) { + throw testData.exception + } + completedFuture(SendResult(null, null)) + }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString()) + + val response = kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID")) + assertThat(response.status).isEqualTo(testData.requestStatus) + } + + @Test + fun shouldSendMtbFileRequestWithCorrectKeyAndBody() { + doAnswer { + completedFuture(SendResult(null, null)) + }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString()) + + kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE))) + + val captor = argumentCaptor() + verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) + assertThat(captor.firstValue).isNotNull + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}") + assertThat(captor.secondValue).isNotNull + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE))) + } + + @Test + fun shouldSendDeleteRequestWithCorrectKeyAndBody() { + doAnswer { + completedFuture(SendResult(null, null)) + }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString()) + + kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID")) + + val captor = argumentCaptor() + verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) + assertThat(captor.firstValue).isNotNull + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}") + assertThat(captor.secondValue).isNotNull + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED))) + } + + companion object { + fun mtbFile(consentStatus: Consent.Status): MtbFile { + return if (consentStatus == Consent.Status.ACTIVE) { + MtbFile.builder() + .withPatient( + Patient.builder() + .withId("PID") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(consentStatus) + .withPatient("PID") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("PID") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + } else { + MtbFile.builder() + .withConsent( + Consent.builder() + .withStatus(consentStatus) + .withPatient("PID") + .build() + ) + }.build() + } + + data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null) + + @JvmStatic + fun requestWithResponseSource(): Set { + return setOf( + TestData(RequestStatus.UNKNOWN), + TestData(RequestStatus.ERROR, InterruptedException("Test interrupted")), + TestData(RequestStatus.ERROR, ExecutionException(RuntimeException("Test execution aborted"))) + ) + } + } + +} \ No newline at end of file -- cgit v1.2.3 From cb9c5904729c90b86357d0668604b74f4f4e61f7 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 11 Aug 2023 09:13:45 +0200 Subject: Issue #2: Do not serialize JSON string as custom string (#4) In addition to that, if REST request did not contain a response body, use empty string as data quality report string.--- .../dnpm/etl/processor/output/RestMtbFileSender.kt | 2 +- .../etl/processor/services/ResponseProcessor.kt | 4 +- .../etl/processor/output/RestMtbFileSenderTest.kt | 60 +++++++++++++++++----- 3 files changed, 51 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index f80ff69..1c59f5c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -50,7 +50,7 @@ class RestMtbFileSender( return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(response.statusCode.asRequestStatus()) + return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index d7ad86f..f2e9e2e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -55,14 +55,14 @@ class ResponseProcessor( RequestStatus.WARNING -> { it.report = Report( "Warnungen über mangelhafte Daten", - objectMapper.writeValueAsString(event.body) + event.body.orElse("") ) } RequestStatus.ERROR -> { it.report = Report( "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - objectMapper.writeValueAsString(event.body) + event.body.orElse("") ) } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt index 17d420a..78b5a45 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt @@ -61,7 +61,8 @@ class RestMtbFileSenderTest { } val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID")) - assertThat(response.status).isEqualTo(requestWithResponse.requestStatus) + assertThat(response.status).isEqualTo(requestWithResponse.response.status) + assertThat(response.body).isEqualTo(requestWithResponse.response.body) } @ParameterizedTest @@ -75,11 +76,16 @@ class RestMtbFileSenderTest { } val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile)) - assertThat(response.status).isEqualTo(requestWithResponse.requestStatus) + assertThat(response.status).isEqualTo(requestWithResponse.response.status) + assertThat(response.body).isEqualTo(requestWithResponse.response.body) } companion object { - data class RequestWithResponse(val httpStatus: HttpStatus, val body: String, val requestStatus: RequestStatus) + data class RequestWithResponse( + val httpStatus: HttpStatus, + val body: String, + val response: MtbFileSender.Response + ) private val warningBody = """ { @@ -123,6 +129,8 @@ class RestMtbFileSenderTest { ) .build() + private val errorResponseBody = "Sonstiger Fehler bei der Übertragung" + /** * Synthetic http responses with related request status * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API @@ -130,13 +138,33 @@ class RestMtbFileSenderTest { @JvmStatic fun mtbFileRequestWithResponseSource(): Set { return setOf( - RequestWithResponse(HttpStatus.OK, "{}", RequestStatus.SUCCESS), - RequestWithResponse(HttpStatus.CREATED, warningBody, RequestStatus.WARNING), - RequestWithResponse(HttpStatus.BAD_REQUEST, "??", RequestStatus.ERROR), - RequestWithResponse(HttpStatus.UNPROCESSABLE_ENTITY, errorBody, RequestStatus.ERROR), + RequestWithResponse(HttpStatus.OK, "{}", MtbFileSender.Response(RequestStatus.SUCCESS, "{}")), + RequestWithResponse( + HttpStatus.CREATED, + warningBody, + MtbFileSender.Response(RequestStatus.WARNING, warningBody) + ), + RequestWithResponse( + HttpStatus.BAD_REQUEST, + "??", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ), + RequestWithResponse( + HttpStatus.UNPROCESSABLE_ENTITY, + errorBody, + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ), // Some more errors not mentioned in documentation - RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR), - RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR) + RequestWithResponse( + HttpStatus.NOT_FOUND, + "what????", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ), + RequestWithResponse( + HttpStatus.INTERNAL_SERVER_ERROR, + "what????", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ) ) } @@ -147,10 +175,18 @@ class RestMtbFileSenderTest { @JvmStatic fun deleteRequestWithResponseSource(): Set { return setOf( - RequestWithResponse(HttpStatus.OK, "", RequestStatus.SUCCESS), + RequestWithResponse(HttpStatus.OK, "", MtbFileSender.Response(RequestStatus.SUCCESS)), // Some more errors not mentioned in documentation - RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR), - RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR) + RequestWithResponse( + HttpStatus.NOT_FOUND, + "what????", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ), + RequestWithResponse( + HttpStatus.INTERNAL_SERVER_ERROR, + "what????", + MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + ) ) } } -- cgit v1.2.3 From 6ecb439007b4fa6dec9af1e0334b89fd235a97be Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 11 Aug 2023 09:22:54 +0200 Subject: Issue #3: Detect the request type of request with last known status (#5) --- .../services/RequestServiceIntegrationTest.kt | 2 +- .../etl/processor/services/RequestProcessor.kt | 2 +- .../dnpm/etl/processor/services/RequestService.kt | 7 +-- .../etl/processor/services/RequestProcessorTest.kt | 8 +-- .../etl/processor/services/RequestServiceTest.kt | 58 +++++++++++++++------- 5 files changed, 49 insertions(+), 28 deletions(-) (limited to 'src') diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt index 3af218e..ff85296 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt @@ -116,7 +116,7 @@ class RequestServiceIntegrationTest : AbstractTestcontainerTest() { fun shouldReturnDeleteRequestAsLastRequest() { setupTestData() - val actual = requestService.isLastRequestDeletion("TEST_12345678901") + val actual = requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901") assertThat(actual).isTrue() } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index d2f8619..34156f7 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -95,7 +95,7 @@ class RequestProcessor( private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { val lastMtbFileRequestForPatient = requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id) - val isLastRequestDeletion = requestService.isLastRequestDeletion(pseudonymizedMtbFile.patient.id) + val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id) return null != lastMtbFileRequestForPatient && !isLastRequestDeletion diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt index 0f69910..e0043d2 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt @@ -38,8 +38,8 @@ class RequestService( fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) = Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym)) - fun isLastRequestDeletion(patientPseudonym: String) = - Companion.isLastRequestDeletion(allRequestsByPatientPseudonym(patientPseudonym)) + fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) = + Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym)) companion object { @@ -48,7 +48,8 @@ class RequestService( .sortedByDescending { it.processedAt } .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - fun isLastRequestDeletion(allRequests: List) = allRequests + fun isLastRequestWithKnownStatusDeletion(allRequests: List) = allRequests + .filter { it.status != RequestStatus.UNKNOWN } .maxByOrNull { it.processedAt }?.type == RequestType.DELETE } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index f9d8182..7856833 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -92,7 +92,7 @@ class RequestProcessorTest { doAnswer { false - }.`when`(requestService).isLastRequestDeletion(anyString()) + }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString()) doAnswer { it.arguments[0] as String @@ -147,7 +147,7 @@ class RequestProcessorTest { doAnswer { false - }.`when`(requestService).isLastRequestDeletion(anyString()) + }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString()) doAnswer { it.arguments[0] as String @@ -202,7 +202,7 @@ class RequestProcessorTest { doAnswer { false - }.`when`(requestService).isLastRequestDeletion(anyString()) + }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString()) doAnswer { MtbFileSender.Response(status = RequestStatus.SUCCESS) @@ -261,7 +261,7 @@ class RequestProcessorTest { doAnswer { false - }.`when`(requestService).isLastRequestDeletion(anyString()) + }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString()) doAnswer { MtbFileSender.Response(status = RequestStatus.ERROR) diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt index 3e0a979..3cf8804 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt @@ -68,23 +68,33 @@ class RequestServiceTest { patientId = "TEST_12345678901", pid = "P1", fingerprint = "0123456789abcdef1", + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-07-07T00:00:00Z") + ), + Request( + id = 2L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdefd", type = RequestType.DELETE, - status = RequestStatus.SUCCESS, - processedAt = Instant.parse("2023-08-08T02:00:00Z") + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-07-07T02:00:00Z") ), Request( - id = 1L, + id = 3L, uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678902", - pid = "P2", - fingerprint = "0123456789abcdef2", + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", type = RequestType.MTB_FILE, - status = RequestStatus.WARNING, - processedAt = Instant.parse("2023-08-08T00:00:00Z") + status = RequestStatus.UNKNOWN, + processedAt = Instant.parse("2023-08-11T00:00:00Z") ) ) - val actual = RequestService.isLastRequestDeletion(requests) + val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests) assertThat(actual).isTrue() } @@ -98,23 +108,33 @@ class RequestServiceTest { patientId = "TEST_12345678901", pid = "P1", fingerprint = "0123456789abcdef1", - type = RequestType.DELETE, - status = RequestStatus.SUCCESS, - processedAt = Instant.parse("2023-07-07T02:00:00Z") + type = RequestType.MTB_FILE, + status = RequestStatus.WARNING, + processedAt = Instant.parse("2023-07-07T00:00:00Z") ), Request( - id = 1L, + id = 2L, uuid = UUID.randomUUID().toString(), - patientId = "TEST_12345678902", - pid = "P2", - fingerprint = "0123456789abcdef2", + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", type = RequestType.MTB_FILE, status = RequestStatus.WARNING, - processedAt = Instant.parse("2023-08-08T00:00:00Z") + processedAt = Instant.parse("2023-07-07T02:00:00Z") + ), + Request( + id = 3L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "0123456789abcdef1", + type = RequestType.MTB_FILE, + status = RequestStatus.UNKNOWN, + processedAt = Instant.parse("2023-08-11T00:00:00Z") ) ) - val actual = RequestService.isLastRequestDeletion(requests) + val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests) assertThat(actual).isFalse() } @@ -197,7 +217,7 @@ class RequestServiceTest { @Test fun isLastRequestDeletionShouldRequestAllRequestsForPatientPseudonym() { - requestService.isLastRequestDeletion("TEST_12345678901") + requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901") verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString()) } -- cgit v1.2.3 From 72295202ec37a76b90a919e39ae094bb7e56d202 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 12 Aug 2023 22:19:29 +0200 Subject: Code cleanup --- .../dnpm/etl/processor/EtlProcessorApplicationTests.kt | 7 ++++++- .../etl/processor/pseudonym/GpasPseudonymGenerator.java | 4 +--- .../dev/dnpm/etl/processor/config/AppConfigProperties.kt | 2 +- .../dev/dnpm/etl/processor/pseudonym/extensions.kt | 2 +- .../dev/dnpm/etl/processor/services/RequestProcessor.kt | 3 --- .../dev/dnpm/etl/processor/services/ResponseProcessor.kt | 4 +--- .../dnpm/etl/processor/web/StatisticsRestController.kt | 6 +++--- .../dnpm/etl/processor/output/RestMtbFileSenderTest.kt | 16 ++++++++-------- .../dnpm/etl/processor/services/ResponseProcessorTest.kt | 6 +----- .../services/kafka/KafkaResponseProcessorTest.kt | 8 ++++---- 10 files changed, 26 insertions(+), 32 deletions(-) (limited to 'src') diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt index 6c5b150..c5a20bb 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt @@ -20,10 +20,13 @@ package dev.dnpm.etl.processor import dev.dnpm.etl.processor.output.MtbFileSender +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.mock.mockito.MockBean +import org.springframework.context.ApplicationContext import org.springframework.test.context.junit.jupiter.SpringExtension import org.testcontainers.junit.jupiter.Testcontainers @@ -34,7 +37,9 @@ import org.testcontainers.junit.jupiter.Testcontainers class EtlProcessorApplicationTests : AbstractTestcontainerTest() { @Test - fun contextLoadsIfMtbFileSenderConfigured() { + fun contextLoadsIfMtbFileSenderConfigured(@Autowired context: ApplicationContext) { + // Simply check bean configuration + assertThat(context).isNotNull } } diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java index f13a034..732a770 100644 --- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java @@ -69,13 +69,11 @@ import java.util.HashMap; public class GpasPseudonymGenerator implements Generator { + private final static FhirContext r4Context = FhirContext.forR4(); private final String gPasUrl; private final String psnTargetDomain; - private static FhirContext r4Context = FhirContext.forR4(); private final HttpHeaders httpHeader; - private final RetryTemplate retryTemplate = defaultTemplate(); - private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class); private SSLContext customSslContext; diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 6502a1b..06e730b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties(AppConfigProperties.NAME) data class AppConfigProperties( - var bwhc_uri: String?, + var bwhcUri: String?, var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN ) { companion object { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt index 580785d..c0050a4 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt @@ -38,7 +38,7 @@ infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) { this.histologyReports.forEach { it.patient = patientPseudonym } this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym } this.molecularPathologyFindings.forEach { it.patient = patientPseudonym } - this.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } } + this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } } this.ngsReports.forEach { it.patient = patientPseudonym } this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym } this.rebiopsyRequests.forEach { it.patient = patientPseudonym } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 34156f7..3cd912c 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -30,7 +30,6 @@ import dev.dnpm.etl.processor.pseudonym.PseudonymizeService import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils -import org.slf4j.LoggerFactory import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service import java.time.Instant @@ -45,8 +44,6 @@ class RequestProcessor( private val applicationEventPublisher: ApplicationEventPublisher ) { - private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) - fun processMtbFile(mtbFile: MtbFile) { val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index f2e9e2e..677443a 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -19,7 +19,6 @@ package dev.dnpm.etl.processor.services -import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.Report import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus @@ -33,8 +32,7 @@ import java.util.* @Service class ResponseProcessor( private val requestRepository: RequestRepository, - private val statisticsUpdateProducer: Sinks.Many, - private val objectMapper: ObjectMapper + private val statisticsUpdateProducer: Sinks.Many ) { private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt index a418772..6f0e820 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt @@ -83,9 +83,9 @@ class StatisticsRestController( .groupBy { formatter.format(it.processedAt) } .map { val requestList = it.value - .groupBy { it.status } - .map { - Pair(it.key, it.value.size) + .groupBy { request -> request.status } + .map { request -> + Pair(request.key, request.value.size) } .toMap() Pair( diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt index 78b5a45..0cad285 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt @@ -105,7 +105,7 @@ class RestMtbFileSenderTest { } """.trimIndent() - val mtbFile = MtbFile.builder() + val mtbFile: MtbFile = MtbFile.builder() .withPatient( Patient.builder() .withId("PID") @@ -129,7 +129,7 @@ class RestMtbFileSenderTest { ) .build() - private val errorResponseBody = "Sonstiger Fehler bei der Übertragung" + private const val ERROR_RESPONSE_BODY = "Sonstiger Fehler bei der Übertragung" /** * Synthetic http responses with related request status @@ -147,23 +147,23 @@ class RestMtbFileSenderTest { RequestWithResponse( HttpStatus.BAD_REQUEST, "??", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ), RequestWithResponse( HttpStatus.UNPROCESSABLE_ENTITY, errorBody, - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ), // Some more errors not mentioned in documentation RequestWithResponse( HttpStatus.NOT_FOUND, "what????", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ), RequestWithResponse( HttpStatus.INTERNAL_SERVER_ERROR, "what????", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ) ) } @@ -180,12 +180,12 @@ class RestMtbFileSenderTest { RequestWithResponse( HttpStatus.NOT_FOUND, "what????", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ), RequestWithResponse( HttpStatus.INTERNAL_SERVER_ERROR, "what????", - MtbFileSender.Response(RequestStatus.ERROR, errorResponseBody) + MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY) ) ) } diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt index cfb1111..b9e4b7f 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt @@ -19,8 +19,6 @@ package dev.dnpm.etl.processor.services -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.KotlinModule import dev.dnpm.etl.processor.monitoring.Request import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus @@ -62,12 +60,10 @@ class ResponseProcessorTest { @Mock requestRepository: RequestRepository, @Mock statisticsUpdateProducer: Sinks.Many ) { - val objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) - this.requestRepository = requestRepository this.statisticsUpdateProducer = statisticsUpdateProducer - this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer, objectMapper) + this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer) } @Test diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt index 0f524ca..6d83146 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -45,7 +45,7 @@ class KafkaResponseProcessorTest { private lateinit var kafkaResponseProcessor: KafkaResponseProcessor - private fun createkafkaRecord( + private fun createKafkaRecord( requestId: String? = null, statusCode: Int = 200, statusBody: Map? = mapOf() @@ -79,14 +79,14 @@ class KafkaResponseProcessorTest { @Test fun shouldNotProcessRecordsWithoutValidKey() { - this.kafkaResponseProcessor.onMessage(createkafkaRecord(null, 200)) + this.kafkaResponseProcessor.onMessage(createKafkaRecord(null, 200)) verify(eventPublisher, never()).publishEvent(any()) } @Test fun shouldNotProcessRecordsWithoutValidBody() { - this.kafkaResponseProcessor.onMessage(createkafkaRecord(requestId = "TestID1234", statusBody = null)) + this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null)) verify(eventPublisher, never()).publishEvent(any()) } @@ -94,7 +94,7 @@ class KafkaResponseProcessorTest { @ParameterizedTest @MethodSource("statusCodeSource") fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) { - this.kafkaResponseProcessor.onMessage(createkafkaRecord("TestID1234", statusCode)) + this.kafkaResponseProcessor.onMessage(createKafkaRecord("TestID1234", statusCode)) verify(eventPublisher, times(1)).publishEvent(any()) } -- cgit v1.2.3 From 66dc96680da5e263550413493578ebe936dde149 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 15 Aug 2023 01:09:22 +0200 Subject: Update dev config and added related information into README.md --- src/main/resources/application-dev.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 551f3f8..b1cc2fc 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -4,8 +4,11 @@ spring: file: ./dev-compose.yml app: - rest: - uri: http://localhost:9000/bwhc/etl/api + #rest: + # uri: http://localhost:9000/bwhc/etl/api + + # Note: Make sure, hostname "kafka" points to 127.0.0.1 + # otherwise connection will not be available kafka: topic: test response-topic: test-response -- cgit v1.2.3 From 2eb5cc61b9809523d51d1fa7af7a1afc1fdb7f0c Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Tue, 15 Aug 2023 10:58:17 +0200 Subject: Change Kafka response body JSON alias --- .../dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index ef880f4..68e3611 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -84,7 +84,7 @@ class KafkaResponseProcessor( data class ResponseKey(val requestId: String) data class ResponseBody( - @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int, - @JsonProperty("status_body") val statusBody: Map + @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, + @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map ) } \ No newline at end of file -- cgit v1.2.3 From 8dc82225a4cd45a315fac3efe4d76513e6d536fc Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 16 Aug 2023 15:25:46 +0200 Subject: Issue #7: Send and expect requestId in record body, not in record key (#8) --- .../etl/processor/output/KafkaMtbFileSender.kt | 12 ++--- .../etl/processor/services/ResponseProcessor.kt | 2 +- .../services/kafka/KafkaResponseProcessor.kt | 54 +++++++++------------- src/main/resources/application-dev.yml | 2 +- .../etl/processor/output/KafkaMtbFileSenderTest.kt | 12 +++-- .../services/kafka/KafkaResponseProcessorTest.kt | 54 +++++++++++++++++----- 6 files changed, 80 insertions(+), 56 deletions(-) (limited to 'src') diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index e7f9769..5772faf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -40,7 +40,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(request.mtbFile) + objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") @@ -68,7 +68,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(dummyMtbFile) + objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) ) if (result.get() != null) { @@ -85,12 +85,12 @@ class KafkaMtbFileSender( private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + - "\"eid\": \"${request.mtbFile.episode.id}\", " + - "\"requestId\": \"${request.requestId}\"}" + "\"eid\": \"${request.mtbFile.episode.id}\"}" } private fun key(request: MtbFileSender.DeleteRequest): String { - return "{\"pid\": \"${request.patientId}\", " + - "\"requestId\": \"${request.requestId}\"}" + return "{\"pid\": \"${request.patientId}\"}" } + + data class Data(val requestId: String, val content: MtbFile) } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index 677443a..4048348 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -71,7 +71,7 @@ class ResponseProcessor( } else -> { - logger.error("Cannot process response: Unknown response code!") + logger.error("Cannot process response: Unknown response!") return@ifPresentOrElse } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 68e3611..a29010f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -41,50 +41,40 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord) { try { - Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java)) } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) Optional.empty() - }.ifPresentOrElse({ responseKey -> - val event = try { - val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - responseBody.statusCode.asRequestStatus(), - when (responseBody.statusCode.asRequestStatus()) { - RequestStatus.SUCCESS -> { - Optional.empty() - } + }.ifPresentOrElse({ responseBody -> + val event = ResponseEvent( + responseBody.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } - RequestStatus.WARNING, RequestStatus.ERROR -> { - Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) - } + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } - else -> { - logger.error("Kafka response: Unknown response code!") - Optional.empty() - } + else -> { + logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode) + Optional.empty() } - ) - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - RequestStatus.ERROR, - Optional.of("Cannot process Kafka response") - ) - } + } + ) eventPublisher.publishEvent(event) }, { - logger.error("No response key in Kafka response") + logger.error("No requestId in Kafka response") }) } - data class ResponseKey(val requestId: String) - data class ResponseBody( + @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String, @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map ) + } \ No newline at end of file diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index b1cc2fc..a60cd8a 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -11,7 +11,7 @@ app: # otherwise connection will not be available kafka: topic: test - response-topic: test-response + response-topic: test_response servers: kafka:9092 server: diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt index 14bdd5d..3ec9757 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -97,9 +97,9 @@ class KafkaMtbFileSenderTest { val captor = argumentCaptor() verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) assertThat(captor.firstValue).isNotNull - assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}") + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}") assertThat(captor.secondValue).isNotNull - assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE))) + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE))) } @Test @@ -113,9 +113,9 @@ class KafkaMtbFileSenderTest { val captor = argumentCaptor() verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) assertThat(captor.firstValue).isNotNull - assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}") + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}") assertThat(captor.secondValue).isNotNull - assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED))) + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED))) } companion object { @@ -154,6 +154,10 @@ class KafkaMtbFileSenderTest { }.build() } + fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data { + return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus)) + } + data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null) @JvmStatic diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt index 6d83146..95bf41b 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -46,7 +46,7 @@ class KafkaResponseProcessorTest { private lateinit var kafkaResponseProcessor: KafkaResponseProcessor private fun createKafkaRecord( - requestId: String? = null, + requestId: String, statusCode: Int = 200, statusBody: Map? = mapOf() ): ConsumerRecord { @@ -54,15 +54,11 @@ class KafkaResponseProcessorTest { "test-topic", 0, 0, - if (requestId == null) { - null - } else { - this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId)) - }, + null, if (statusBody == null) { "" } else { - this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody)) + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody)) } ) } @@ -78,17 +74,51 @@ class KafkaResponseProcessorTest { } @Test - fun shouldNotProcessRecordsWithoutValidKey() { - this.kafkaResponseProcessor.onMessage(createKafkaRecord(null, 200)) + fun shouldNotProcessRecordsWithoutRequestIdInBody() { + val record = ConsumerRecord( + "test-topic", + 0, + 0, + null, + """ + { + "statusCode": 200, + "statusBody": {} + } + """.trimIndent() + ) + + this.kafkaResponseProcessor.onMessage(record) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @Test + fun shouldProcessRecordsWithAliasNames() { + val record = ConsumerRecord( + "test-topic", + 0, + 0, + null, + """ + { + "request_id": "test0123456789", + "status_code": 200, + "status_body": {} + } + """.trimIndent() + ) - verify(eventPublisher, never()).publishEvent(any()) + this.kafkaResponseProcessor.onMessage(record) + + verify(eventPublisher, times(1)).publishEvent(any()) } @Test - fun shouldNotProcessRecordsWithoutValidBody() { + fun shouldNotProcessRecordsWithoutValidStatusBody() { this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null)) - verify(eventPublisher, never()).publishEvent(any()) + verify(eventPublisher, never()).publishEvent(any()) } @ParameterizedTest -- cgit v1.2.3 From 9921e1e684cbc236ac645d5172a2385fa69e5bbc Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 19 Aug 2023 11:45:21 +0200 Subject: Throw PseudonymRequestFailed exception with error message This will throw an exception with error message describing what the error is instead of having a more generic NoSuchElementException to be thrown if Optional.get() has no value after calling findFirst() on an empty stream. --- .../etl/processor/pseudonym/GpasPseudonymGenerator.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java index 732a770..91e465b 100644 --- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java @@ -108,12 +108,19 @@ public class GpasPseudonymGenerator implements Generator { @NotNull public static String unwrapPseudonym(Parameters gPasPseudonymResult) { - Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst() - .get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst() - .orElseGet(ParametersParameterComponent::new).getValue(); + final var parameters = gPasPseudonymResult.getParameter().stream().findFirst(); + + if (parameters.isEmpty()) { + throw new PseudonymRequestFailed("Empty HL7 parameters, cannot find first one"); + } + + final var identifier = (Identifier) parameters.get().getPart().stream() + .filter(a -> a.getName().equals("pseudonym")) + .findFirst() + .orElseGet(ParametersParameterComponent::new).getValue(); // pseudonym - return pseudonym.getSystem() + "|" + pseudonym.getValue(); + return identifier.getSystem() + "|" + identifier.getValue(); } -- cgit v1.2.3