summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorJakub Lidke2023-08-25 12:59:38 +0200
committerJakub Lidke2023-08-25 12:59:38 +0200
commitda26b5a2c88f68a36d20ead2e84f15672d5995f0 (patch)
treeb30b82fa7099bdb9cf4c261006548b58607d5f01 /src/main
parentbbea48322fa5a24ff61eef25ed84f4966bf49c42 (diff)
parent9921e1e684cbc236ac645d5172a2385fa69e5bbc (diff)
Merge branch 'master' into add-docker-build
# Conflicts: # README.md # build.gradle.kts
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java19
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt1
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt6
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt28
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt79
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt58
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt15
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt4
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt58
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt21
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt23
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt36
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt50
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt190
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt57
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt94
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt80
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt (renamed from src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt)27
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt6
-rw-r--r--src/main/resources/application-dev.yml14
-rw-r--r--src/main/resources/application.yml4
21 files changed, 621 insertions, 249 deletions
diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
index f13a034..91e465b 100644
--- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
+++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
@@ -69,13 +69,11 @@ import java.util.HashMap;
public class GpasPseudonymGenerator implements Generator {
+ private final static FhirContext r4Context = FhirContext.forR4();
private final String gPasUrl;
private final String psnTargetDomain;
- private static FhirContext r4Context = FhirContext.forR4();
private final HttpHeaders httpHeader;
-
private final RetryTemplate retryTemplate = defaultTemplate();
-
private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
private SSLContext customSslContext;
@@ -110,12 +108,19 @@ public class GpasPseudonymGenerator implements Generator {
@NotNull
public static String unwrapPseudonym(Parameters gPasPseudonymResult) {
- Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst()
- .get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst()
- .orElseGet(ParametersParameterComponent::new).getValue();
+ final var parameters = gPasPseudonymResult.getParameter().stream().findFirst();
+
+ if (parameters.isEmpty()) {
+ throw new PseudonymRequestFailed("Empty HL7 parameters, cannot find first one");
+ }
+
+ final var identifier = (Identifier) parameters.get().getPart().stream()
+ .filter(a -> a.getName().equals("pseudonym"))
+ .findFirst()
+ .orElseGet(ParametersParameterComponent::new).getValue();
// pseudonym
- return pseudonym.getSystem() + "|" + pseudonym.getValue();
+ return identifier.getSystem() + "|" + identifier.getValue();
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
index 0c4ab68..5d28c97 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
@@ -28,4 +28,3 @@ class EtlProcessorApplication
fun main(args: Array<String>) {
runApplication<EtlProcessorApplication>(*args)
}
-
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
index 64be70d..06e730b 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
@@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(AppConfigProperties.NAME)
data class AppConfigProperties(
- var bwhc_uri: String?,
+ var bwhcUri: String?,
var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN
) {
companion object {
@@ -48,7 +48,7 @@ data class GPasConfigProperties(
val password: String?,
val sslCaLocation: String?,
-) {
+ ) {
companion object {
const val NAME = "app.pseudonymize.gpas"
}
@@ -66,6 +66,8 @@ data class RestTargetProperties(
@ConfigurationProperties(KafkaTargetProperties.NAME)
data class KafkaTargetProperties(
val topic: String = "etl-processor",
+ val responseTopic: String = "${topic}_response",
+ val groupId: String = "${topic}_group",
val servers: String = ""
) {
companion object {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
index c677f2b..6b15fc0 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
@@ -21,9 +21,6 @@ package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.ReportService
-import dev.dnpm.etl.processor.output.KafkaMtbFileSender
-import dev.dnpm.etl.processor.output.MtbFileSender
-import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
import dev.dnpm.etl.processor.pseudonym.Generator
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
@@ -32,7 +29,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.core.KafkaTemplate
import reactor.core.publisher.Sinks
@Configuration
@@ -40,9 +36,7 @@ import reactor.core.publisher.Sinks
value = [
AppConfigProperties::class,
PseudonymizeConfigProperties::class,
- GPasConfigProperties::class,
- RestTargetProperties::class,
- KafkaTargetProperties::class
+ GPasConfigProperties::class
]
)
class AppConfiguration {
@@ -60,25 +54,13 @@ class AppConfiguration {
}
@Bean
- fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService {
+ fun pseudonymizeService(
+ generator: Generator,
+ pseudonymizeConfigProperties: PseudonymizeConfigProperties
+ ): PseudonymizeService {
return PseudonymizeService(generator, pseudonymizeConfigProperties)
}
- @ConditionalOnProperty(value = ["app.rest.uri"])
- @Bean
- fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender {
- return RestMtbFileSender(restTargetProperties)
- }
-
- @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
- @Bean
- fun kafkaMtbFileSender(
- kafkaTemplate: KafkaTemplate<String, String>,
- objectMapper: ObjectMapper
- ): MtbFileSender {
- return KafkaMtbFileSender(kafkaTemplate, objectMapper)
- }
-
@Bean
fun reportService(objectMapper: ObjectMapper): ReportService {
return ReportService(objectMapper)
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
new file mode 100644
index 0000000..309ff2d
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -0,0 +1,79 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.config
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.output.KafkaMtbFileSender
+import dev.dnpm.etl.processor.output.MtbFileSender
+import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.context.ApplicationEventPublisher
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.core.annotation.Order
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.listener.ContainerProperties
+import org.springframework.kafka.listener.KafkaMessageListenerContainer
+
+@Configuration
+@EnableConfigurationProperties(
+ value = [KafkaTargetProperties::class]
+)
+@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
+@ConditionalOnMissingBean(MtbFileSender::class)
+@Order(-5)
+class AppKafkaConfiguration {
+
+ private val logger = LoggerFactory.getLogger(AppKafkaConfiguration::class.java)
+
+ @Bean
+ fun kafkaMtbFileSender(
+ kafkaTemplate: KafkaTemplate<String, String>,
+ kafkaTargetProperties: KafkaTargetProperties,
+ objectMapper: ObjectMapper
+ ): MtbFileSender {
+ logger.info("Selected 'KafkaMtbFileSender'")
+ return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
+ }
+
+ @Bean
+ fun kafkaListenerContainer(
+ consumerFactory: ConsumerFactory<String, String>,
+ kafkaTargetProperties: KafkaTargetProperties,
+ kafkaResponseProcessor: KafkaResponseProcessor
+ ): KafkaMessageListenerContainer<String, String> {
+ val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic)
+ containerProperties.messageListener = kafkaResponseProcessor
+ return KafkaMessageListenerContainer(consumerFactory, containerProperties)
+ }
+
+ @Bean
+ fun kafkaResponseProcessor(
+ applicationEventPublisher: ApplicationEventPublisher,
+ objectMapper: ObjectMapper
+ ): KafkaResponseProcessor {
+ return KafkaResponseProcessor(applicationEventPublisher, objectMapper)
+ }
+
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
new file mode 100644
index 0000000..a830597
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
@@ -0,0 +1,58 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.config
+
+import dev.dnpm.etl.processor.output.MtbFileSender
+import dev.dnpm.etl.processor.output.RestMtbFileSender
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.core.annotation.Order
+import org.springframework.web.client.RestTemplate
+
+@Configuration
+@EnableConfigurationProperties(
+ value = [
+ RestTargetProperties::class
+ ]
+)
+@ConditionalOnProperty(value = ["app.rest.uri"])
+@ConditionalOnMissingBean(MtbFileSender::class)
+@Order(-10)
+class AppRestConfiguration {
+
+ private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java)
+
+ @Bean
+ fun restTemplate(): RestTemplate {
+ return RestTemplate()
+ }
+
+ @Bean
+ fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender {
+ logger.info("Selected 'RestMtbFileSender'")
+ return RestMtbFileSender(restTemplate, restTargetProperties)
+ }
+
+}
+
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt
index 6ee8ae9..ae36705 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt
@@ -19,7 +19,9 @@
package dev.dnpm.etl.processor.monitoring
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonValue
+import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
@@ -33,15 +35,22 @@ class ReportService(
}
return try {
objectMapper.readValue(dataQualityReport, DataQualityReport::class.java).issues
- } catch (e: JsonMappingException) {
- e.printStackTrace()
- listOf()
+ } catch (e: Exception) {
+ val otherIssue =
+ Issue(Severity.ERROR, "Not parsable data quality report '$dataQualityReport'")
+ return when (e) {
+ is JsonMappingException -> listOf(otherIssue)
+ is JsonParseException -> listOf(otherIssue)
+ else -> throw e
+ }
}
}
+ @JsonIgnoreProperties(ignoreUnknown = true)
private data class DataQualityReport(val issues: List<Issue>)
+ @JsonIgnoreProperties(ignoreUnknown = true)
data class Issue(val severity: Severity, val message: String)
enum class Severity(@JsonValue val value: String) {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
index ecd8219..c1d4d43 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
@@ -36,9 +36,9 @@ data class Request(
val patientId: String,
val pid: String,
val fingerprint: String,
- val status: RequestStatus,
val type: RequestType,
- val processedAt: Instant = Instant.now(),
+ var status: RequestStatus,
+ var processedAt: Instant = Instant.now(),
@Embedded.Nullable var report: Report? = null
)
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 55503cf..5772faf 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -20,11 +20,16 @@
package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
+import de.ukw.ccc.bwhc.dto.Consent
+import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.dnpm.etl.processor.config.KafkaTargetProperties
+import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
+ private val kafkaTargetProperties: KafkaTargetProperties,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@@ -32,31 +37,60 @@ class KafkaMtbFileSender(
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
return try {
- val result = kafkaTemplate.sendDefault(
- header(request),
- objectMapper.writeValueAsString(request.mtbFile)
+ val result = kafkaTemplate.send(
+ kafkaTargetProperties.topic,
+ key(request),
+ objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
- MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
} else {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
+ MtbFileSender.Response(RequestStatus.ERROR)
}
-
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
- MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ MtbFileSender.Response(RequestStatus.ERROR)
}
}
- // TODO not yet implemented
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ val dummyMtbFile = MtbFile.builder()
+ .withConsent(
+ Consent.builder()
+ .withPatient(request.patientId)
+ .withStatus(Consent.Status.REJECTED)
+ .build()
+ )
+ .build()
+
+ return try {
+ val result = kafkaTemplate.send(
+ kafkaTargetProperties.topic,
+ key(request),
+ objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
+ )
+
+ if (result.get() != null) {
+ logger.debug("Sent deletion request via KafkaMtbFileSender")
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
+ } else {
+ MtbFileSender.Response(RequestStatus.ERROR)
+ }
+ } catch (e: Exception) {
+ logger.error("An error occurred sending to kafka", e)
+ MtbFileSender.Response(RequestStatus.ERROR)
+ }
}
- private fun header(request: MtbFileSender.MtbFileRequest): String {
+ private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
- "\"eid\": \"${request.mtbFile.episode.id}\", " +
- "\"requestId\": \"${request.requestId}\"}"
+ "\"eid\": \"${request.mtbFile.episode.id}\"}"
}
+
+ private fun key(request: MtbFileSender.DeleteRequest): String {
+ return "{\"pid\": \"${request.patientId}\"}"
+ }
+
+ data class Data(val requestId: String, val content: MtbFile)
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
index 6914ba5..de0efaa 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
@@ -20,22 +20,31 @@
package dev.dnpm.etl.processor.output
import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.springframework.http.HttpStatusCode
interface MtbFileSender {
fun send(request: MtbFileRequest): Response
fun send(request: DeleteRequest): Response
- data class Response(val status: ResponseStatus, val reason: String = "")
+ data class Response(val status: RequestStatus, val body: String = "")
data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile)
data class DeleteRequest(val requestId: String, val patientId: String)
- enum class ResponseStatus {
- SUCCESS,
- WARNING,
- ERROR,
- UNKNOWN
+}
+
+fun Int.asRequestStatus(): RequestStatus {
+ return when (this) {
+ 200 -> RequestStatus.SUCCESS
+ 201 -> RequestStatus.WARNING
+ in 400 .. 999 -> RequestStatus.ERROR
+ else -> RequestStatus.UNKNOWN
}
+}
+
+fun HttpStatusCode.asRequestStatus(): RequestStatus {
+ return this.value().asRequestStatus()
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
index 04c73ef..1c59f5c 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
@@ -20,20 +20,21 @@
package dev.dnpm.etl.processor.output
import dev.dnpm.etl.processor.config.RestTargetProperties
+import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.web.client.RestClientException
import org.springframework.web.client.RestTemplate
-import org.springframework.web.util.UriComponentsBuilder
-class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender {
+class RestMtbFileSender(
+ private val restTemplate: RestTemplate,
+ private val restTargetProperties: RestTargetProperties
+) : MtbFileSender {
private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java)
- private val restTemplate = RestTemplate()
-
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
try {
val headers = HttpHeaders()
@@ -46,21 +47,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
)
if (!response.statusCode.is2xxSuccessful) {
logger.warn("Error sending to remote system: {}", response.body)
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}")
+ return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
}
logger.debug("Sent file via RestMtbFileSender")
- return if (response.body?.contains("warning") == true) {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}")
- } else {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
- }
+ return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
+ return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
@@ -74,14 +71,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
String::class.java
)
logger.debug("Sent file via RestMtbFileSender")
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
+ return MtbFileSender.Response(RequestStatus.SUCCESS)
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
+ return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt
index 1a79850..ab8ce2f 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt
@@ -19,7 +19,6 @@
package dev.dnpm.etl.processor.pseudonym
-import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties
class PseudonymizeService(
@@ -27,38 +26,11 @@ class PseudonymizeService(
private val configProperties: PseudonymizeConfigProperties
) {
- fun pseudonymize(mtbFile: MtbFile): MtbFile {
- val patientPseudonym = patientPseudonym(mtbFile.patient.id)
-
- mtbFile.episode.patient = patientPseudonym
- mtbFile.carePlans.forEach { it.patient = patientPseudonym }
- mtbFile.patient.id = patientPseudonym
- mtbFile.claims.forEach { it.patient = patientPseudonym }
- mtbFile.consent.patient = patientPseudonym
- mtbFile.claimResponses.forEach { it.patient = patientPseudonym }
- mtbFile.diagnoses.forEach { it.patient = patientPseudonym }
- mtbFile.ecogStatus.forEach { it.patient = patientPseudonym }
- mtbFile.familyMemberDiagnoses.forEach { it.patient = patientPseudonym }
- mtbFile.geneticCounsellingRequests.forEach { it.patient = patientPseudonym }
- mtbFile.histologyReevaluationRequests.forEach { it.patient = patientPseudonym }
- mtbFile.histologyReports.forEach { it.patient = patientPseudonym }
- mtbFile.lastGuidelineTherapies.forEach { it.patient = patientPseudonym }
- mtbFile.molecularPathologyFindings.forEach { it.patient = patientPseudonym }
- mtbFile.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } }
- mtbFile.ngsReports.forEach { it.patient = patientPseudonym }
- mtbFile.previousGuidelineTherapies.forEach { it.patient = patientPseudonym }
- mtbFile.rebiopsyRequests.forEach { it.patient = patientPseudonym }
- mtbFile.recommendations.forEach { it.patient = patientPseudonym }
- mtbFile.recommendations.forEach { it.patient = patientPseudonym }
- mtbFile.responses.forEach { it.patient = patientPseudonym }
- mtbFile.specimens.forEach { it.patient = patientPseudonym }
- mtbFile.specimens.forEach { it.patient = patientPseudonym }
-
- return mtbFile
- }
-
fun patientPseudonym(patientId: String): String {
- return "${configProperties.prefix}_${generator.generate(patientId)}"
+ return when (generator) {
+ is GpasPseudonymGenerator -> generator.generate(patientId)
+ else -> "${configProperties.prefix}_${generator.generate(patientId)}"
+ }
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt
new file mode 100644
index 0000000..c0050a4
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt
@@ -0,0 +1,50 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.pseudonym
+
+import de.ukw.ccc.bwhc.dto.MtbFile
+
+infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) {
+ val patientPseudonym = pseudonymizeService.patientPseudonym(this.patient.id)
+
+ this.episode.patient = patientPseudonym
+ this.carePlans.forEach { it.patient = patientPseudonym }
+ this.patient.id = patientPseudonym
+ this.claims.forEach { it.patient = patientPseudonym }
+ this.consent.patient = patientPseudonym
+ this.claimResponses.forEach { it.patient = patientPseudonym }
+ this.diagnoses.forEach { it.patient = patientPseudonym }
+ this.ecogStatus.forEach { it.patient = patientPseudonym }
+ this.familyMemberDiagnoses.forEach { it.patient = patientPseudonym }
+ this.geneticCounsellingRequests.forEach { it.patient = patientPseudonym }
+ this.histologyReevaluationRequests.forEach { it.patient = patientPseudonym }
+ this.histologyReports.forEach { it.patient = patientPseudonym }
+ this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym }
+ this.molecularPathologyFindings.forEach { it.patient = patientPseudonym }
+ this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } }
+ this.ngsReports.forEach { it.patient = patientPseudonym }
+ this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym }
+ this.rebiopsyRequests.forEach { it.patient = patientPseudonym }
+ this.recommendations.forEach { it.patient = patientPseudonym }
+ this.recommendations.forEach { it.patient = patientPseudonym }
+ this.responses.forEach { it.patient = patientPseudonym }
+ this.specimens.forEach { it.patient = patientPseudonym }
+ this.specimens.forEach { it.patient = patientPseudonym }
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
index 8588ebe..3cd912c 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
@@ -21,169 +21,117 @@ package dev.dnpm.etl.processor.services
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.monitoring.*
+import dev.dnpm.etl.processor.monitoring.Report
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.monitoring.RequestType
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
+import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith
import org.apache.commons.codec.binary.Base32
import org.apache.commons.codec.digest.DigestUtils
-import org.slf4j.LoggerFactory
+import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
-import reactor.core.publisher.Sinks
+import java.time.Instant
import java.util.*
@Service
class RequestProcessor(
private val pseudonymizeService: PseudonymizeService,
- private val senders: List<MtbFileSender>,
- private val requestRepository: RequestRepository,
+ private val sender: MtbFileSender,
+ private val requestService: RequestService,
private val objectMapper: ObjectMapper,
- private val statisticsUpdateProducer: Sinks.Many<Any>
+ private val applicationEventPublisher: ApplicationEventPublisher
) {
- private val logger = LoggerFactory.getLogger(RequestProcessor::class.java)
-
- fun processMtbFile(mtbFile: MtbFile): RequestStatus {
+ fun processMtbFile(mtbFile: MtbFile) {
+ val requestId = UUID.randomUUID().toString()
val pid = mtbFile.patient.id
- val pseudonymized = pseudonymizeService.pseudonymize(mtbFile)
- val lastRequestForPatient =
- requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id)
- .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING }
+ mtbFile pseudonymizeWith pseudonymizeService
- if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) {
- requestRepository.save(
- Request(
- patientId = pseudonymized.patient.id,
- pid = pid,
- fingerprint = fingerprint(mtbFile),
- status = RequestStatus.DUPLICATION,
- type = RequestType.MTB_FILE,
- report = Report("Duplikat erkannt - keine Daten weitergeleitet")
- )
- )
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
- return RequestStatus.DUPLICATION
- }
+ val request = MtbFileSender.MtbFileRequest(requestId, mtbFile)
- val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized)
+ requestService.save(
+ Request(
+ uuid = requestId,
+ patientId = request.mtbFile.patient.id,
+ pid = pid,
+ fingerprint = fingerprint(request.mtbFile),
+ status = RequestStatus.UNKNOWN,
+ type = RequestType.MTB_FILE
+ )
+ )
- val responses = senders.map {
- val responseStatus = it.send(request)
- if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) {
- logger.info(
- "Sent file for Patient '{}' using '{}'",
- pseudonymized.patient.id,
- it.javaClass.simpleName
+ if (isDuplication(mtbFile)) {
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ RequestStatus.DUPLICATION
)
- } else {
- logger.error(
- "Error sending file for Patient '{}' using '{}'",
- pseudonymized.patient.id,
- it.javaClass.simpleName
- )
- }
- responseStatus
+ )
+ return
}
- val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) {
- RequestStatus.ERROR
- } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) {
- RequestStatus.WARNING
- } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) {
- RequestStatus.SUCCESS
- } else {
- RequestStatus.UNKNOWN
- }
+ val responseStatus = sender.send(request)
- requestRepository.save(
- Request(
- uuid = request.requestId,
- patientId = request.mtbFile.patient.id,
- pid = pid,
- fingerprint = fingerprint(request.mtbFile),
- status = requestStatus,
- type = RequestType.MTB_FILE,
- report = when (requestStatus) {
- RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
- RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten",
- responses.joinToString("\n") { it.reason })
-
- RequestStatus.UNKNOWN -> Report("Keine Informationen")
- else -> null
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ responseStatus.status,
+ when (responseStatus.status) {
+ RequestStatus.WARNING -> Optional.of(responseStatus.body)
+ else -> Optional.empty()
}
)
)
+ }
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
+ private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean {
+ val lastMtbFileRequestForPatient =
+ requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id)
+ val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id)
- return requestStatus
+ return null != lastMtbFileRequestForPatient
+ && !isLastRequestDeletion
+ && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile)
}
- fun processDeletion(patientId: String): RequestStatus {
+ fun processDeletion(patientId: String) {
val requestId = UUID.randomUUID().toString()
try {
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)
- val responses = senders.map {
- val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
- when (responseStatus.status) {
- MtbFileSender.ResponseStatus.SUCCESS -> {
- logger.info(
- "Sent delete for Patient '{}' using '{}'",
- patientPseudonym,
- it.javaClass.simpleName
- )
- }
-
- MtbFileSender.ResponseStatus.ERROR -> {
- logger.error(
- "Error deleting data for Patient '{}' using '{}'",
- patientPseudonym,
- it.javaClass.simpleName
- )
- }
-
- else -> {
- logger.error(
- "Unknown result on deleting data for Patient '{}' using '{}'",
- patientPseudonym,
- it.javaClass.simpleName
- )
- }
- }
- responseStatus
- }
-
- val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) {
- RequestStatus.ERROR
- } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) {
- RequestStatus.SUCCESS
- } else {
- RequestStatus.UNKNOWN
- }
-
- requestRepository.save(
+ requestService.save(
Request(
uuid = requestId,
patientId = patientPseudonym,
pid = patientId,
fingerprint = fingerprint(patientPseudonym),
- status = overallRequestStatus,
- type = RequestType.DELETE,
- report = when (overallRequestStatus) {
- RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
- RequestStatus.UNKNOWN -> Report("Keine Informationen")
- else -> null
- }
+ status = RequestStatus.UNKNOWN,
+ type = RequestType.DELETE
)
)
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
+ val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
+
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ responseStatus.status,
+ when (responseStatus.status) {
+ RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body)
+ else -> Optional.empty()
+ }
+ )
+ )
- return overallRequestStatus
} catch (e: Exception) {
- requestRepository.save(
+ requestService.save(
Request(
uuid = requestId,
patientId = "???",
@@ -194,10 +142,6 @@ class RequestProcessor(
report = Report("Fehler bei der Pseudonymisierung")
)
)
-
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
-
- return RequestStatus.ERROR
}
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt
new file mode 100644
index 0000000..e0043d2
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt
@@ -0,0 +1,57 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.monitoring.RequestType
+import org.springframework.stereotype.Service
+
+@Service
+class RequestService(
+ private val requestRepository: RequestRepository
+) {
+
+ fun save(request: Request) = requestRepository.save(request)
+
+ fun allRequestsByPatientPseudonym(patientPseudonym: String) = requestRepository
+ .findAllByPatientIdOrderByProcessedAtDesc(patientPseudonym)
+
+ fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) =
+ Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym))
+
+ fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) =
+ Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym))
+
+ companion object {
+
+ fun lastMtbFileRequestForPatientPseudonym(allRequests: List<Request>) = allRequests
+ .filter { it.type == RequestType.MTB_FILE }
+ .sortedByDescending { it.processedAt }
+ .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING }
+
+ fun isLastRequestWithKnownStatusDeletion(allRequests: List<Request>) = allRequests
+ .filter { it.status != RequestStatus.UNKNOWN }
+ .maxByOrNull { it.processedAt }?.type == RequestType.DELETE
+
+ }
+
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
new file mode 100644
index 0000000..4048348
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
@@ -0,0 +1,94 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import dev.dnpm.etl.processor.monitoring.Report
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.slf4j.LoggerFactory
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Service
+import reactor.core.publisher.Sinks
+import java.time.Instant
+import java.util.*
+
+@Service
+class ResponseProcessor(
+ private val requestRepository: RequestRepository,
+ private val statisticsUpdateProducer: Sinks.Many<Any>
+) {
+
+ private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java)
+
+ @EventListener(classes = [ResponseEvent::class])
+ fun handleResponseEvent(event: ResponseEvent) {
+ requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({
+ it.processedAt = event.timestamp
+ it.status = event.status
+
+ when (event.status) {
+ RequestStatus.SUCCESS -> {
+ it.report = Report(
+ "Keine Probleme erkannt",
+ )
+ }
+
+ RequestStatus.WARNING -> {
+ it.report = Report(
+ "Warnungen über mangelhafte Daten",
+ event.body.orElse("")
+ )
+ }
+
+ RequestStatus.ERROR -> {
+ it.report = Report(
+ "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
+ event.body.orElse("")
+ )
+ }
+
+ RequestStatus.DUPLICATION -> {
+ it.report = Report(
+ "Duplikat erkannt"
+ )
+ }
+
+ else -> {
+ logger.error("Cannot process response: Unknown response!")
+ return@ifPresentOrElse
+ }
+ }
+
+ requestRepository.save(it)
+
+ statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
+ }, {
+ logger.error("Response for unknown request '${event.requestUuid}'!")
+ })
+ }
+
+}
+
+data class ResponseEvent(
+ val requestUuid: String,
+ val timestamp: Instant,
+ val status: RequestStatus,
+ val body: Optional<String> = Optional.empty()
+) \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
new file mode 100644
index 0000000..a29010f
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
@@ -0,0 +1,80 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services.kafka
+
+import com.fasterxml.jackson.annotation.JsonAlias
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.output.asRequestStatus
+import dev.dnpm.etl.processor.services.ResponseEvent
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.slf4j.LoggerFactory
+import org.springframework.context.ApplicationEventPublisher
+import org.springframework.kafka.listener.MessageListener
+import java.time.Instant
+import java.util.*
+
+class KafkaResponseProcessor(
+ private val eventPublisher: ApplicationEventPublisher,
+ private val objectMapper: ObjectMapper
+) : MessageListener<String, String> {
+
+ private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java)
+
+ override fun onMessage(data: ConsumerRecord<String, String>) {
+ try {
+ Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java))
+ } catch (e: Exception) {
+ logger.error("Cannot process Kafka response", e)
+ Optional.empty()
+ }.ifPresentOrElse({ responseBody ->
+ val event = ResponseEvent(
+ responseBody.requestId,
+ Instant.ofEpochMilli(data.timestamp()),
+ responseBody.statusCode.asRequestStatus(),
+ when (responseBody.statusCode.asRequestStatus()) {
+ RequestStatus.SUCCESS -> {
+ Optional.empty()
+ }
+
+ RequestStatus.WARNING, RequestStatus.ERROR -> {
+ Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
+ }
+
+ else -> {
+ logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode)
+ Optional.empty()
+ }
+ }
+ )
+ eventPublisher.publishEvent(event)
+ }, {
+ logger.error("No requestId in Kafka response")
+ })
+ }
+
+ data class ResponseBody(
+ @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
+ @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
+ @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
+ )
+
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt
index a2cc953..9b441f6 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt
@@ -19,40 +19,37 @@
package dev.dnpm.etl.processor.web
+import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.services.RequestProcessor
import org.slf4j.LoggerFactory
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
@RestController
-class MtbFileController(
+class MtbFileRestController(
private val requestProcessor: RequestProcessor,
) {
- private val logger = LoggerFactory.getLogger(MtbFileController::class.java)
+ private val logger = LoggerFactory.getLogger(MtbFileRestController::class.java)
@PostMapping(path = ["/mtbfile"])
fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity<Void> {
- val requestStatus = requestProcessor.processMtbFile(mtbFile)
-
- return if (requestStatus == RequestStatus.ERROR) {
- ResponseEntity.unprocessableEntity().build()
+ if (mtbFile.consent.status == Consent.Status.ACTIVE) {
+ logger.debug("Accepted MTB File for processing")
+ requestProcessor.processMtbFile(mtbFile)
} else {
- ResponseEntity.noContent().build()
+ logger.debug("Accepted MTB File and process deletion")
+ requestProcessor.processDeletion(mtbFile.patient.id)
}
+ return ResponseEntity.accepted().build()
}
@DeleteMapping(path = ["/mtbfile/{patientId}"])
fun deleteData(@PathVariable patientId: String): ResponseEntity<Void> {
- val requestStatus = requestProcessor.processDeletion(patientId)
-
- return if (requestStatus == RequestStatus.ERROR) {
- ResponseEntity.unprocessableEntity().build()
- } else {
- ResponseEntity.noContent().build()
- }
+ logger.debug("Accepted patient ID to process deletion")
+ requestProcessor.processDeletion(patientId)
+ return ResponseEntity.accepted().build()
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
index a418772..6f0e820 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
@@ -83,9 +83,9 @@ class StatisticsRestController(
.groupBy { formatter.format(it.processedAt) }
.map {
val requestList = it.value
- .groupBy { it.status }
- .map {
- Pair(it.key, it.value.size)
+ .groupBy { request -> request.status }
+ .map { request ->
+ Pair(request.key, request.value.size)
}
.toMap()
Pair(
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 99e4bbf..a60cd8a 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -4,11 +4,15 @@ spring:
file: ./dev-compose.yml
app:
- rest:
- uri: http://localhost:9000/bwhc/etl/api/MTBFile
- #kafka:
- # topic: test
- # servers: kafka:9092
+ #rest:
+ # uri: http://localhost:9000/bwhc/etl/api
+
+ # Note: Make sure, hostname "kafka" points to 127.0.0.1
+ # otherwise connection will not be available
+ kafka:
+ topic: test
+ response-topic: test_response
+ servers: kafka:9092
server:
port: 8000
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 39acb37..5cd47c0 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,7 +1,7 @@
spring:
kafka:
bootstrap-servers: ${app.kafka.servers}
- template:
- default-topic: ${app.kafka.topic}
+ consumer:
+ group-id: ${app.kafka.group-id}
flyway:
locations: "classpath:db/migration/{vendor}" \ No newline at end of file