summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/deploy.yml35
-rw-r--r--.github/workflows/test.yml39
-rw-r--r--README.md128
-rw-r--r--build.gradle.kts55
-rw-r--r--dev-compose.yml9
-rw-r--r--docs/etl.pngbin0 -> 76404 bytes
-rw-r--r--src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt51
-rw-r--r--src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt45
-rw-r--r--src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt102
-rw-r--r--src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt134
-rw-r--r--src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java19
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt1
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt6
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt28
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt79
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt58
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt15
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt4
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt58
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt21
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt23
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt36
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt50
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt190
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt57
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt94
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt80
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt (renamed from src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt)27
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt6
-rw-r--r--src/main/resources/application-dev.yml14
-rw-r--r--src/main/resources/application.yml4
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt173
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt195
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt86
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt70
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt372
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt225
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt138
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt149
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt150
40 files changed, 2751 insertions, 275 deletions
diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
new file mode 100644
index 0000000..6d15376
--- /dev/null
+++ b/.github/workflows/deploy.yml
@@ -0,0 +1,35 @@
+name: "Run build and deploy"
+
+on:
+ release:
+ types: [ 'published' ]
+
+jobs:
+ docker:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - uses: actions/setup-java@v3
+ with:
+ java-version: '17'
+ distribution: 'temurin'
+
+ - name: Setup Gradle
+ uses: gradle/gradle-build-action@v2.4.2
+
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v2
+
+ - name: Login to Docker Hub
+ uses: docker/login-action@v2
+ with:
+ registry: ghcr.io
+ username: ${{ github.actor }}
+ password: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Execute image build and push
+ run: |
+ ./gradlew bootBuildImage
+ docker tag ghcr.io/ccc-mf/etl-processor ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }}
+ docker push ghcr.io/ccc-mf/etl-processor
+ docker push ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }} \ No newline at end of file
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..f7c37f3
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,39 @@
+name: 'Run Tests'
+
+on:
+ push:
+ branches: [ 'master' ]
+ tags: [ '*' ]
+ pull_request:
+ branches: [ '*' ]
+
+jobs:
+ tests:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - uses: actions/setup-java@v3
+ with:
+ java-version: '17'
+ distribution: 'temurin'
+
+ - name: Setup Gradle
+ uses: gradle/gradle-build-action@v2.4.2
+
+ - name: Execute tests
+ run: ./gradlew test
+
+ integrationTests:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - uses: actions/setup-java@v3
+ with:
+ java-version: '17'
+ distribution: 'temurin'
+
+ - name: Setup Gradle
+ uses: gradle/gradle-build-action@v2.4.2
+
+ - name: Execute integration tests
+ run: ./gradlew integrationTest \ No newline at end of file
diff --git a/README.md b/README.md
index ce086d4..b743d92 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,28 @@
-# ETL-Processor for bwHC data
+# ETL-Processor for bwHC data [![Run Tests](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml/badge.svg)](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml)
-Diese Anwendung versendet ein bwHC-MTB-File an das bwHC-Backend und pseudonymisiert die
-Patienten-ID.
+Diese Anwendung versendet ein bwHC-MTB-File an das bwHC-Backend und pseudonymisiert die Patienten-ID.
+
+### Einordnung innerhalb einer DNPM-ETL-Strecke
+
+Diese Anwendung erlaubt das Entgegennehmen HTTP/REST-Anfragen aus dem Onkostar-Plugin **[onkostar-plugin-dnpmexport](https://github.com/CCC-MF/onkostar-plugin-dnpmexport)**.
+
+Der Inhalt einer Anfrage, wenn ein bwHC-MTBFile, wird pseudonymisiert und auf Duplikate geprüft.
+Duplikate werden verworfen, Änderungen werden weitergeleitet.
+
+Löschanfragen werden immer als Löschanfrage an das bwHC-backend weitergeleitet.
+
+![Modell DNPM-ETL-Strecke](docs/etl.png)
+
+#### HTTP/REST-Konfiguration
+
+Anfragen werden, wenn nicht als Duplikat behandelt, nach der Pseudonymisierung direkt an das bwHC-Backend gesendet.
+
+#### Konfiguration für Apache Kafka
+
+Anfragen werden, wenn nicht als Duplikat behandelt, nach der Pseudonymisierung an Apache Kafka übergeben.
+Eine Antwort wird dabei ebenfalls mithilfe von Apache Kafka übermittelt und nach der Entgegennahme verarbeitet.
+
+Siehe hierzu auch: https://github.com/CCC-MF/kafka-to-bwhc
## Pseudonymisierung der Patienten-ID
@@ -13,10 +34,8 @@ Ist diese nicht gesetzt. wird intern eine Anonymisierung der Patienten-ID vorgen
### Eingebaute Pseudonymisierung
-Wurde keine oder die Verwendung der eingebauten Pseudonymisierung konfiguriert, so wird für die
-Patienten-ID der
-entsprechende SHA-256-Hash gebildet und Base64-codiert - hier ohne endende "=" - zuzüglich des
-konfigurierten Prefixes
+Wurde keine oder die Verwendung der eingebauten Pseudonymisierung konfiguriert, so wird für die Patienten-ID der
+entsprechende SHA-256-Hash gebildet und Base64-codiert - hier ohne endende "=" - zuzüglich des konfigurierten Prefixes
als Patienten-Pseudonym verwendet.
### Pseudonymisierung mit gPAS
@@ -28,40 +47,87 @@ Wurde die Verwendung von gPAS konfiguriert, so sind weitere Angaben zu konfiguri
* `APP_PSEUDONYMIZE_GPAS_TARGET`: gPas Domänenname
* `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPas Basic-Auth Benutzername
* `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPas Basic-Auth Passwort
-* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt
- werden muss.
+* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt werden muss.
## Mögliche Endpunkte
-Für REST-Requests als auch (parallel) zur Nutzung von Kafka-Topics können Endpunkte konfiguriert
-werden.
+Für REST-Requests als auch zur Nutzung von Kafka-Topics können Endpunkte konfiguriert werden.
+
+Es ist dabei nur die Konfiguration eines Endpunkts zulässig.
+Werden sowohl REST als auch Kafka-Endpunkt konfiguriert, wird nur der REST-Endpunkt verwendet.
### REST
-Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an das bwHC-Backend
-gesendet wird:
+Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an das bwHC-Backend gesendet wird:
-* `APP_REST_URI`: URI der zu benutzenden API der bwHC-Backend-Instanz.
- z.B.: `http://localhost:9000/bwhc/etl/api`
+* `APP_REST_URI`: URI der zu benutzenden API der bwHC-Backend-Instanz. z.B.: `http://localhost:9000/bwhc/etl/api`
### Kafka-Topics
-Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein Kafka-Topic
-übermittelt wird:
+Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein Kafka-Topic übermittelt wird:
-* `APP_KAFKA_TOPIC`: Zu verwendendes Topic
+* `APP_KAFKA_TOPIC`: Zu verwendendes Topic zum Versenden von Anfragen
+* `APP_KAFKA_RESPONSE_TOPIC`: Topic mit Antworten über den Erfolg des Versendens. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_response".
+* `APP_KAFKA_GROUP_ID`: Kafka GroupID des Consumers. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_group".
* `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste
+Wird keine Rückantwort über Apache Kafka empfangen und es gibt keine weitere Möglichkeit den Status festzustellen, verbleibt der Status auf `UNKNOWN`.
+
Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden.
-### Docker Image
+Lässt sich keine Verbindung zu dem bwHC-Backend aufbauen, wird eine Rückantwort mit Status-Code `900` erwartet, welchen es
+für HTTP nicht gibt.
+
+#### Retention Time
+
+Generell werden in Apache Kafka alle Records entsprechend der Konfiguration vorgehalten.
+So wird ohne spezielle Konfiguration ein Record für 7 Tage in Apache Kafka gespeichert.
+Es sind innerhalb dieses Zeitraums auch alte Informationen weiterhin enthalten, wenn der Consent später abgelehnt wurde.
+
+Durch eine entsprechende Konfiguration des Topics kann dies verhindert werden.
+
+Beispiel - auszuführen innerhalb des Kafka-Containers: Löschen alter Records nach einem Tag
+```
+kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=86400000
+```
+
+#### Key based Retention
+
+Möchten Sie hingegen immer nur die letzte Meldung für einen Patienten und eine Erkrankung in Apache Kafka vorhalten,
+so ist die nachfolgend genannte Konfiguration der Kafka-Topics hilfreich.
-Bauen eines Docker Images kann wie folgt erzeugt werden:
+
+* `retention.ms`: Möglichst kurze Zeit in der alte Records noch erhalten bleiben, z.B. 10 Sekunden 10000
+* `cleanup.policy`: Löschen alter Records und Beibehalten des letzten Records zu einem Key [delete,compact]
+
+Beispiele für ein Topic `test`, hier bitte an die verwendeten Topics anpassen.
+
+```
+kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=10000
+kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config cleanup.policy=[delete,compact]
+```
+
+Da als Key eines Records die (pseudonymisierte) Patienten-ID und die (anonymisierte) Erkrankungs-ID verwendet wird,
+stehen mit obiger Konfiguration der Kafka-Topics nach 10 Sekunden nur noch der jeweils letzte Eintrag für den entsprechenden
+Key zur Verfügung.
+
+Da der Key sowohl für die Records in Richtung bwHC-Backend für die Rückantwort identisch aufgebaut ist, lassen sich so
+auch im Falle eines Consent-Widerspruchs die enthaltenen Daten als auch die Offenlegung durch Verifikationsdaten in der
+Antwort effektiv verhindern, da diese nach 10 Sekunden gelöscht werden.
+Es steht dann nur noch die jeweils letzten Information zur Verfügung, dass für einen Patienten/eine Erkrankung
+ein Consent-Widerspruch erfolgte.
+
+## Docker-Images
+
+Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor
+
+### Images lokal bauen
```bash
docker build . -t "imageName"
```
+## Deployment
*Ausführen als Docker Conatiner:*
Wenn gewünscht, Änderungen in der `env` vornehmen. Beachten, dass *MONITORING_HTTP_PORT* über
Host-Umgebung gesetzt werden muss (z.B. .env oder Parameter --env-file )
@@ -69,4 +135,24 @@ Host-Umgebung gesetzt werden muss (z.B. .env oder Parameter --env-file )
```bash
cd ./deploy
docker compose up -d
-``` \ No newline at end of file
+```
+
+## Entwicklungssetup
+
+Zum Starten einer lokalen Entwicklungs- und Testumgebung kann die beiliegende Datei `dev-compose.yml` verwendet werden.
+Diese kann zur Nutzung der Datenbanken **MariaDB** als auch **PostgreSQL** angepasst werden.
+
+Zur Nutzung von Apache Kafka muss dazu ein Eintrag im hosts-File vorgenommen werden und der Hostname `kafka` auf die lokale
+IP-Adresse verweisen. Ohne diese Einstellung ist eine Nutzung von Apache Kafka außerhalb der Docker-Umgebung nicht möglich.
+
+Beim Start der Anwendung mit dem Profil `dev` wird die in `dev-compose.yml` definierte Umgebung beim Start der
+Anwendung mit gestartet:
+
+```
+SPRING_PROFILES_ACTIVE=dev ./gradlew bootRun
+```
+
+Die Datei `application-dev.yml` enthält hierzu die Konfiguration für das Profil `dev`.
+
+Beim Ausführen der Integrationstests wird eine Testdatenbank in einem Docker-Container gestartet.
+Siehe hier auch die Klasse `AbstractTestcontainerTest` unter `src/integrationTest`.
diff --git a/build.gradle.kts b/build.gradle.kts
index 290362a..d8389e6 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -1,4 +1,6 @@
+import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
+import org.springframework.boot.gradle.tasks.bundling.BootBuildImage
plugins {
id("org.springframework.boot") version "3.1.1"
@@ -8,12 +10,31 @@ plugins {
}
group = "de.ukw.ccc"
-version = "0.1.0-SNAPSHOT"
+version = "0.2.0-SNAPSHOT"
+
+var versions = mapOf(
+ "bwhc-dto-java" to "0.2.0",
+ "hapi-fhir" to "6.6.2",
+ "httpclient5" to "5.2.1",
+ "mockito-kotlin" to "5.1.0"
+)
java {
sourceCompatibility = JavaVersion.VERSION_17
}
+sourceSets {
+ create("integrationTest") {
+ compileClasspath += sourceSets.main.get().output
+ runtimeClasspath += sourceSets.main.get().output
+ }
+}
+
+val integrationTestImplementation: Configuration by configurations.getting {
+ extendsFrom(configurations.testImplementation.get())
+ extendsFrom(configurations.runtimeOnly.get())
+}
+
configurations {
compileOnly {
extendsFrom(configurations.annotationProcessor.get())
@@ -41,10 +62,10 @@ dependencies {
implementation("org.flywaydb:flyway-mysql")
implementation("commons-codec:commons-codec")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
- implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
- implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2")
- implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2")
- implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1")
+ implementation("de.ukw.ccc:bwhc-dto-java:${versions["bwhc-dto-java"]}")
+ implementation("ca.uhn.hapi.fhir:hapi-fhir-base:${versions["hapi-fhir"]}")
+ implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:${versions["hapi-fhir"]}")
+ implementation("org.apache.httpcomponents.client5:httpclient5:${versions["httpclient5"]}")
runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
runtimeOnly("org.postgresql:postgresql")
developmentOnly("org.springframework.boot:spring-boot-devtools")
@@ -52,6 +73,9 @@ dependencies {
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.projectreactor:reactor-test")
+ testImplementation("org.mockito.kotlin:mockito-kotlin:${versions["mockito-kotlin"]}")
+ integrationTestImplementation("org.testcontainers:junit-jupiter")
+ integrationTestImplementation("org.testcontainers:postgresql")
}
tasks.withType<KotlinCompile> {
@@ -63,5 +87,26 @@ tasks.withType<KotlinCompile> {
tasks.withType<Test> {
useJUnitPlatform()
+ testLogging {
+ events(TestLogEvent.FAILED, TestLogEvent.PASSED, TestLogEvent.SKIPPED)
+ }
}
+task<Test>("integrationTest") {
+ description = "Runs integration tests"
+
+ testClassesDirs = sourceSets["integrationTest"].output.classesDirs
+ classpath = sourceSets["integrationTest"].runtimeClasspath
+
+ shouldRunAfter("test")
+}
+
+tasks.named<BootBuildImage>("bootBuildImage") {
+ imageName.set("ghcr.io/ccc-mf/etl-processor")
+
+ environment.set(environment.get() + mapOf(
+ "BP_OCI_SOURCE" to "https://github.com/CCC-MF/etl-processor",
+ "BP_OCI_LICENSES" to "AGPLv3",
+ "BP_OCI_DESCRIPTION" to "ETL Processor for bwHC MTB files"
+ ))
+}
diff --git a/dev-compose.yml b/dev-compose.yml
index 9b25794..8f0780f 100644
--- a/dev-compose.yml
+++ b/dev-compose.yml
@@ -1,4 +1,7 @@
services:
+
+ # Note: Make sure, hostname "kafka" points to 127.0.0.1
+ # otherwise connection will not be available
kafka:
image: bitnami/kafka
hostname: kafka
@@ -6,6 +9,12 @@ services:
- "9092:9092"
environment:
ALLOW_PLAINTEXT_LISTENER: "yes"
+ KAFKA_CFG_NODE_ID: "0"
+ KAFKA_CFG_PROCESS_ROLES: "controller,broker"
+ KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
+ KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
+ KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
+ KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
mariadb:
image: mariadb:10
diff --git a/docs/etl.png b/docs/etl.png
new file mode 100644
index 0000000..0ca5def
--- /dev/null
+++ b/docs/etl.png
Binary files differ
diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt
new file mode 100644
index 0000000..13b57d0
--- /dev/null
+++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/AbstractTestcontainerTest.kt
@@ -0,0 +1,51 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor
+
+import org.springframework.test.context.DynamicPropertyRegistry
+import org.springframework.test.context.DynamicPropertySource
+import org.testcontainers.containers.PostgreSQLContainer
+import org.testcontainers.junit.jupiter.Container
+
+abstract class AbstractTestcontainerTest {
+
+ companion object {
+ @Container
+ val dbContainer = CustomPostgreSQLContainer("postgres:10-alpine")
+ .withDatabaseName("test")
+ .withUsername("test")
+ .withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!")
+
+ @DynamicPropertySource
+ @JvmStatic
+ fun registerDynamicProperties(registry: DynamicPropertyRegistry) {
+ registry.add("spring.datasource.url", dbContainer::getJdbcUrl)
+ registry.add("spring.datasource.username", dbContainer::getUsername)
+ registry.add("spring.datasource.password", dbContainer::getPassword)
+ }
+ }
+
+}
+
+class CustomPostgreSQLContainer(dockerImageName: String) : PostgreSQLContainer<CustomPostgreSQLContainer>(dockerImageName) {
+ override fun stop() {
+ // Keep Testcontainer alive until JVM destroys it
+ }
+} \ No newline at end of file
diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt
new file mode 100644
index 0000000..c5a20bb
--- /dev/null
+++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/EtlProcessorApplicationTests.kt
@@ -0,0 +1,45 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor
+
+import dev.dnpm.etl.processor.output.MtbFileSender
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.boot.test.mock.mockito.MockBean
+import org.springframework.context.ApplicationContext
+import org.springframework.test.context.junit.jupiter.SpringExtension
+import org.testcontainers.junit.jupiter.Testcontainers
+
+@Testcontainers
+@ExtendWith(SpringExtension::class)
+@SpringBootTest
+@MockBean(MtbFileSender::class)
+class EtlProcessorApplicationTests : AbstractTestcontainerTest() {
+
+ @Test
+ fun contextLoadsIfMtbFileSenderConfigured(@Autowired context: ApplicationContext) {
+ // Simply check bean configuration
+ assertThat(context).isNotNull
+ }
+
+}
diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt
new file mode 100644
index 0000000..8bdaa60
--- /dev/null
+++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt
@@ -0,0 +1,102 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.config
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.output.KafkaMtbFileSender
+import dev.dnpm.etl.processor.output.RestMtbFileSender
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Nested
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.springframework.beans.factory.NoSuchBeanDefinitionException
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.boot.test.mock.mockito.MockBean
+import org.springframework.boot.test.mock.mockito.MockBeans
+import org.springframework.context.ApplicationContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+
+@SpringBootTest
+@ContextConfiguration(classes = [KafkaAutoConfiguration::class, AppKafkaConfiguration::class, AppRestConfiguration::class])
+class AppConfigurationTest {
+
+ @Nested
+ @TestPropertySource(
+ properties = [
+ "app.rest.uri=http://localhost:9000"
+ ]
+ )
+ inner class AppConfigurationRestTest(private val context: ApplicationContext) {
+
+ @Test
+ fun shouldUseRestMtbFileSenderNotKafkaMtbFileSender() {
+ assertThat(context.getBean(RestMtbFileSender::class.java)).isNotNull
+ assertThrows<NoSuchBeanDefinitionException> { context.getBean(KafkaMtbFileSender::class.java) }
+ }
+
+ }
+
+ @Nested
+ @TestPropertySource(
+ properties = [
+ "app.kafka.servers=localhost:9092",
+ "app.kafka.topic=test",
+ "app.kafka.response-topic=test-response",
+ "app.kafka.group-id=test"
+ ]
+ )
+ @MockBeans(value = [
+ MockBean(ObjectMapper::class),
+ MockBean(RequestRepository::class)
+ ])
+ inner class AppConfigurationKafkaTest(private val context: ApplicationContext) {
+
+ @Test
+ fun shouldUseKafkaMtbFileSenderNotRestMtbFileSender() {
+ assertThrows<NoSuchBeanDefinitionException> { context.getBean(RestMtbFileSender::class.java) }
+ assertThat(context.getBean(KafkaMtbFileSender::class.java)).isNotNull
+ }
+
+ }
+
+ @Nested
+ @TestPropertySource(
+ properties = [
+ "app.rest.uri=http://localhost:9000",
+ "app.kafka.servers=localhost:9092",
+ "app.kafka.topic=test",
+ "app.kafka.response-topic=test-response",
+ "app.kafka.group-id=test"
+ ]
+ )
+ inner class AppConfigurationRestInPrecedenceTest(private val context: ApplicationContext) {
+
+ @Test
+ fun shouldUseRestMtbFileSenderNotKafkaMtbFileSender() {
+ assertThat(context.getBean(RestMtbFileSender::class.java)).isNotNull
+ assertThrows<NoSuchBeanDefinitionException> { context.getBean(KafkaMtbFileSender::class.java) }
+ }
+
+ }
+
+} \ No newline at end of file
diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt
new file mode 100644
index 0000000..ff85296
--- /dev/null
+++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/services/RequestServiceIntegrationTest.kt
@@ -0,0 +1,134 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import dev.dnpm.etl.processor.AbstractTestcontainerTest
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.monitoring.RequestType
+import dev.dnpm.etl.processor.output.MtbFileSender
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.boot.test.mock.mockito.MockBean
+import org.springframework.test.context.junit.jupiter.SpringExtension
+import org.springframework.transaction.annotation.Transactional
+import org.testcontainers.junit.jupiter.Testcontainers
+import java.time.Instant
+import java.util.*
+
+@Testcontainers
+@ExtendWith(SpringExtension::class)
+@SpringBootTest
+@Transactional
+@MockBean(MtbFileSender::class)
+class RequestServiceIntegrationTest : AbstractTestcontainerTest() {
+
+ private lateinit var requestRepository: RequestRepository
+
+ private lateinit var requestService: RequestService
+
+ @BeforeEach
+ fun setup(
+ @Autowired requestRepository: RequestRepository
+ ) {
+ this.requestRepository = requestRepository
+ this.requestService = RequestService(requestRepository)
+ }
+
+ @Test
+ fun shouldResultInEmptyRequestList() {
+ val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901")
+
+ assertThat(actual).isEmpty()
+ }
+
+ private fun setupTestData() {
+ // Prepare DB
+ this.requestRepository.saveAll(
+ listOf(
+ Request(
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdef1",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-07-07T02:00:00Z")
+ ),
+ // Should be ignored - wrong patient ID -->
+ Request(
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678902",
+ pid = "P2",
+ fingerprint = "0123456789abcdef2",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.WARNING,
+ processedAt = Instant.parse("2023-08-08T00:00:00Z")
+ ),
+ // <--
+ Request(
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P2",
+ fingerprint = "0123456789abcdee1",
+ type = RequestType.DELETE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-08-08T02:00:00Z")
+ )
+ )
+ )
+ }
+
+ @Test
+ fun shouldResultInSortedRequestList() {
+ setupTestData()
+
+ val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901")
+
+ assertThat(actual).hasSize(2)
+ assertThat(actual[0].fingerprint).isEqualTo("0123456789abcdee1")
+ assertThat(actual[1].fingerprint).isEqualTo("0123456789abcdef1")
+ }
+
+ @Test
+ fun shouldReturnDeleteRequestAsLastRequest() {
+ setupTestData()
+
+ val actual = requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901")
+
+ assertThat(actual).isTrue()
+ }
+
+ @Test
+ fun shouldReturnLastMtbFileRequest() {
+ setupTestData()
+
+ val actual = requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901")
+
+ assertThat(actual).isNotNull
+ assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef1")
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
index f13a034..91e465b 100644
--- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
+++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
@@ -69,13 +69,11 @@ import java.util.HashMap;
public class GpasPseudonymGenerator implements Generator {
+ private final static FhirContext r4Context = FhirContext.forR4();
private final String gPasUrl;
private final String psnTargetDomain;
- private static FhirContext r4Context = FhirContext.forR4();
private final HttpHeaders httpHeader;
-
private final RetryTemplate retryTemplate = defaultTemplate();
-
private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
private SSLContext customSslContext;
@@ -110,12 +108,19 @@ public class GpasPseudonymGenerator implements Generator {
@NotNull
public static String unwrapPseudonym(Parameters gPasPseudonymResult) {
- Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst()
- .get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst()
- .orElseGet(ParametersParameterComponent::new).getValue();
+ final var parameters = gPasPseudonymResult.getParameter().stream().findFirst();
+
+ if (parameters.isEmpty()) {
+ throw new PseudonymRequestFailed("Empty HL7 parameters, cannot find first one");
+ }
+
+ final var identifier = (Identifier) parameters.get().getPart().stream()
+ .filter(a -> a.getName().equals("pseudonym"))
+ .findFirst()
+ .orElseGet(ParametersParameterComponent::new).getValue();
// pseudonym
- return pseudonym.getSystem() + "|" + pseudonym.getValue();
+ return identifier.getSystem() + "|" + identifier.getValue();
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
index 0c4ab68..5d28c97 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt
@@ -28,4 +28,3 @@ class EtlProcessorApplication
fun main(args: Array<String>) {
runApplication<EtlProcessorApplication>(*args)
}
-
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
index 64be70d..06e730b 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
@@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(AppConfigProperties.NAME)
data class AppConfigProperties(
- var bwhc_uri: String?,
+ var bwhcUri: String?,
var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN
) {
companion object {
@@ -48,7 +48,7 @@ data class GPasConfigProperties(
val password: String?,
val sslCaLocation: String?,
-) {
+ ) {
companion object {
const val NAME = "app.pseudonymize.gpas"
}
@@ -66,6 +66,8 @@ data class RestTargetProperties(
@ConfigurationProperties(KafkaTargetProperties.NAME)
data class KafkaTargetProperties(
val topic: String = "etl-processor",
+ val responseTopic: String = "${topic}_response",
+ val groupId: String = "${topic}_group",
val servers: String = ""
) {
companion object {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
index c677f2b..6b15fc0 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
@@ -21,9 +21,6 @@ package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.ReportService
-import dev.dnpm.etl.processor.output.KafkaMtbFileSender
-import dev.dnpm.etl.processor.output.MtbFileSender
-import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
import dev.dnpm.etl.processor.pseudonym.Generator
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
@@ -32,7 +29,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.core.KafkaTemplate
import reactor.core.publisher.Sinks
@Configuration
@@ -40,9 +36,7 @@ import reactor.core.publisher.Sinks
value = [
AppConfigProperties::class,
PseudonymizeConfigProperties::class,
- GPasConfigProperties::class,
- RestTargetProperties::class,
- KafkaTargetProperties::class
+ GPasConfigProperties::class
]
)
class AppConfiguration {
@@ -60,25 +54,13 @@ class AppConfiguration {
}
@Bean
- fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService {
+ fun pseudonymizeService(
+ generator: Generator,
+ pseudonymizeConfigProperties: PseudonymizeConfigProperties
+ ): PseudonymizeService {
return PseudonymizeService(generator, pseudonymizeConfigProperties)
}
- @ConditionalOnProperty(value = ["app.rest.uri"])
- @Bean
- fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender {
- return RestMtbFileSender(restTargetProperties)
- }
-
- @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
- @Bean
- fun kafkaMtbFileSender(
- kafkaTemplate: KafkaTemplate<String, String>,
- objectMapper: ObjectMapper
- ): MtbFileSender {
- return KafkaMtbFileSender(kafkaTemplate, objectMapper)
- }
-
@Bean
fun reportService(objectMapper: ObjectMapper): ReportService {
return ReportService(objectMapper)
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
new file mode 100644
index 0000000..309ff2d
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -0,0 +1,79 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.config
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.output.KafkaMtbFileSender
+import dev.dnpm.etl.processor.output.MtbFileSender
+import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.context.ApplicationEventPublisher
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.core.annotation.Order
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.listener.ContainerProperties
+import org.springframework.kafka.listener.KafkaMessageListenerContainer
+
+@Configuration
+@EnableConfigurationProperties(
+ value = [KafkaTargetProperties::class]
+)
+@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
+@ConditionalOnMissingBean(MtbFileSender::class)
+@Order(-5)
+class AppKafkaConfiguration {
+
+ private val logger = LoggerFactory.getLogger(AppKafkaConfiguration::class.java)
+
+ @Bean
+ fun kafkaMtbFileSender(
+ kafkaTemplate: KafkaTemplate<String, String>,
+ kafkaTargetProperties: KafkaTargetProperties,
+ objectMapper: ObjectMapper
+ ): MtbFileSender {
+ logger.info("Selected 'KafkaMtbFileSender'")
+ return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
+ }
+
+ @Bean
+ fun kafkaListenerContainer(
+ consumerFactory: ConsumerFactory<String, String>,
+ kafkaTargetProperties: KafkaTargetProperties,
+ kafkaResponseProcessor: KafkaResponseProcessor
+ ): KafkaMessageListenerContainer<String, String> {
+ val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic)
+ containerProperties.messageListener = kafkaResponseProcessor
+ return KafkaMessageListenerContainer(consumerFactory, containerProperties)
+ }
+
+ @Bean
+ fun kafkaResponseProcessor(
+ applicationEventPublisher: ApplicationEventPublisher,
+ objectMapper: ObjectMapper
+ ): KafkaResponseProcessor {
+ return KafkaResponseProcessor(applicationEventPublisher, objectMapper)
+ }
+
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
new file mode 100644
index 0000000..a830597
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
@@ -0,0 +1,58 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.config
+
+import dev.dnpm.etl.processor.output.MtbFileSender
+import dev.dnpm.etl.processor.output.RestMtbFileSender
+import org.slf4j.LoggerFactory
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.core.annotation.Order
+import org.springframework.web.client.RestTemplate
+
+@Configuration
+@EnableConfigurationProperties(
+ value = [
+ RestTargetProperties::class
+ ]
+)
+@ConditionalOnProperty(value = ["app.rest.uri"])
+@ConditionalOnMissingBean(MtbFileSender::class)
+@Order(-10)
+class AppRestConfiguration {
+
+ private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java)
+
+ @Bean
+ fun restTemplate(): RestTemplate {
+ return RestTemplate()
+ }
+
+ @Bean
+ fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender {
+ logger.info("Selected 'RestMtbFileSender'")
+ return RestMtbFileSender(restTemplate, restTargetProperties)
+ }
+
+}
+
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt
index 6ee8ae9..ae36705 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ReportService.kt
@@ -19,7 +19,9 @@
package dev.dnpm.etl.processor.monitoring
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonValue
+import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
@@ -33,15 +35,22 @@ class ReportService(
}
return try {
objectMapper.readValue(dataQualityReport, DataQualityReport::class.java).issues
- } catch (e: JsonMappingException) {
- e.printStackTrace()
- listOf()
+ } catch (e: Exception) {
+ val otherIssue =
+ Issue(Severity.ERROR, "Not parsable data quality report '$dataQualityReport'")
+ return when (e) {
+ is JsonMappingException -> listOf(otherIssue)
+ is JsonParseException -> listOf(otherIssue)
+ else -> throw e
+ }
}
}
+ @JsonIgnoreProperties(ignoreUnknown = true)
private data class DataQualityReport(val issues: List<Issue>)
+ @JsonIgnoreProperties(ignoreUnknown = true)
data class Issue(val severity: Severity, val message: String)
enum class Severity(@JsonValue val value: String) {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
index ecd8219..c1d4d43 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
@@ -36,9 +36,9 @@ data class Request(
val patientId: String,
val pid: String,
val fingerprint: String,
- val status: RequestStatus,
val type: RequestType,
- val processedAt: Instant = Instant.now(),
+ var status: RequestStatus,
+ var processedAt: Instant = Instant.now(),
@Embedded.Nullable var report: Report? = null
)
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 55503cf..5772faf 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -20,11 +20,16 @@
package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
+import de.ukw.ccc.bwhc.dto.Consent
+import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.dnpm.etl.processor.config.KafkaTargetProperties
+import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
+ private val kafkaTargetProperties: KafkaTargetProperties,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@@ -32,31 +37,60 @@ class KafkaMtbFileSender(
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
return try {
- val result = kafkaTemplate.sendDefault(
- header(request),
- objectMapper.writeValueAsString(request.mtbFile)
+ val result = kafkaTemplate.send(
+ kafkaTargetProperties.topic,
+ key(request),
+ objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
- MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
} else {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
+ MtbFileSender.Response(RequestStatus.ERROR)
}
-
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
- MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ MtbFileSender.Response(RequestStatus.ERROR)
}
}
- // TODO not yet implemented
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ val dummyMtbFile = MtbFile.builder()
+ .withConsent(
+ Consent.builder()
+ .withPatient(request.patientId)
+ .withStatus(Consent.Status.REJECTED)
+ .build()
+ )
+ .build()
+
+ return try {
+ val result = kafkaTemplate.send(
+ kafkaTargetProperties.topic,
+ key(request),
+ objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
+ )
+
+ if (result.get() != null) {
+ logger.debug("Sent deletion request via KafkaMtbFileSender")
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
+ } else {
+ MtbFileSender.Response(RequestStatus.ERROR)
+ }
+ } catch (e: Exception) {
+ logger.error("An error occurred sending to kafka", e)
+ MtbFileSender.Response(RequestStatus.ERROR)
+ }
}
- private fun header(request: MtbFileSender.MtbFileRequest): String {
+ private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
- "\"eid\": \"${request.mtbFile.episode.id}\", " +
- "\"requestId\": \"${request.requestId}\"}"
+ "\"eid\": \"${request.mtbFile.episode.id}\"}"
}
+
+ private fun key(request: MtbFileSender.DeleteRequest): String {
+ return "{\"pid\": \"${request.patientId}\"}"
+ }
+
+ data class Data(val requestId: String, val content: MtbFile)
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
index 6914ba5..de0efaa 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
@@ -20,22 +20,31 @@
package dev.dnpm.etl.processor.output
import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.springframework.http.HttpStatusCode
interface MtbFileSender {
fun send(request: MtbFileRequest): Response
fun send(request: DeleteRequest): Response
- data class Response(val status: ResponseStatus, val reason: String = "")
+ data class Response(val status: RequestStatus, val body: String = "")
data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile)
data class DeleteRequest(val requestId: String, val patientId: String)
- enum class ResponseStatus {
- SUCCESS,
- WARNING,
- ERROR,
- UNKNOWN
+}
+
+fun Int.asRequestStatus(): RequestStatus {
+ return when (this) {
+ 200 -> RequestStatus.SUCCESS
+ 201 -> RequestStatus.WARNING
+ in 400 .. 999 -> RequestStatus.ERROR
+ else -> RequestStatus.UNKNOWN
}
+}
+
+fun HttpStatusCode.asRequestStatus(): RequestStatus {
+ return this.value().asRequestStatus()
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
index 04c73ef..1c59f5c 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
@@ -20,20 +20,21 @@
package dev.dnpm.etl.processor.output
import dev.dnpm.etl.processor.config.RestTargetProperties
+import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.web.client.RestClientException
import org.springframework.web.client.RestTemplate
-import org.springframework.web.util.UriComponentsBuilder
-class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender {
+class RestMtbFileSender(
+ private val restTemplate: RestTemplate,
+ private val restTargetProperties: RestTargetProperties
+) : MtbFileSender {
private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java)
- private val restTemplate = RestTemplate()
-
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
try {
val headers = HttpHeaders()
@@ -46,21 +47,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
)
if (!response.statusCode.is2xxSuccessful) {
logger.warn("Error sending to remote system: {}", response.body)
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}")
+ return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
}
logger.debug("Sent file via RestMtbFileSender")
- return if (response.body?.contains("warning") == true) {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}")
- } else {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
- }
+ return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
+ return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
@@ -74,14 +71,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
String::class.java
)
logger.debug("Sent file via RestMtbFileSender")
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
+ return MtbFileSender.Response(RequestStatus.SUCCESS)
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
+ return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt
index 1a79850..ab8ce2f 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt
@@ -19,7 +19,6 @@
package dev.dnpm.etl.processor.pseudonym
-import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties
class PseudonymizeService(
@@ -27,38 +26,11 @@ class PseudonymizeService(
private val configProperties: PseudonymizeConfigProperties
) {
- fun pseudonymize(mtbFile: MtbFile): MtbFile {
- val patientPseudonym = patientPseudonym(mtbFile.patient.id)
-
- mtbFile.episode.patient = patientPseudonym
- mtbFile.carePlans.forEach { it.patient = patientPseudonym }
- mtbFile.patient.id = patientPseudonym
- mtbFile.claims.forEach { it.patient = patientPseudonym }
- mtbFile.consent.patient = patientPseudonym
- mtbFile.claimResponses.forEach { it.patient = patientPseudonym }
- mtbFile.diagnoses.forEach { it.patient = patientPseudonym }
- mtbFile.ecogStatus.forEach { it.patient = patientPseudonym }
- mtbFile.familyMemberDiagnoses.forEach { it.patient = patientPseudonym }
- mtbFile.geneticCounsellingRequests.forEach { it.patient = patientPseudonym }
- mtbFile.histologyReevaluationRequests.forEach { it.patient = patientPseudonym }
- mtbFile.histologyReports.forEach { it.patient = patientPseudonym }
- mtbFile.lastGuidelineTherapies.forEach { it.patient = patientPseudonym }
- mtbFile.molecularPathologyFindings.forEach { it.patient = patientPseudonym }
- mtbFile.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } }
- mtbFile.ngsReports.forEach { it.patient = patientPseudonym }
- mtbFile.previousGuidelineTherapies.forEach { it.patient = patientPseudonym }
- mtbFile.rebiopsyRequests.forEach { it.patient = patientPseudonym }
- mtbFile.recommendations.forEach { it.patient = patientPseudonym }
- mtbFile.recommendations.forEach { it.patient = patientPseudonym }
- mtbFile.responses.forEach { it.patient = patientPseudonym }
- mtbFile.specimens.forEach { it.patient = patientPseudonym }
- mtbFile.specimens.forEach { it.patient = patientPseudonym }
-
- return mtbFile
- }
-
fun patientPseudonym(patientId: String): String {
- return "${configProperties.prefix}_${generator.generate(patientId)}"
+ return when (generator) {
+ is GpasPseudonymGenerator -> generator.generate(patientId)
+ else -> "${configProperties.prefix}_${generator.generate(patientId)}"
+ }
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt
new file mode 100644
index 0000000..c0050a4
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/extensions.kt
@@ -0,0 +1,50 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.pseudonym
+
+import de.ukw.ccc.bwhc.dto.MtbFile
+
+infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) {
+ val patientPseudonym = pseudonymizeService.patientPseudonym(this.patient.id)
+
+ this.episode.patient = patientPseudonym
+ this.carePlans.forEach { it.patient = patientPseudonym }
+ this.patient.id = patientPseudonym
+ this.claims.forEach { it.patient = patientPseudonym }
+ this.consent.patient = patientPseudonym
+ this.claimResponses.forEach { it.patient = patientPseudonym }
+ this.diagnoses.forEach { it.patient = patientPseudonym }
+ this.ecogStatus.forEach { it.patient = patientPseudonym }
+ this.familyMemberDiagnoses.forEach { it.patient = patientPseudonym }
+ this.geneticCounsellingRequests.forEach { it.patient = patientPseudonym }
+ this.histologyReevaluationRequests.forEach { it.patient = patientPseudonym }
+ this.histologyReports.forEach { it.patient = patientPseudonym }
+ this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym }
+ this.molecularPathologyFindings.forEach { it.patient = patientPseudonym }
+ this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } }
+ this.ngsReports.forEach { it.patient = patientPseudonym }
+ this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym }
+ this.rebiopsyRequests.forEach { it.patient = patientPseudonym }
+ this.recommendations.forEach { it.patient = patientPseudonym }
+ this.recommendations.forEach { it.patient = patientPseudonym }
+ this.responses.forEach { it.patient = patientPseudonym }
+ this.specimens.forEach { it.patient = patientPseudonym }
+ this.specimens.forEach { it.patient = patientPseudonym }
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
index 8588ebe..3cd912c 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
@@ -21,169 +21,117 @@ package dev.dnpm.etl.processor.services
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.monitoring.*
+import dev.dnpm.etl.processor.monitoring.Report
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.monitoring.RequestType
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
+import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith
import org.apache.commons.codec.binary.Base32
import org.apache.commons.codec.digest.DigestUtils
-import org.slf4j.LoggerFactory
+import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
-import reactor.core.publisher.Sinks
+import java.time.Instant
import java.util.*
@Service
class RequestProcessor(
private val pseudonymizeService: PseudonymizeService,
- private val senders: List<MtbFileSender>,
- private val requestRepository: RequestRepository,
+ private val sender: MtbFileSender,
+ private val requestService: RequestService,
private val objectMapper: ObjectMapper,
- private val statisticsUpdateProducer: Sinks.Many<Any>
+ private val applicationEventPublisher: ApplicationEventPublisher
) {
- private val logger = LoggerFactory.getLogger(RequestProcessor::class.java)
-
- fun processMtbFile(mtbFile: MtbFile): RequestStatus {
+ fun processMtbFile(mtbFile: MtbFile) {
+ val requestId = UUID.randomUUID().toString()
val pid = mtbFile.patient.id
- val pseudonymized = pseudonymizeService.pseudonymize(mtbFile)
- val lastRequestForPatient =
- requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id)
- .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING }
+ mtbFile pseudonymizeWith pseudonymizeService
- if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) {
- requestRepository.save(
- Request(
- patientId = pseudonymized.patient.id,
- pid = pid,
- fingerprint = fingerprint(mtbFile),
- status = RequestStatus.DUPLICATION,
- type = RequestType.MTB_FILE,
- report = Report("Duplikat erkannt - keine Daten weitergeleitet")
- )
- )
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
- return RequestStatus.DUPLICATION
- }
+ val request = MtbFileSender.MtbFileRequest(requestId, mtbFile)
- val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized)
+ requestService.save(
+ Request(
+ uuid = requestId,
+ patientId = request.mtbFile.patient.id,
+ pid = pid,
+ fingerprint = fingerprint(request.mtbFile),
+ status = RequestStatus.UNKNOWN,
+ type = RequestType.MTB_FILE
+ )
+ )
- val responses = senders.map {
- val responseStatus = it.send(request)
- if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) {
- logger.info(
- "Sent file for Patient '{}' using '{}'",
- pseudonymized.patient.id,
- it.javaClass.simpleName
+ if (isDuplication(mtbFile)) {
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ RequestStatus.DUPLICATION
)
- } else {
- logger.error(
- "Error sending file for Patient '{}' using '{}'",
- pseudonymized.patient.id,
- it.javaClass.simpleName
- )
- }
- responseStatus
+ )
+ return
}
- val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) {
- RequestStatus.ERROR
- } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) {
- RequestStatus.WARNING
- } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) {
- RequestStatus.SUCCESS
- } else {
- RequestStatus.UNKNOWN
- }
+ val responseStatus = sender.send(request)
- requestRepository.save(
- Request(
- uuid = request.requestId,
- patientId = request.mtbFile.patient.id,
- pid = pid,
- fingerprint = fingerprint(request.mtbFile),
- status = requestStatus,
- type = RequestType.MTB_FILE,
- report = when (requestStatus) {
- RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
- RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten",
- responses.joinToString("\n") { it.reason })
-
- RequestStatus.UNKNOWN -> Report("Keine Informationen")
- else -> null
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ responseStatus.status,
+ when (responseStatus.status) {
+ RequestStatus.WARNING -> Optional.of(responseStatus.body)
+ else -> Optional.empty()
}
)
)
+ }
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
+ private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean {
+ val lastMtbFileRequestForPatient =
+ requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id)
+ val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id)
- return requestStatus
+ return null != lastMtbFileRequestForPatient
+ && !isLastRequestDeletion
+ && lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile)
}
- fun processDeletion(patientId: String): RequestStatus {
+ fun processDeletion(patientId: String) {
val requestId = UUID.randomUUID().toString()
try {
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)
- val responses = senders.map {
- val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
- when (responseStatus.status) {
- MtbFileSender.ResponseStatus.SUCCESS -> {
- logger.info(
- "Sent delete for Patient '{}' using '{}'",
- patientPseudonym,
- it.javaClass.simpleName
- )
- }
-
- MtbFileSender.ResponseStatus.ERROR -> {
- logger.error(
- "Error deleting data for Patient '{}' using '{}'",
- patientPseudonym,
- it.javaClass.simpleName
- )
- }
-
- else -> {
- logger.error(
- "Unknown result on deleting data for Patient '{}' using '{}'",
- patientPseudonym,
- it.javaClass.simpleName
- )
- }
- }
- responseStatus
- }
-
- val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) {
- RequestStatus.ERROR
- } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) {
- RequestStatus.SUCCESS
- } else {
- RequestStatus.UNKNOWN
- }
-
- requestRepository.save(
+ requestService.save(
Request(
uuid = requestId,
patientId = patientPseudonym,
pid = patientId,
fingerprint = fingerprint(patientPseudonym),
- status = overallRequestStatus,
- type = RequestType.DELETE,
- report = when (overallRequestStatus) {
- RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
- RequestStatus.UNKNOWN -> Report("Keine Informationen")
- else -> null
- }
+ status = RequestStatus.UNKNOWN,
+ type = RequestType.DELETE
)
)
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
+ val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
+
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ responseStatus.status,
+ when (responseStatus.status) {
+ RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body)
+ else -> Optional.empty()
+ }
+ )
+ )
- return overallRequestStatus
} catch (e: Exception) {
- requestRepository.save(
+ requestService.save(
Request(
uuid = requestId,
patientId = "???",
@@ -194,10 +142,6 @@ class RequestProcessor(
report = Report("Fehler bei der Pseudonymisierung")
)
)
-
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
-
- return RequestStatus.ERROR
}
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt
new file mode 100644
index 0000000..e0043d2
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestService.kt
@@ -0,0 +1,57 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.monitoring.RequestType
+import org.springframework.stereotype.Service
+
+@Service
+class RequestService(
+ private val requestRepository: RequestRepository
+) {
+
+ fun save(request: Request) = requestRepository.save(request)
+
+ fun allRequestsByPatientPseudonym(patientPseudonym: String) = requestRepository
+ .findAllByPatientIdOrderByProcessedAtDesc(patientPseudonym)
+
+ fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) =
+ Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym))
+
+ fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) =
+ Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym))
+
+ companion object {
+
+ fun lastMtbFileRequestForPatientPseudonym(allRequests: List<Request>) = allRequests
+ .filter { it.type == RequestType.MTB_FILE }
+ .sortedByDescending { it.processedAt }
+ .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING }
+
+ fun isLastRequestWithKnownStatusDeletion(allRequests: List<Request>) = allRequests
+ .filter { it.status != RequestStatus.UNKNOWN }
+ .maxByOrNull { it.processedAt }?.type == RequestType.DELETE
+
+ }
+
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
new file mode 100644
index 0000000..4048348
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
@@ -0,0 +1,94 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import dev.dnpm.etl.processor.monitoring.Report
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.slf4j.LoggerFactory
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Service
+import reactor.core.publisher.Sinks
+import java.time.Instant
+import java.util.*
+
+@Service
+class ResponseProcessor(
+ private val requestRepository: RequestRepository,
+ private val statisticsUpdateProducer: Sinks.Many<Any>
+) {
+
+ private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java)
+
+ @EventListener(classes = [ResponseEvent::class])
+ fun handleResponseEvent(event: ResponseEvent) {
+ requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({
+ it.processedAt = event.timestamp
+ it.status = event.status
+
+ when (event.status) {
+ RequestStatus.SUCCESS -> {
+ it.report = Report(
+ "Keine Probleme erkannt",
+ )
+ }
+
+ RequestStatus.WARNING -> {
+ it.report = Report(
+ "Warnungen über mangelhafte Daten",
+ event.body.orElse("")
+ )
+ }
+
+ RequestStatus.ERROR -> {
+ it.report = Report(
+ "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
+ event.body.orElse("")
+ )
+ }
+
+ RequestStatus.DUPLICATION -> {
+ it.report = Report(
+ "Duplikat erkannt"
+ )
+ }
+
+ else -> {
+ logger.error("Cannot process response: Unknown response!")
+ return@ifPresentOrElse
+ }
+ }
+
+ requestRepository.save(it)
+
+ statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
+ }, {
+ logger.error("Response for unknown request '${event.requestUuid}'!")
+ })
+ }
+
+}
+
+data class ResponseEvent(
+ val requestUuid: String,
+ val timestamp: Instant,
+ val status: RequestStatus,
+ val body: Optional<String> = Optional.empty()
+) \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
new file mode 100644
index 0000000..a29010f
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
@@ -0,0 +1,80 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services.kafka
+
+import com.fasterxml.jackson.annotation.JsonAlias
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.output.asRequestStatus
+import dev.dnpm.etl.processor.services.ResponseEvent
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.slf4j.LoggerFactory
+import org.springframework.context.ApplicationEventPublisher
+import org.springframework.kafka.listener.MessageListener
+import java.time.Instant
+import java.util.*
+
+class KafkaResponseProcessor(
+ private val eventPublisher: ApplicationEventPublisher,
+ private val objectMapper: ObjectMapper
+) : MessageListener<String, String> {
+
+ private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java)
+
+ override fun onMessage(data: ConsumerRecord<String, String>) {
+ try {
+ Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java))
+ } catch (e: Exception) {
+ logger.error("Cannot process Kafka response", e)
+ Optional.empty()
+ }.ifPresentOrElse({ responseBody ->
+ val event = ResponseEvent(
+ responseBody.requestId,
+ Instant.ofEpochMilli(data.timestamp()),
+ responseBody.statusCode.asRequestStatus(),
+ when (responseBody.statusCode.asRequestStatus()) {
+ RequestStatus.SUCCESS -> {
+ Optional.empty()
+ }
+
+ RequestStatus.WARNING, RequestStatus.ERROR -> {
+ Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
+ }
+
+ else -> {
+ logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode)
+ Optional.empty()
+ }
+ }
+ )
+ eventPublisher.publishEvent(event)
+ }, {
+ logger.error("No requestId in Kafka response")
+ })
+ }
+
+ data class ResponseBody(
+ @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
+ @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
+ @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
+ )
+
+} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt
index a2cc953..9b441f6 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileRestController.kt
@@ -19,40 +19,37 @@
package dev.dnpm.etl.processor.web
+import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.services.RequestProcessor
import org.slf4j.LoggerFactory
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
@RestController
-class MtbFileController(
+class MtbFileRestController(
private val requestProcessor: RequestProcessor,
) {
- private val logger = LoggerFactory.getLogger(MtbFileController::class.java)
+ private val logger = LoggerFactory.getLogger(MtbFileRestController::class.java)
@PostMapping(path = ["/mtbfile"])
fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity<Void> {
- val requestStatus = requestProcessor.processMtbFile(mtbFile)
-
- return if (requestStatus == RequestStatus.ERROR) {
- ResponseEntity.unprocessableEntity().build()
+ if (mtbFile.consent.status == Consent.Status.ACTIVE) {
+ logger.debug("Accepted MTB File for processing")
+ requestProcessor.processMtbFile(mtbFile)
} else {
- ResponseEntity.noContent().build()
+ logger.debug("Accepted MTB File and process deletion")
+ requestProcessor.processDeletion(mtbFile.patient.id)
}
+ return ResponseEntity.accepted().build()
}
@DeleteMapping(path = ["/mtbfile/{patientId}"])
fun deleteData(@PathVariable patientId: String): ResponseEntity<Void> {
- val requestStatus = requestProcessor.processDeletion(patientId)
-
- return if (requestStatus == RequestStatus.ERROR) {
- ResponseEntity.unprocessableEntity().build()
- } else {
- ResponseEntity.noContent().build()
- }
+ logger.debug("Accepted patient ID to process deletion")
+ requestProcessor.processDeletion(patientId)
+ return ResponseEntity.accepted().build()
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
index a418772..6f0e820 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
@@ -83,9 +83,9 @@ class StatisticsRestController(
.groupBy { formatter.format(it.processedAt) }
.map {
val requestList = it.value
- .groupBy { it.status }
- .map {
- Pair(it.key, it.value.size)
+ .groupBy { request -> request.status }
+ .map { request ->
+ Pair(request.key, request.value.size)
}
.toMap()
Pair(
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 99e4bbf..a60cd8a 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -4,11 +4,15 @@ spring:
file: ./dev-compose.yml
app:
- rest:
- uri: http://localhost:9000/bwhc/etl/api/MTBFile
- #kafka:
- # topic: test
- # servers: kafka:9092
+ #rest:
+ # uri: http://localhost:9000/bwhc/etl/api
+
+ # Note: Make sure, hostname "kafka" points to 127.0.0.1
+ # otherwise connection will not be available
+ kafka:
+ topic: test
+ response-topic: test_response
+ servers: kafka:9092
server:
port: 8000
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 39acb37..5cd47c0 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,7 +1,7 @@
spring:
kafka:
bootstrap-servers: ${app.kafka.servers}
- template:
- default-topic: ${app.kafka.topic}
+ consumer:
+ group-id: ${app.kafka.group-id}
flyway:
locations: "classpath:db/migration/{vendor}" \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
new file mode 100644
index 0000000..3ec9757
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
@@ -0,0 +1,173 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.output
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import de.ukw.ccc.bwhc.dto.*
+import dev.dnpm.etl.processor.config.KafkaTargetProperties
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+import org.mockito.ArgumentMatchers.anyString
+import org.mockito.Mock
+import org.mockito.junit.jupiter.MockitoExtension
+import org.mockito.kotlin.*
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.SendResult
+import java.util.concurrent.CompletableFuture.completedFuture
+import java.util.concurrent.ExecutionException
+
+@ExtendWith(MockitoExtension::class)
+class KafkaMtbFileSenderTest {
+
+ private lateinit var kafkaTemplate: KafkaTemplate<String, String>
+
+ private lateinit var kafkaMtbFileSender: KafkaMtbFileSender
+
+ private lateinit var objectMapper: ObjectMapper
+
+ @BeforeEach
+ fun setup(
+ @Mock kafkaTemplate: KafkaTemplate<String, String>
+ ) {
+ val kafkaTargetProperties = KafkaTargetProperties("testtopic")
+ this.objectMapper = ObjectMapper()
+ this.kafkaTemplate = kafkaTemplate
+
+ this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
+ }
+
+ @ParameterizedTest
+ @MethodSource("requestWithResponseSource")
+ fun shouldSendMtbFileRequestAndReturnExpectedState(testData: TestData) {
+ doAnswer {
+ if (null != testData.exception) {
+ throw testData.exception
+ }
+ completedFuture(SendResult<String, String>(null, null))
+ }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
+
+ val response = kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE)))
+ assertThat(response.status).isEqualTo(testData.requestStatus)
+ }
+
+ @ParameterizedTest
+ @MethodSource("requestWithResponseSource")
+ fun shouldSendDeleteRequestAndReturnExpectedState(testData: TestData) {
+ doAnswer {
+ if (null != testData.exception) {
+ throw testData.exception
+ }
+ completedFuture(SendResult<String, String>(null, null))
+ }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
+
+ val response = kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
+ assertThat(response.status).isEqualTo(testData.requestStatus)
+ }
+
+ @Test
+ fun shouldSendMtbFileRequestWithCorrectKeyAndBody() {
+ doAnswer {
+ completedFuture(SendResult<String, String>(null, null))
+ }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
+
+ kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE)))
+
+ val captor = argumentCaptor<String>()
+ verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
+ assertThat(captor.firstValue).isNotNull
+ assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}")
+ assertThat(captor.secondValue).isNotNull
+ assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE)))
+ }
+
+ @Test
+ fun shouldSendDeleteRequestWithCorrectKeyAndBody() {
+ doAnswer {
+ completedFuture(SendResult<String, String>(null, null))
+ }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
+
+ kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
+
+ val captor = argumentCaptor<String>()
+ verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
+ assertThat(captor.firstValue).isNotNull
+ assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}")
+ assertThat(captor.secondValue).isNotNull
+ assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED)))
+ }
+
+ companion object {
+ fun mtbFile(consentStatus: Consent.Status): MtbFile {
+ return if (consentStatus == Consent.Status.ACTIVE) {
+ MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("PID")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(consentStatus)
+ .withPatient("PID")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("PID")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ } else {
+ MtbFile.builder()
+ .withConsent(
+ Consent.builder()
+ .withStatus(consentStatus)
+ .withPatient("PID")
+ .build()
+ )
+ }.build()
+ }
+
+ fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data {
+ return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus))
+ }
+
+ data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null)
+
+ @JvmStatic
+ fun requestWithResponseSource(): Set<TestData> {
+ return setOf(
+ TestData(RequestStatus.UNKNOWN),
+ TestData(RequestStatus.ERROR, InterruptedException("Test interrupted")),
+ TestData(RequestStatus.ERROR, ExecutionException(RuntimeException("Test execution aborted")))
+ )
+ }
+ }
+
+} \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt
new file mode 100644
index 0000000..0cad285
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt
@@ -0,0 +1,195 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.output
+
+import de.ukw.ccc.bwhc.dto.*
+import dev.dnpm.etl.processor.config.RestTargetProperties
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+import org.springframework.http.HttpMethod
+import org.springframework.http.HttpStatus
+import org.springframework.test.web.client.MockRestServiceServer
+import org.springframework.test.web.client.match.MockRestRequestMatchers.method
+import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo
+import org.springframework.test.web.client.response.MockRestResponseCreators.withStatus
+import org.springframework.web.client.RestTemplate
+
+class RestMtbFileSenderTest {
+
+ private lateinit var mockRestServiceServer: MockRestServiceServer
+
+ private lateinit var restMtbFileSender: RestMtbFileSender
+
+ @BeforeEach
+ fun setup() {
+ val restTemplate = RestTemplate()
+ val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
+
+ this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
+
+ this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties)
+ }
+
+ @ParameterizedTest
+ @MethodSource("deleteRequestWithResponseSource")
+ fun shouldReturnExpectedResponseForDelete(requestWithResponse: RequestWithResponse) {
+ this.mockRestServiceServer.expect {
+ method(HttpMethod.DELETE)
+ requestTo("/mtbfile")
+ }.andRespond {
+ withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it)
+ }
+
+ val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
+ assertThat(response.status).isEqualTo(requestWithResponse.response.status)
+ assertThat(response.body).isEqualTo(requestWithResponse.response.body)
+ }
+
+ @ParameterizedTest
+ @MethodSource("mtbFileRequestWithResponseSource")
+ fun shouldReturnExpectedResponseForMtbFilePost(requestWithResponse: RequestWithResponse) {
+ this.mockRestServiceServer.expect {
+ method(HttpMethod.POST)
+ requestTo("/mtbfile")
+ }.andRespond {
+ withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it)
+ }
+
+ val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile))
+ assertThat(response.status).isEqualTo(requestWithResponse.response.status)
+ assertThat(response.body).isEqualTo(requestWithResponse.response.body)
+ }
+
+ companion object {
+ data class RequestWithResponse(
+ val httpStatus: HttpStatus,
+ val body: String,
+ val response: MtbFileSender.Response
+ )
+
+ private val warningBody = """
+ {
+ "patient_id": "PID",
+ "issues": [
+ { "severity": "warning", "message": "Something is not right" }
+ ]
+ }
+ """.trimIndent()
+
+ private val errorBody = """
+ {
+ "patient_id": "PID",
+ "issues": [
+ { "severity": "error", "message": "Something is very bad" }
+ ]
+ }
+ """.trimIndent()
+
+ val mtbFile: MtbFile = MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("PID")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(Consent.Status.ACTIVE)
+ .withPatient("PID")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("PID")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ .build()
+
+ private const val ERROR_RESPONSE_BODY = "Sonstiger Fehler bei der Übertragung"
+
+ /**
+ * Synthetic http responses with related request status
+ * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API
+ */
+ @JvmStatic
+ fun mtbFileRequestWithResponseSource(): Set<RequestWithResponse> {
+ return setOf(
+ RequestWithResponse(HttpStatus.OK, "{}", MtbFileSender.Response(RequestStatus.SUCCESS, "{}")),
+ RequestWithResponse(
+ HttpStatus.CREATED,
+ warningBody,
+ MtbFileSender.Response(RequestStatus.WARNING, warningBody)
+ ),
+ RequestWithResponse(
+ HttpStatus.BAD_REQUEST,
+ "??",
+ MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+ ),
+ RequestWithResponse(
+ HttpStatus.UNPROCESSABLE_ENTITY,
+ errorBody,
+ MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+ ),
+ // Some more errors not mentioned in documentation
+ RequestWithResponse(
+ HttpStatus.NOT_FOUND,
+ "what????",
+ MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+ ),
+ RequestWithResponse(
+ HttpStatus.INTERNAL_SERVER_ERROR,
+ "what????",
+ MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+ )
+ )
+ }
+
+ /**
+ * Synthetic http responses with related request status
+ * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API
+ */
+ @JvmStatic
+ fun deleteRequestWithResponseSource(): Set<RequestWithResponse> {
+ return setOf(
+ RequestWithResponse(HttpStatus.OK, "", MtbFileSender.Response(RequestStatus.SUCCESS)),
+ // Some more errors not mentioned in documentation
+ RequestWithResponse(
+ HttpStatus.NOT_FOUND,
+ "what????",
+ MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+ ),
+ RequestWithResponse(
+ HttpStatus.INTERNAL_SERVER_ERROR,
+ "what????",
+ MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+ )
+ )
+ }
+ }
+
+
+} \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt
new file mode 100644
index 0000000..a30a328
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeServiceTest.kt
@@ -0,0 +1,86 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.pseudonym
+
+import de.ukw.ccc.bwhc.dto.*
+import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.mockito.ArgumentMatchers.anyString
+import org.mockito.Mock
+import org.mockito.junit.jupiter.MockitoExtension
+import org.mockito.kotlin.doAnswer
+import org.mockito.kotlin.whenever
+
+@ExtendWith(MockitoExtension::class)
+class PseudonymizeServiceTest {
+
+ private val mtbFile = MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("123")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(Consent.Status.ACTIVE)
+ .withPatient("123")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("123")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ .build()
+
+ @Test
+ fun shouldNotUsePseudonymPrefixForGpas(@Mock generator: GpasPseudonymGenerator) {
+ doAnswer {
+ it.arguments[0]
+ }.whenever(generator).generate(anyString())
+
+ val pseudonymizeService = PseudonymizeService(generator, PseudonymizeConfigProperties())
+
+ mtbFile.pseudonymizeWith(pseudonymizeService)
+
+ assertThat(mtbFile.patient.id).isEqualTo("123")
+ }
+
+ @Test
+ fun shouldUsePseudonymPrefixForBuiltin(@Mock generator: AnonymizingGenerator) {
+ doAnswer {
+ it.arguments[0]
+ }.whenever(generator).generate(anyString())
+
+ val pseudonymizeService = PseudonymizeService(generator, PseudonymizeConfigProperties())
+
+ mtbFile.pseudonymizeWith(pseudonymizeService)
+
+ assertThat(mtbFile.patient.id).isEqualTo("UNKNOWN_123")
+ }
+
+} \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt
new file mode 100644
index 0000000..70efe2b
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ReportServiceTest.kt
@@ -0,0 +1,70 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+import dev.dnpm.etl.processor.monitoring.ReportService
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+
+class ReportServiceTest {
+
+ private lateinit var reportService: ReportService
+
+ @BeforeEach
+ fun setup() {
+ this.reportService = ReportService(ObjectMapper().registerModule(KotlinModule.Builder().build()))
+ }
+
+ @Test
+ fun shouldParseDataQualityReport() {
+ val json = """
+ {
+ "patient": "4711",
+ "issues": [
+ { "severity": "warning", "message": "Warning Message" },
+ { "severity": "error", "message": "Error Message" }
+ ]
+ }
+ """.trimIndent()
+
+ val actual = this.reportService.deserialize(json)
+
+ assertThat(actual).hasSize(2)
+ assertThat(actual[0].severity).isEqualTo(ReportService.Severity.WARNING)
+ assertThat(actual[0].message).isEqualTo("Warning Message")
+ assertThat(actual[1].severity).isEqualTo(ReportService.Severity.ERROR)
+ assertThat(actual[1].message).isEqualTo("Error Message")
+ }
+
+ @Test
+ fun shouldReturnSyntheticDataQualityReportOnParserError() {
+ val invalidResponse = "Invalid Response Data"
+
+ val actual = this.reportService.deserialize(invalidResponse)
+
+ assertThat(actual).hasSize(1)
+ assertThat(actual[0].severity).isEqualTo(ReportService.Severity.ERROR)
+ assertThat(actual[0].message).isEqualTo("Not parsable data quality report '$invalidResponse'")
+ }
+
+} \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt
new file mode 100644
index 0000000..7856833
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt
@@ -0,0 +1,372 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import de.ukw.ccc.bwhc.dto.*
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.monitoring.RequestType
+import dev.dnpm.etl.processor.output.MtbFileSender
+import dev.dnpm.etl.processor.output.RestMtbFileSender
+import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.mockito.ArgumentMatchers.anyString
+import org.mockito.Mock
+import org.mockito.Mockito.*
+import org.mockito.junit.jupiter.MockitoExtension
+import org.mockito.kotlin.any
+import org.mockito.kotlin.argumentCaptor
+import org.springframework.context.ApplicationEventPublisher
+import java.time.Instant
+import java.util.*
+
+
+@ExtendWith(MockitoExtension::class)
+class RequestProcessorTest {
+
+ private lateinit var pseudonymizeService: PseudonymizeService
+ private lateinit var sender: MtbFileSender
+ private lateinit var requestService: RequestService
+ private lateinit var applicationEventPublisher: ApplicationEventPublisher
+
+ private lateinit var requestProcessor: RequestProcessor
+
+ @BeforeEach
+ fun setup(
+ @Mock pseudonymizeService: PseudonymizeService,
+ @Mock sender: RestMtbFileSender,
+ @Mock requestService: RequestService,
+ @Mock applicationEventPublisher: ApplicationEventPublisher
+ ) {
+ this.pseudonymizeService = pseudonymizeService
+ this.sender = sender
+ this.requestService = requestService
+ this.applicationEventPublisher = applicationEventPublisher
+
+ val objectMapper = ObjectMapper()
+
+ requestProcessor = RequestProcessor(
+ pseudonymizeService,
+ sender,
+ requestService,
+ objectMapper,
+ applicationEventPublisher
+ )
+ }
+
+ @Test
+ fun testShouldSendMtbFileDuplicationAndSaveUnknownRequestStatusAtFirst() {
+ doAnswer {
+ Request(
+ id = 1L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-08-08T02:00:00Z")
+ )
+ }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString())
+
+ doAnswer {
+ false
+ }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
+
+ doAnswer {
+ it.arguments[0] as String
+ }.`when`(pseudonymizeService).patientPseudonym(any())
+
+ val mtbFile = MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("1")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(Consent.Status.ACTIVE)
+ .withPatient("123")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("1")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ .build()
+
+ this.requestProcessor.processMtbFile(mtbFile)
+
+ val requestCaptor = argumentCaptor<Request>()
+ verify(requestService, times(1)).save(requestCaptor.capture())
+ assertThat(requestCaptor.firstValue).isNotNull
+ assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN)
+ }
+
+ @Test
+ fun testShouldDetectMtbFileDuplicationAndSendDuplicationEvent() {
+ doAnswer {
+ Request(
+ id = 1L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-08-08T02:00:00Z")
+ )
+ }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString())
+
+ doAnswer {
+ false
+ }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
+
+ doAnswer {
+ it.arguments[0] as String
+ }.`when`(pseudonymizeService).patientPseudonym(any())
+
+ val mtbFile = MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("1")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(Consent.Status.ACTIVE)
+ .withPatient("123")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("1")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ .build()
+
+ this.requestProcessor.processMtbFile(mtbFile)
+
+ val eventCaptor = argumentCaptor<ResponseEvent>()
+ verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
+ assertThat(eventCaptor.firstValue).isNotNull
+ assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION)
+ }
+
+ @Test
+ fun testShouldSendMtbFileAndSendSuccessEvent() {
+ doAnswer {
+ Request(
+ id = 1L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "different",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-08-08T02:00:00Z")
+ )
+ }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString())
+
+ doAnswer {
+ false
+ }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
+
+ doAnswer {
+ MtbFileSender.Response(status = RequestStatus.SUCCESS)
+ }.`when`(sender).send(any<MtbFileSender.MtbFileRequest>())
+
+ doAnswer {
+ it.arguments[0] as String
+ }.`when`(pseudonymizeService).patientPseudonym(any())
+
+ val mtbFile = MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("1")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(Consent.Status.ACTIVE)
+ .withPatient("123")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("1")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ .build()
+
+ this.requestProcessor.processMtbFile(mtbFile)
+
+ val eventCaptor = argumentCaptor<ResponseEvent>()
+ verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
+ assertThat(eventCaptor.firstValue).isNotNull
+ assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS)
+ }
+
+ @Test
+ fun testShouldSendMtbFileAndSendErrorEvent() {
+ doAnswer {
+ Request(
+ id = 1L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "different",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-08-08T02:00:00Z")
+ )
+ }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString())
+
+ doAnswer {
+ false
+ }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
+
+ doAnswer {
+ MtbFileSender.Response(status = RequestStatus.ERROR)
+ }.`when`(sender).send(any<MtbFileSender.MtbFileRequest>())
+
+ doAnswer {
+ it.arguments[0] as String
+ }.`when`(pseudonymizeService).patientPseudonym(any())
+
+ val mtbFile = MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("1")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(Consent.Status.ACTIVE)
+ .withPatient("123")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("1")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ .build()
+
+ this.requestProcessor.processMtbFile(mtbFile)
+
+ val eventCaptor = argumentCaptor<ResponseEvent>()
+ verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
+ assertThat(eventCaptor.firstValue).isNotNull
+ assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR)
+ }
+
+ @Test
+ fun testShouldSendDeleteRequestAndSaveUnknownRequestStatusAtFirst() {
+ doAnswer {
+ "PSEUDONYM"
+ }.`when`(pseudonymizeService).patientPseudonym(anyString())
+
+ doAnswer {
+ MtbFileSender.Response(status = RequestStatus.UNKNOWN)
+ }.`when`(sender).send(any<MtbFileSender.DeleteRequest>())
+
+ this.requestProcessor.processDeletion("TEST_12345678901")
+
+ val requestCaptor = argumentCaptor<Request>()
+ verify(requestService, times(1)).save(requestCaptor.capture())
+ assertThat(requestCaptor.firstValue).isNotNull
+ assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN)
+ }
+
+ @Test
+ fun testShouldSendDeleteRequestAndSendSuccessEvent() {
+ doAnswer {
+ "PSEUDONYM"
+ }.`when`(pseudonymizeService).patientPseudonym(anyString())
+
+ doAnswer {
+ MtbFileSender.Response(status = RequestStatus.SUCCESS)
+ }.`when`(sender).send(any<MtbFileSender.DeleteRequest>())
+
+ this.requestProcessor.processDeletion("TEST_12345678901")
+
+ val eventCaptor = argumentCaptor<ResponseEvent>()
+ verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
+ assertThat(eventCaptor.firstValue).isNotNull
+ assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS)
+ }
+
+ @Test
+ fun testShouldSendDeleteRequestAndSendErrorEvent() {
+ doAnswer {
+ "PSEUDONYM"
+ }.`when`(pseudonymizeService).patientPseudonym(anyString())
+
+ doAnswer {
+ MtbFileSender.Response(status = RequestStatus.ERROR)
+ }.`when`(sender).send(any<MtbFileSender.DeleteRequest>())
+
+ this.requestProcessor.processDeletion("TEST_12345678901")
+
+ val eventCaptor = argumentCaptor<ResponseEvent>()
+ verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
+ assertThat(eventCaptor.firstValue).isNotNull
+ assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR)
+ }
+
+ @Test
+ fun testShouldSendDeleteRequestWithPseudonymErrorAndSaveErrorRequestStatus() {
+ doThrow(RuntimeException()).`when`(pseudonymizeService).patientPseudonym(anyString())
+
+ this.requestProcessor.processDeletion("TEST_12345678901")
+
+ val requestCaptor = argumentCaptor<Request>()
+ verify(requestService, times(1)).save(requestCaptor.capture())
+ assertThat(requestCaptor.firstValue).isNotNull
+ assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR)
+ }
+
+} \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt
new file mode 100644
index 0000000..3cf8804
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestServiceTest.kt
@@ -0,0 +1,225 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.monitoring.RequestType
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.mockito.Mock
+import org.mockito.Mockito.*
+import org.mockito.junit.jupiter.MockitoExtension
+import java.time.Instant
+import java.util.*
+
+@ExtendWith(MockitoExtension::class)
+class RequestServiceTest {
+
+ private lateinit var requestRepository: RequestRepository
+
+ private lateinit var requestService: RequestService
+
+ private fun anyRequest() = any(Request::class.java) ?: Request(
+ id = 0L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_dummy",
+ pid = "PX",
+ fingerprint = "dummy",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-08-08T02:00:00Z")
+ )
+
+ @BeforeEach
+ fun setup(
+ @Mock requestRepository: RequestRepository
+ ) {
+ this.requestRepository = requestRepository
+ this.requestService = RequestService(requestRepository)
+ }
+
+ @Test
+ fun shouldIndicateLastRequestIsDeleteRequest() {
+ val requests = listOf(
+ Request(
+ id = 1L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdef1",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.WARNING,
+ processedAt = Instant.parse("2023-07-07T00:00:00Z")
+ ),
+ Request(
+ id = 2L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdefd",
+ type = RequestType.DELETE,
+ status = RequestStatus.WARNING,
+ processedAt = Instant.parse("2023-07-07T02:00:00Z")
+ ),
+ Request(
+ id = 3L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdef1",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.UNKNOWN,
+ processedAt = Instant.parse("2023-08-11T00:00:00Z")
+ )
+ )
+
+ val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests)
+
+ assertThat(actual).isTrue()
+ }
+
+ @Test
+ fun shouldIndicateLastRequestIsNotDeleteRequest() {
+ val requests = listOf(
+ Request(
+ id = 1L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdef1",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.WARNING,
+ processedAt = Instant.parse("2023-07-07T00:00:00Z")
+ ),
+ Request(
+ id = 2L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdef1",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.WARNING,
+ processedAt = Instant.parse("2023-07-07T02:00:00Z")
+ ),
+ Request(
+ id = 3L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdef1",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.UNKNOWN,
+ processedAt = Instant.parse("2023-08-11T00:00:00Z")
+ )
+ )
+
+ val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests)
+
+ assertThat(actual).isFalse()
+ }
+
+ @Test
+ fun shouldReturnPatientsLastRequest() {
+ val requests = listOf(
+ Request(
+ id = 1L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdef1",
+ type = RequestType.DELETE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-07-07T02:00:00Z")
+ ),
+ Request(
+ id = 1L,
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678902",
+ pid = "P2",
+ fingerprint = "0123456789abcdef2",
+ type = RequestType.MTB_FILE,
+ status = RequestStatus.WARNING,
+ processedAt = Instant.parse("2023-08-08T00:00:00Z")
+ )
+ )
+
+ val actual = RequestService.lastMtbFileRequestForPatientPseudonym(requests)
+
+ assertThat(actual).isInstanceOf(Request::class.java)
+ assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef2")
+ }
+
+ @Test
+ fun shouldReturnNullIfNoRequests() {
+ val requests = listOf<Request>()
+
+ val actual = RequestService.lastMtbFileRequestForPatientPseudonym(requests)
+
+ assertThat(actual).isNull()
+ }
+
+ @Test
+ fun saveShouldSaveRequestUsingRepository() {
+ doAnswer {
+ val obj = it.arguments[0] as Request
+ obj.copy(id = 1L)
+ }.`when`(requestRepository).save(anyRequest())
+
+ val request = Request(
+ uuid = UUID.randomUUID().toString(),
+ patientId = "TEST_12345678901",
+ pid = "P1",
+ fingerprint = "0123456789abcdef1",
+ type = RequestType.DELETE,
+ status = RequestStatus.SUCCESS,
+ processedAt = Instant.parse("2023-07-07T02:00:00Z")
+ )
+
+ requestService.save(request)
+
+ verify(requestRepository, times(1)).save(anyRequest())
+ }
+
+ @Test
+ fun allRequestsByPatientPseudonymShouldRequestAllRequestsForPatientPseudonym() {
+ requestService.allRequestsByPatientPseudonym("TEST_12345678901")
+
+ verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString())
+ }
+
+ @Test
+ fun lastMtbFileRequestForPatientPseudonymShouldRequestAllRequestsForPatientPseudonym() {
+ requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901")
+
+ verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString())
+ }
+
+ @Test
+ fun isLastRequestDeletionShouldRequestAllRequestsForPatientPseudonym() {
+ requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901")
+
+ verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString())
+ }
+
+} \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt
new file mode 100644
index 0000000..b9e4b7f
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt
@@ -0,0 +1,138 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.monitoring.RequestType
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+import org.mockito.ArgumentMatchers.anyString
+import org.mockito.Mock
+import org.mockito.junit.jupiter.MockitoExtension
+import org.mockito.kotlin.*
+import reactor.core.publisher.Sinks
+import java.time.Instant
+import java.util.*
+
+@ExtendWith(MockitoExtension::class)
+class ResponseProcessorTest {
+
+ private lateinit var requestRepository: RequestRepository
+ private lateinit var statisticsUpdateProducer: Sinks.Many<Any>
+
+ private lateinit var responseProcessor: ResponseProcessor
+
+ private val testRequest = Request(
+ 1L,
+ "TestID1234",
+ "PSEUDONYM-A",
+ "1",
+ "dummyfingerprint",
+ RequestType.MTB_FILE,
+ RequestStatus.UNKNOWN
+ )
+
+ @BeforeEach
+ fun setup(
+ @Mock requestRepository: RequestRepository,
+ @Mock statisticsUpdateProducer: Sinks.Many<Any>
+ ) {
+ this.requestRepository = requestRepository
+ this.statisticsUpdateProducer = statisticsUpdateProducer
+
+ this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer)
+ }
+
+ @Test
+ fun shouldNotSaveStatusForUnknownRequest() {
+ doAnswer {
+ Optional.empty<Request>()
+ }.whenever(requestRepository).findByUuidEquals(anyString())
+
+ val event = ResponseEvent(
+ "TestID1234",
+ Instant.parse("2023-09-09T00:00:00Z"),
+ RequestStatus.SUCCESS
+ )
+
+ this.responseProcessor.handleResponseEvent(event)
+
+ verify(requestRepository, never()).save(any())
+ }
+
+ @Test
+ fun shouldNotSaveStatusWithUnknownState() {
+ doAnswer {
+ Optional.of(testRequest)
+ }.whenever(requestRepository).findByUuidEquals(anyString())
+
+ val event = ResponseEvent(
+ "TestID1234",
+ Instant.parse("2023-09-09T00:00:00Z"),
+ RequestStatus.UNKNOWN
+ )
+
+ this.responseProcessor.handleResponseEvent(event)
+
+ verify(requestRepository, never()).save(any())
+ }
+
+ @ParameterizedTest
+ @MethodSource("requestStatusSource")
+ fun shouldSaveStatusForKnownRequest(requestStatus: RequestStatus) {
+ doAnswer {
+ Optional.of(testRequest)
+ }.whenever(requestRepository).findByUuidEquals(anyString())
+
+ val event = ResponseEvent(
+ "TestID1234",
+ Instant.parse("2023-09-09T00:00:00Z"),
+ requestStatus
+ )
+
+ this.responseProcessor.handleResponseEvent(event)
+
+ val captor = argumentCaptor<Request>()
+ verify(requestRepository, times(1)).save(captor.capture())
+ assertThat(captor.firstValue).isNotNull
+ assertThat(captor.firstValue.status).isEqualTo(requestStatus)
+ }
+
+ companion object {
+
+ @JvmStatic
+ fun requestStatusSource(): Set<RequestStatus> {
+ return setOf(
+ RequestStatus.SUCCESS,
+ RequestStatus.WARNING,
+ RequestStatus.ERROR,
+ RequestStatus.DUPLICATION
+ )
+ }
+
+ }
+
+} \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt
new file mode 100644
index 0000000..95bf41b
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt
@@ -0,0 +1,149 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services.kafka
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+import dev.dnpm.etl.processor.services.ResponseEvent
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+import org.mockito.Mock
+import org.mockito.junit.jupiter.MockitoExtension
+import org.mockito.kotlin.any
+import org.mockito.kotlin.never
+import org.mockito.kotlin.times
+import org.mockito.kotlin.verify
+import org.springframework.context.ApplicationEventPublisher
+import org.springframework.http.HttpStatus
+
+@ExtendWith(MockitoExtension::class)
+class KafkaResponseProcessorTest {
+
+ private lateinit var eventPublisher: ApplicationEventPublisher
+ private lateinit var objectMapper: ObjectMapper
+
+ private lateinit var kafkaResponseProcessor: KafkaResponseProcessor
+
+ private fun createKafkaRecord(
+ requestId: String,
+ statusCode: Int = 200,
+ statusBody: Map<String, Any>? = mapOf()
+ ): ConsumerRecord<String, String> {
+ return ConsumerRecord<String, String>(
+ "test-topic",
+ 0,
+ 0,
+ null,
+ if (statusBody == null) {
+ ""
+ } else {
+ this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody))
+ }
+ )
+ }
+
+ @BeforeEach
+ fun setup(
+ @Mock eventPublisher: ApplicationEventPublisher
+ ) {
+ this.eventPublisher = eventPublisher
+ this.objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build())
+
+ this.kafkaResponseProcessor = KafkaResponseProcessor(eventPublisher, objectMapper)
+ }
+
+ @Test
+ fun shouldNotProcessRecordsWithoutRequestIdInBody() {
+ val record = ConsumerRecord<String, String>(
+ "test-topic",
+ 0,
+ 0,
+ null,
+ """
+ {
+ "statusCode": 200,
+ "statusBody": {}
+ }
+ """.trimIndent()
+ )
+
+ this.kafkaResponseProcessor.onMessage(record)
+
+ verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
+ }
+
+ @Test
+ fun shouldProcessRecordsWithAliasNames() {
+ val record = ConsumerRecord<String, String>(
+ "test-topic",
+ 0,
+ 0,
+ null,
+ """
+ {
+ "request_id": "test0123456789",
+ "status_code": 200,
+ "status_body": {}
+ }
+ """.trimIndent()
+ )
+
+ this.kafkaResponseProcessor.onMessage(record)
+
+ verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
+ }
+
+ @Test
+ fun shouldNotProcessRecordsWithoutValidStatusBody() {
+ this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null))
+
+ verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
+ }
+
+ @ParameterizedTest
+ @MethodSource("statusCodeSource")
+ fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) {
+ this.kafkaResponseProcessor.onMessage(createKafkaRecord("TestID1234", statusCode))
+ verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
+ }
+
+ companion object {
+
+ @JvmStatic
+ fun statusCodeSource(): Set<Int> {
+ return setOf(
+ HttpStatus.OK,
+ HttpStatus.CREATED,
+ HttpStatus.BAD_REQUEST,
+ HttpStatus.NOT_FOUND,
+ HttpStatus.UNPROCESSABLE_ENTITY,
+ HttpStatus.INTERNAL_SERVER_ERROR
+ )
+ .map { it.value() }
+ .toSet()
+ }
+
+ }
+
+} \ No newline at end of file
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt
new file mode 100644
index 0000000..2fde35a
--- /dev/null
+++ b/src/test/kotlin/dev/dnpm/etl/processor/web/MtbFileRestControllerTest.kt
@@ -0,0 +1,150 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.web
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import de.ukw.ccc.bwhc.dto.*
+import dev.dnpm.etl.processor.services.RequestProcessor
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.mockito.Mock
+import org.mockito.Mockito.times
+import org.mockito.Mockito.verify
+import org.mockito.junit.jupiter.MockitoExtension
+import org.mockito.kotlin.any
+import org.mockito.kotlin.argumentCaptor
+import org.springframework.http.MediaType
+import org.springframework.test.web.servlet.MockMvc
+import org.springframework.test.web.servlet.delete
+import org.springframework.test.web.servlet.post
+import org.springframework.test.web.servlet.setup.MockMvcBuilders
+
+@ExtendWith(MockitoExtension::class)
+class MtbFileRestControllerTest {
+
+ private lateinit var mockMvc: MockMvc
+
+ private lateinit var requestProcessor: RequestProcessor
+
+ private val objectMapper = ObjectMapper()
+
+ @BeforeEach
+ fun setup(
+ @Mock requestProcessor: RequestProcessor
+ ) {
+ this.requestProcessor = requestProcessor
+ val controller = MtbFileRestController(requestProcessor)
+ this.mockMvc = MockMvcBuilders.standaloneSetup(controller).build()
+ }
+
+ @Test
+ fun shouldProcessMtbFilePostRequest() {
+ val mtbFile = MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("TEST_12345678")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(Consent.Status.ACTIVE)
+ .withPatient("TEST_12345678")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("TEST_12345678")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ .build()
+
+ mockMvc.post("/mtbfile") {
+ content = objectMapper.writeValueAsString(mtbFile)
+ contentType = MediaType.APPLICATION_JSON
+ }.andExpect {
+ status {
+ isAccepted()
+ }
+ }
+
+ verify(requestProcessor, times(1)).processMtbFile(any())
+ }
+
+ @Test
+ fun shouldProcessMtbFilePostRequestWithRejectedConsent() {
+ val mtbFile = MtbFile.builder()
+ .withPatient(
+ Patient.builder()
+ .withId("TEST_12345678")
+ .withBirthDate("2000-08-08")
+ .withGender(Patient.Gender.MALE)
+ .build()
+ )
+ .withConsent(
+ Consent.builder()
+ .withId("1")
+ .withStatus(Consent.Status.REJECTED)
+ .withPatient("TEST_12345678")
+ .build()
+ )
+ .withEpisode(
+ Episode.builder()
+ .withId("1")
+ .withPatient("TEST_12345678")
+ .withPeriod(PeriodStart("2023-08-08"))
+ .build()
+ )
+ .build()
+
+ mockMvc.post("/mtbfile") {
+ content = objectMapper.writeValueAsString(mtbFile)
+ contentType = MediaType.APPLICATION_JSON
+ }.andExpect {
+ status {
+ isAccepted()
+ }
+ }
+
+ val captor = argumentCaptor<String>()
+ verify(requestProcessor, times(1)).processDeletion(captor.capture())
+ assertThat(captor.firstValue).isEqualTo("TEST_12345678")
+ }
+
+ @Test
+ fun shouldProcessMtbFileDeleteRequest() {
+ mockMvc.delete("/mtbfile/TEST_12345678").andExpect {
+ status {
+ isAccepted()
+ }
+ }
+
+ val captor = argumentCaptor<String>()
+ verify(requestProcessor, times(1)).processDeletion(captor.capture())
+ assertThat(captor.firstValue).isEqualTo("TEST_12345678")
+ }
+
+} \ No newline at end of file