summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt5
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt20
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt13
-rw-r--r--src/main/resources/application.yml2
4 files changed, 28 insertions, 12 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index f81d3fb..7adcb02 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -43,9 +43,10 @@ class AppKafkaConfiguration {
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
+ kafkaTargetProperties: KafkaTargetProperties,
objectMapper: ObjectMapper
): MtbFileSender {
- return KafkaMtbFileSender(kafkaTemplate, objectMapper)
+ return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
}
@Bean
@@ -54,7 +55,7 @@ class AppKafkaConfiguration {
kafkaTargetProperties: KafkaTargetProperties,
kafkaResponseProcessor: KafkaResponseProcessor
): KafkaMessageListenerContainer<String, String> {
- val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic);
+ val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic)
containerProperties.messageListener = kafkaResponseProcessor
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index da25576..d903745 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -22,11 +22,13 @@ package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.dnpm.etl.processor.config.KafkaTargetProperties
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
+ private val kafkaTargetProperties: KafkaTargetProperties,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@@ -34,13 +36,14 @@ class KafkaMtbFileSender(
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
return try {
- val result = kafkaTemplate.sendDefault(
- header(request),
+ val result = kafkaTemplate.send(
+ kafkaTargetProperties.topic,
+ key(request),
objectMapper.writeValueAsString(request.mtbFile)
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
- MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
+ MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
}
@@ -61,14 +64,15 @@ class KafkaMtbFileSender(
.build()
return try {
- val result = kafkaTemplate.sendDefault(
- header(request),
+ val result = kafkaTemplate.send(
+ kafkaTargetProperties.topic,
+ key(request),
objectMapper.writeValueAsString(dummyMtbFile)
)
if (result.get() != null) {
logger.debug("Sent deletion request via KafkaMtbFileSender")
- MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
+ MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
}
@@ -78,13 +82,13 @@ class KafkaMtbFileSender(
}
}
- private fun header(request: MtbFileSender.MtbFileRequest): String {
+ private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
"\"eid\": \"${request.mtbFile.episode.id}\", " +
"\"requestId\": \"${request.requestId}\"}"
}
- private fun header(request: MtbFileSender.DeleteRequest): String {
+ private fun key(request: MtbFileSender.DeleteRequest): String {
return "{\"pid\": \"${request.patientId}\", " +
"\"requestId\": \"${request.requestId}\"}"
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
index 547833c..fd047d0 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
@@ -42,6 +42,9 @@ class KafkaResponseProcessor(
val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java)
requestRepository.findByUuidEquals(responseKey.requestId).ifPresent {
val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
+
+ println("${responseBody.statusCode}")
+
when (responseBody.statusCode) {
200 -> {
it.status = RequestStatus.SUCCESS
@@ -69,6 +72,16 @@ class KafkaResponseProcessor(
requestRepository.save(it)
}
+ in 900..999 -> {
+ it.status = RequestStatus.ERROR
+ it.processedAt = Instant.ofEpochMilli(data.timestamp())
+ it.report = Report(
+ "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich",
+ objectMapper.writeValueAsString(responseBody.statusBody)
+ )
+ requestRepository.save(it)
+ }
+
else -> {
logger.error("Cannot process Kafka response: Unknown response code!")
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 72edde6..5cd47c0 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,8 +1,6 @@
spring:
kafka:
bootstrap-servers: ${app.kafka.servers}
- template:
- default-topic: ${app.kafka.topic}
consumer:
group-id: ${app.kafka.group-id}
flyway: