summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJakub Lidke2023-07-27 11:49:31 +0200
committerJakub Lidke2023-07-27 11:49:31 +0200
commitcf8c5a86928da3a109e700a5221b5aa26cfe4aa7 (patch)
tree63a94c1075e9cf477cc001b67d80f5b33a43ec8e /src
parente9e7139ca41571546b28e48d7254eff781eadf36 (diff)
fix: wait for kafka to accept message and return success than
Diffstat (limited to 'src')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt21
1 files changed, 15 insertions, 6 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index f83a2ab..18faaf9 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -27,18 +27,27 @@ import org.springframework.stereotype.Component
@Component
class KafkaMtbFileSender(
- private val kafkaTemplate: KafkaTemplate<String, String>,
- private val objectMapper: ObjectMapper
+ private val kafkaTemplate: KafkaTemplate<String, String>,
+ private val objectMapper: ObjectMapper
) : MtbFileSender {
private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java)
override fun send(mtbFile: MtbFile): MtbFileSender.Response {
return try {
- kafkaTemplate.sendDefault(String.format("{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id,
- mtbFile.episode.id), objectMapper.writeValueAsString(mtbFile))
- logger.debug("Sent file via KafkaMtbFileSender")
- MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ var result = kafkaTemplate.sendDefault(
+ String.format(
+ "{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id,
+ mtbFile.episode.id
+ ), objectMapper.writeValueAsString(mtbFile)
+ )
+ if (result.get() != null) {
+ logger.debug("Sent file via KafkaMtbFileSender");
+ MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS);
+ } else {
+ MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
+ }
+
} catch (e: Exception) {
logger.error("An error occured sending to kafka", e)
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)