summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2023-07-31 09:07:14 +0200
committerGitHub2023-07-31 09:07:14 +0200
commitc8b2e50d47b0617aece257168b7a650315dc2fd5 (patch)
tree680f0007614931ee2b5164d949f9dc2d68500bc0
parented17a803bfea59274ac1d04663627111c0808fee (diff)
parent361bba5b65261bfba3dc160001a5036ece94d5d5 (diff)
Merge pull request #1 from CCC-MF/add-gPas-support
Add g pas support
-rw-r--r--.gitignore1
-rw-r--r--README.md5
-rw-r--r--build.gradle.kts3
-rw-r--r--dev/README_TEST_WITH_GPAS.md11
-rw-r--r--dev/docker-compose.dev.yml90
-rw-r--r--src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java237
-rw-r--r--src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java12
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt6
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt6
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt19
10 files changed, 371 insertions, 19 deletions
diff --git a/.gitignore b/.gitignore
index c2065bc..4ae22a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,3 +35,4 @@ out/
### VS Code ###
.vscode/
+/dev/gpas*
diff --git a/README.md b/README.md
index d8b86ca..d49ef02 100644
--- a/README.md
+++ b/README.md
@@ -8,8 +8,11 @@ Wenn eine URI zu einer gPAS-Instanz angegeben ist, wird diese verwendet.
Ist diese nicht gesetzt. wird intern eine Anonymisierung der Patienten-ID vorgenommen.
* `APP_PSEUDONYMIZE_PREFIX`: Standortbezogenes Prefix - `UNKNOWN`, wenn nicht gesetzt
-* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz
+* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz inklusive Endpoint (z.B. `http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate`)
* `APP_PSEUDONYMIZE_GPAS_TARGET`: gPas Domänenname
+* `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPas Basic-Auth Benutzername
+* `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPas Basic-Auth Passwort
+* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt werden muss.
## Mögliche Endpunkte
diff --git a/build.gradle.kts b/build.gradle.kts
index 3371bb4..eecd959 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -39,6 +39,9 @@ dependencies {
implementation("commons-codec:commons-codec")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
+ implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2")
+ implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2")
+ implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1")
runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
runtimeOnly("org.postgresql:postgresql")
developmentOnly("org.springframework.boot:spring-boot-devtools")
diff --git a/dev/README_TEST_WITH_GPAS.md b/dev/README_TEST_WITH_GPAS.md
new file mode 100644
index 0000000..ff9f62b
--- /dev/null
+++ b/dev/README_TEST_WITH_GPAS.md
@@ -0,0 +1,11 @@
+# Test with gPAS
+1. Download from [Latest Docker-compose version of gPAS](https://www.ths-greifswald.de/gpas/#_download "")
+2. copy `./demo/demo_gpas.sql` into `./sqls` folder
+3. if needed change port mapping
+4. startup via `docker compose up -d`
+
+By default, PSN are created via `localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate` endpoint
+You can review created PSN via gPAs web interface running at `http://localhost:8080/gpas-web/`
+
+
+
diff --git a/dev/docker-compose.dev.yml b/dev/docker-compose.dev.yml
new file mode 100644
index 0000000..d7a436b
--- /dev/null
+++ b/dev/docker-compose.dev.yml
@@ -0,0 +1,90 @@
+version: '3.7'
+
+services:
+
+ zoo1:
+ image: zookeeper:3.8.0
+ hostname: zoo1
+ ports:
+ - "2181:2181"
+ environment:
+ ZOO_MY_ID: 1
+ ZOO_PORT: 2181
+ ZOO_SERVERS: server.1=zoo1:2888:3888;2181
+
+ kafka1:
+ image: confluentinc/cp-kafka:7.2.1
+ hostname: kafka1
+ ports:
+ - "9092:9092"
+ environment:
+ KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
+ KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+ KAFKA_BROKER_ID: 1
+ KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ depends_on:
+ - zoo1
+
+ kafka-rest-proxy:
+ image: confluentinc/cp-kafka-rest:7.2.1
+ hostname: kafka-rest-proxy
+ ports:
+ - "8082:8082"
+ environment:
+ # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
+ KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
+ KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
+ KAFKA_REST_HOST_NAME: kafka-rest-proxy
+ KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
+ depends_on:
+ - zoo1
+ - kafka1
+
+ kafka-connect:
+ image: confluentinc/cp-kafka-connect:7.2.1
+ hostname: kafka-connect
+ ports:
+ - "8083:8083"
+ environment:
+ CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
+ CONNECT_REST_PORT: 8083
+ CONNECT_GROUP_ID: compose-connect-group
+ CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
+ CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
+ CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
+ CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
+ CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
+ CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
+ CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
+ CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
+ CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
+ CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
+ CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
+ CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
+ #volumes:
+ # - ./connectors:/etc/kafka-connect/jars/
+ depends_on:
+ - zoo1
+ - kafka1
+ - kafka-rest-proxy
+
+ akhq:
+ image: tchiotludo/akhq:0.21.0
+ environment:
+ AKHQ_CONFIGURATION: |
+ akhq:
+ connections:
+ docker-kafka-server:
+ properties:
+ bootstrap.servers: "kafka1:19092"
+ connect:
+ - name: "kafka-connect"
+ url: "http://kafka-connect:8083"
+ ports:
+ - "8084:8080"
+ depends_on:
+ - kafka1
+ - kafka-connect
diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
index 98f4ba6..f13a034 100644
--- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
+++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java
@@ -19,23 +19,244 @@
package dev.dnpm.etl.processor.pseudonym;
-import java.net.URI;
+import ca.uhn.fhir.context.FhirContext;
+import ca.uhn.fhir.parser.IParser;
+import dev.dnpm.etl.processor.config.GPasConfigProperties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
+import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
+import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
+import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
+import org.apache.hc.core5.http.config.Registry;
+import org.apache.hc.core5.http.config.RegistryBuilder;
+import org.hl7.fhir.r4.model.Identifier;
+import org.hl7.fhir.r4.model.Parameters;
+import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
+import org.hl7.fhir.r4.model.StringType;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.*;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.retry.RetryCallback;
+import org.springframework.retry.RetryContext;
+import org.springframework.retry.RetryListener;
+import org.springframework.retry.RetryPolicy;
+import org.springframework.retry.backoff.ExponentialBackOffPolicy;
+import org.springframework.retry.policy.SimpleRetryPolicy;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Base64;
+import java.util.HashMap;
public class GpasPseudonymGenerator implements Generator {
- private final URI uri;
+ private final String gPasUrl;
+ private final String psnTargetDomain;
+ private static FhirContext r4Context = FhirContext.forR4();
+ private final HttpHeaders httpHeader;
+
+ private final RetryTemplate retryTemplate = defaultTemplate();
+
+ private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
+
+ private SSLContext customSslContext;
+ private RestTemplate restTemplate;
+
+ public GpasPseudonymGenerator(GPasConfigProperties gpasCfg) {
+
+ this.gPasUrl = gpasCfg.getUri();
+ this.psnTargetDomain = gpasCfg.getTarget();
+ httpHeader = getHttpHeaders(gpasCfg.getUsername(), gpasCfg.getPassword());
- private final String target;
+ try {
+ if (StringUtils.isNotBlank(gpasCfg.getSslCaLocation())) {
+ customSslContext = getSslContext(gpasCfg.getSslCaLocation());
+ }
+ } catch (IOException | KeyManagementException | KeyStoreException | CertificateException |
+ NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
- public GpasPseudonymGenerator(URI uri, String target) {
- this.uri = uri;
- this.target = target;
}
@Override
public String generate(String id) {
- // TODO Implement this
- return "?";
+ var gPasRequestBody = getGpasRequestBody(id);
+ var responseEntity = getGpasPseudonym(gPasRequestBody);
+ var gPasPseudonymResult = (Parameters) r4Context.newJsonParser()
+ .parseResource(responseEntity.getBody());
+
+ return unwrapPseudonym(gPasPseudonymResult);
+ }
+
+ @NotNull
+ public static String unwrapPseudonym(Parameters gPasPseudonymResult) {
+ Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst()
+ .get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst()
+ .orElseGet(ParametersParameterComponent::new).getValue();
+
+ // pseudonym
+ return pseudonym.getSystem() + "|" + pseudonym.getValue();
+ }
+
+
+ @NotNull
+ protected ResponseEntity<String> getGpasPseudonym(String gPasRequestBody) {
+
+ HttpEntity<String> requestEntity = new HttpEntity<>(gPasRequestBody, this.httpHeader);
+ ResponseEntity<String> responseEntity;
+ var restTemplate = getRestTemplete();
+
+ try {
+ responseEntity = retryTemplate.execute(
+ ctx -> restTemplate.exchange(gPasUrl, HttpMethod.POST, requestEntity,
+ String.class));
+
+ if (responseEntity.getStatusCode().is2xxSuccessful()) {
+ log.debug("API request succeeded. Response: {}", responseEntity.getStatusCode());
+ } else {
+ log.warn("API request unsuccessful. Response: {}", requestEntity.getBody());
+ throw new PseudonymRequestFailed("API request unsuccessful gPas unsuccessful.");
+ }
+
+ return responseEntity;
+ } catch (Exception unexpected) {
+ throw new PseudonymRequestFailed(
+ "API request due unexpected error unsuccessful gPas unsuccessful.", unexpected);
+ }
+ }
+
+ protected String getGpasRequestBody(String id) {
+ var requestParameters = new Parameters();
+ requestParameters.addParameter().setName("target")
+ .setValue(new StringType().setValue(psnTargetDomain));
+ requestParameters.addParameter().setName("original")
+ .setValue(new StringType().setValue(id));
+ final IParser iParser = r4Context.newJsonParser();
+ return iParser.encodeResourceToString(requestParameters);
+ }
+
+ @NotNull
+ protected HttpHeaders getHttpHeaders(String gPasUserName, String gPasPassword) {
+ var headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+
+ if (StringUtils.isBlank(gPasUserName) || StringUtils.isBlank(gPasPassword)) {
+ return headers;
+ }
+
+ String authHeader = gPasUserName + ":" + gPasPassword;
+ byte[] authHeaderBytes = authHeader.getBytes();
+ byte[] encodedAuthHeaderBytes = Base64.getEncoder().encode(authHeaderBytes);
+ String encodedAuthHeader = new String(encodedAuthHeaderBytes);
+
+ if (StringUtils.isNotBlank(gPasUserName) && StringUtils.isNotBlank(gPasPassword)) {
+ headers.set("Authorization", "Basic " + encodedAuthHeader);
+ }
+
+ return headers;
}
+ protected RetryTemplate defaultTemplate() {
+ RetryTemplate retryTemplate = new RetryTemplate();
+ ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
+ backOffPolicy.setInitialInterval(1000);
+ backOffPolicy.setMultiplier(1.25);
+ retryTemplate.setBackOffPolicy(backOffPolicy);
+ HashMap<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
+ retryableExceptions.put(RestClientException.class, true);
+ retryableExceptions.put(ConnectException.class, true);
+ RetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions);
+ retryTemplate.setRetryPolicy(retryPolicy);
+
+ retryTemplate.registerListener(new RetryListener() {
+ @Override
+ public <T, E extends Throwable> void onError(RetryContext context,
+ RetryCallback<T, E> callback, Throwable throwable) {
+ log.warn("HTTP Error occurred: {}. Retrying {}", throwable.getMessage(),
+ context.getRetryCount());
+ RetryListener.super.onError(context, callback, throwable);
+ }
+ });
+
+ return retryTemplate;
+ }
+
+ /**
+ * Read SSL root certificate and return SSLContext
+ *
+ * @param certificateLocation file location to root certificate (PEM)
+ * @return initialized SSLContext
+ * @throws IOException file cannot be read
+ * @throws CertificateException in case we have an invalid certificate of type X.509
+ * @throws KeyStoreException keystore cannot be initialized
+ * @throws NoSuchAlgorithmException missing trust manager algorithmus
+ * @throws KeyManagementException key management failed at init SSLContext
+ */
+ @Nullable
+ protected SSLContext getSslContext(String certificateLocation)
+ throws IOException, CertificateException, KeyStoreException, KeyManagementException, NoSuchAlgorithmException {
+
+ KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+
+ FileInputStream fis = new FileInputStream(certificateLocation);
+ X509Certificate ca = (X509Certificate) CertificateFactory.getInstance("X.509")
+ .generateCertificate(new BufferedInputStream(fis));
+
+ ks.load(null, null);
+ ks.setCertificateEntry(Integer.toString(1), ca);
+
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+ TrustManagerFactory.getDefaultAlgorithm());
+ tmf.init(ks);
+
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(null, tmf.getTrustManagers(), null);
+
+ return sslContext;
+ }
+
+ protected RestTemplate getRestTemplete() {
+
+ if (restTemplate != null) {
+ return restTemplate;
+ }
+
+ if (customSslContext == null) {
+ restTemplate = new RestTemplate();
+ return restTemplate;
+ }
+ final var sslsf = new SSLConnectionSocketFactory(customSslContext);
+ final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("https", sslsf).register("http", new PlainConnectionSocketFactory()).build();
+
+ final BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager(
+ socketFactoryRegistry);
+ final CloseableHttpClient httpClient = HttpClients.custom()
+ .setConnectionManager(connectionManager).build();
+
+ final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
+ httpClient);
+ restTemplate = new RestTemplate(requestFactory);
+ return restTemplate;
+ }
}
diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java
new file mode 100644
index 0000000..79b4ba6
--- /dev/null
+++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java
@@ -0,0 +1,12 @@
+package dev.dnpm.etl.processor.pseudonym;
+
+public class PseudonymRequestFailed extends RuntimeException {
+
+ public PseudonymRequestFailed(String message) {
+ super(message);
+ }
+
+ public PseudonymRequestFailed(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
index 1575c39..0f257e8 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt
@@ -45,7 +45,11 @@ data class PseudonymizeConfigProperties(
@ConfigurationProperties(GPasConfigProperties.NAME)
data class GPasConfigProperties(
val uri: String?,
- val target: String = "etl-processor"
+ val target: String = "etl-processor",
+ val username: String?,
+ val password: String?,
+ val sslCaLocation: String?,
+
) {
companion object {
const val NAME = "app.pseudonymize.gpas"
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
index 5c3add2..c677f2b 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
@@ -28,16 +28,12 @@ import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
import dev.dnpm.etl.processor.pseudonym.Generator
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
-import org.reactivestreams.Publisher
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
-import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
-import java.net.URI
-import java.time.Duration
@Configuration
@EnableConfigurationProperties(
@@ -54,7 +50,7 @@ class AppConfiguration {
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "GPAS")
@Bean
fun gpasPseudonymGenerator(configProperties: GPasConfigProperties): Generator {
- return GpasPseudonymGenerator(URI.create(configProperties.uri!!), configProperties.target)
+ return GpasPseudonymGenerator(configProperties)
}
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "BUILDIN", matchIfMissing = true)
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 374c0af..9867deb 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -33,11 +33,22 @@ class KafkaMtbFileSender(
override fun send(mtbFile: MtbFile): MtbFileSender.Response {
return try {
- kafkaTemplate.sendDefault(objectMapper.writeValueAsString(mtbFile))
- logger.debug("Sent file via KafkaMtbFileSender")
- MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ val result = kafkaTemplate.sendDefault(
+ String.format(
+ "{\"pid\": \"%s\", \"eid\": \"%s\"}",
+ mtbFile.patient.id,
+ mtbFile.episode.id
+ ), objectMapper.writeValueAsString(mtbFile)
+ )
+ if (result.get() != null) {
+ logger.debug("Sent file via KafkaMtbFileSender")
+ MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
+ } else {
+ MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
+ }
+
} catch (e: Exception) {
- logger.error("An error occured sending to kafka", e)
+ logger.error("An error occurred sending to kafka", e)
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
}
}