diff options
| author | Jakub Lidke | 2023-07-27 11:49:31 +0200 |
|---|---|---|
| committer | Jakub Lidke | 2023-07-27 11:49:31 +0200 |
| commit | cf8c5a86928da3a109e700a5221b5aa26cfe4aa7 (patch) | |
| tree | 63a94c1075e9cf477cc001b67d80f5b33a43ec8e /src/main/kotlin/dev/dnpm/etl/processor | |
| parent | e9e7139ca41571546b28e48d7254eff781eadf36 (diff) | |
fix: wait for kafka to accept message and return success than
Diffstat (limited to 'src/main/kotlin/dev/dnpm/etl/processor')
| -rw-r--r-- | src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt | 21 |
1 files changed, 15 insertions, 6 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index f83a2ab..18faaf9 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -27,18 +27,27 @@ import org.springframework.stereotype.Component @Component class KafkaMtbFileSender( - private val kafkaTemplate: KafkaTemplate<String, String>, - private val objectMapper: ObjectMapper + private val kafkaTemplate: KafkaTemplate<String, String>, + private val objectMapper: ObjectMapper ) : MtbFileSender { private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java) override fun send(mtbFile: MtbFile): MtbFileSender.Response { return try { - kafkaTemplate.sendDefault(String.format("{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, - mtbFile.episode.id), objectMapper.writeValueAsString(mtbFile)) - logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + var result = kafkaTemplate.sendDefault( + String.format( + "{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, + mtbFile.episode.id + ), objectMapper.writeValueAsString(mtbFile) + ) + if (result.get() != null) { + logger.debug("Sent file via KafkaMtbFileSender"); + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS); + } else { + MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + } + } catch (e: Exception) { logger.error("An error occured sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) |
