diff options
Diffstat (limited to 'src/main/kotlin/dev/dnpm/etl/processor')
7 files changed, 211 insertions, 161 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 6d0254e..309ff2d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -20,7 +20,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor @@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.ApplicationEventPublisher import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order @@ -70,10 +70,10 @@ class AppKafkaConfiguration { @Bean fun kafkaResponseProcessor( - requestRepository: RequestRepository, + applicationEventPublisher: ApplicationEventPublisher, objectMapper: ObjectMapper ): KafkaResponseProcessor { - return KafkaResponseProcessor(requestRepository, objectMapper) + return KafkaResponseProcessor(applicationEventPublisher, objectMapper) } }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index d903745..9448e29 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -43,13 +44,13 @@ class KafkaMtbFileSender( ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } @@ -72,13 +73,13 @@ class KafkaMtbFileSender( if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt index 6914ba5..de0efaa 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt @@ -20,22 +20,31 @@ package dev.dnpm.etl.processor.output import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.springframework.http.HttpStatusCode interface MtbFileSender { fun send(request: MtbFileRequest): Response fun send(request: DeleteRequest): Response - data class Response(val status: ResponseStatus, val reason: String = "") + data class Response(val status: RequestStatus, val body: String = "") data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile) data class DeleteRequest(val requestId: String, val patientId: String) - enum class ResponseStatus { - SUCCESS, - WARNING, - ERROR, - UNKNOWN +} + +fun Int.asRequestStatus(): RequestStatus { + return when (this) { + 200 -> RequestStatus.SUCCESS + 201 -> RequestStatus.WARNING + in 400 .. 999 -> RequestStatus.ERROR + else -> RequestStatus.UNKNOWN } +} + +fun HttpStatusCode.asRequestStatus(): RequestStatus { + return this.value().asRequestStatus() }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 04c73ef..24cdc49 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -20,13 +20,13 @@ package dev.dnpm.etl.processor.output import dev.dnpm.etl.processor.config.RestTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.http.HttpEntity import org.springframework.http.HttpHeaders import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -import org.springframework.web.util.UriComponentsBuilder class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { @@ -46,21 +46,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) ) if (!response.statusCode.is2xxSuccessful) { logger.warn("Error sending to remote system: {}", response.body) - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}") + return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return if (response.body?.contains("warning") == true) { - MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}") - } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) - } + return MtbFileSender.Response(response.statusCode.asRequestStatus()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { @@ -74,14 +70,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) String::class.java ) logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + return MtbFileSender.Response(RequestStatus.SUCCESS) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 6465e82..d2f8619 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -31,8 +31,9 @@ import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service -import reactor.core.publisher.Sinks +import java.time.Instant import java.util.* @Service @@ -41,73 +42,54 @@ class RequestProcessor( private val sender: MtbFileSender, private val requestService: RequestService, private val objectMapper: ObjectMapper, - private val statisticsUpdateProducer: Sinks.Many<Any> + private val applicationEventPublisher: ApplicationEventPublisher ) { private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) fun processMtbFile(mtbFile: MtbFile) { + val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id mtbFile pseudonymizeWith pseudonymizeService + val request = MtbFileSender.MtbFileRequest(requestId, mtbFile) + + requestService.save( + Request( + uuid = requestId, + patientId = request.mtbFile.patient.id, + pid = pid, + fingerprint = fingerprint(request.mtbFile), + status = RequestStatus.UNKNOWN, + type = RequestType.MTB_FILE + ) + ) + if (isDuplication(mtbFile)) { - requestService.save( - Request( - patientId = mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(mtbFile), - status = RequestStatus.DUPLICATION, - type = RequestType.MTB_FILE, - report = Report("Duplikat erkannt - keine Daten weitergeleitet") + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + RequestStatus.DUPLICATION ) ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) return } - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile) - val responseStatus = sender.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( - Request( - uuid = request.requestId, - patientId = request.mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(request.mtbFile), - status = requestStatus, - type = RequestType.MTB_FILE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason) - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { @@ -126,55 +108,31 @@ class RequestProcessor( try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( Request( uuid = requestId, patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = requestStatus, - type = RequestType.DELETE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + status = RequestStatus.UNKNOWN, + type = RequestType.DELETE + ) + ) + + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) + } catch (e: Exception) { requestService.save( Request( @@ -188,7 +146,6 @@ class RequestProcessor( ) ) } - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun fingerprint(mtbFile: MtbFile): String { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt new file mode 100644 index 0000000..d7ad86f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -0,0 +1,96 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@Service +class ResponseProcessor( + private val requestRepository: RequestRepository, + private val statisticsUpdateProducer: Sinks.Many<Any>, + private val objectMapper: ObjectMapper +) { + + private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) + + @EventListener(classes = [ResponseEvent::class]) + fun handleResponseEvent(event: ResponseEvent) { + requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({ + it.processedAt = event.timestamp + it.status = event.status + + when (event.status) { + RequestStatus.SUCCESS -> { + it.report = Report( + "Keine Probleme erkannt", + ) + } + + RequestStatus.WARNING -> { + it.report = Report( + "Warnungen über mangelhafte Daten", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.ERROR -> { + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.DUPLICATION -> { + it.report = Report( + "Duplikat erkannt" + ) + } + + else -> { + logger.error("Cannot process response: Unknown response code!") + return@ifPresentOrElse + } + } + + requestRepository.save(it) + + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + }, { + logger.error("Response for unknown request '${event.requestUuid}'!") + }) + } + +} + +data class ResponseEvent( + val requestUuid: String, + val timestamp: Instant, + val status: RequestStatus, + val body: Optional<String> = Optional.empty() +)
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 1e9263d..ef880f4 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -22,16 +22,18 @@ package dev.dnpm.etl.processor.services.kafka import com.fasterxml.jackson.annotation.JsonAlias import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.Report -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.output.asRequestStatus +import dev.dnpm.etl.processor.services.ResponseEvent import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.kafka.listener.MessageListener import java.time.Instant +import java.util.* class KafkaResponseProcessor( - private val requestRepository: RequestRepository, + private val eventPublisher: ApplicationEventPublisher, private val objectMapper: ObjectMapper ) : MessageListener<String, String> { @@ -39,55 +41,44 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord<String, String>) { try { - val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) - requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { + Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + } catch (e: Exception) { + Optional.empty() + }.ifPresentOrElse({ responseKey -> + val event = try { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } - when (responseBody.statusCode) { - 200 -> { - it.status = RequestStatus.SUCCESS - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - requestRepository.save(it) - } + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } - 201 -> { - it.status = RequestStatus.WARNING - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Warnungen über mangelhafte Daten", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) + else -> { + logger.error("Kafka response: Unknown response code!") + Optional.empty() + } } - - 400, 422 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - in 900..999 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - else -> { - logger.error("Cannot process Kafka response: Unknown response code!") - } - } + ) + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + RequestStatus.ERROR, + Optional.of("Cannot process Kafka response") + ) } - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - } + eventPublisher.publishEvent(event) + }, { + logger.error("No response key in Kafka response") + }) } data class ResponseKey(val requestId: String) |
