diff options
| author | Paul-Christian Volkmer | 2023-08-16 15:25:46 +0200 |
|---|---|---|
| committer | GitHub | 2023-08-16 15:25:46 +0200 |
| commit | 8dc82225a4cd45a315fac3efe4d76513e6d536fc (patch) | |
| tree | f4a4e6e3b3f7029a04041ef0d2d66602c4cbfdb2 /src/main/kotlin | |
| parent | 2eb5cc61b9809523d51d1fa7af7a1afc1fdb7f0c (diff) | |
Issue #7: Send and expect requestId in record body, not in record key (#8)
Diffstat (limited to 'src/main/kotlin')
3 files changed, 29 insertions, 39 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index e7f9769..5772faf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -40,7 +40,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(request.mtbFile) + objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") @@ -68,7 +68,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(dummyMtbFile) + objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) ) if (result.get() != null) { @@ -85,12 +85,12 @@ class KafkaMtbFileSender( private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + - "\"eid\": \"${request.mtbFile.episode.id}\", " + - "\"requestId\": \"${request.requestId}\"}" + "\"eid\": \"${request.mtbFile.episode.id}\"}" } private fun key(request: MtbFileSender.DeleteRequest): String { - return "{\"pid\": \"${request.patientId}\", " + - "\"requestId\": \"${request.requestId}\"}" + return "{\"pid\": \"${request.patientId}\"}" } + + data class Data(val requestId: String, val content: MtbFile) }
\ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index 677443a..4048348 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -71,7 +71,7 @@ class ResponseProcessor( } else -> { - logger.error("Cannot process response: Unknown response code!") + logger.error("Cannot process response: Unknown response!") return@ifPresentOrElse } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 68e3611..a29010f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -41,50 +41,40 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord<String, String>) { try { - Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java)) } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) Optional.empty() - }.ifPresentOrElse({ responseKey -> - val event = try { - val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - responseBody.statusCode.asRequestStatus(), - when (responseBody.statusCode.asRequestStatus()) { - RequestStatus.SUCCESS -> { - Optional.empty() - } + }.ifPresentOrElse({ responseBody -> + val event = ResponseEvent( + responseBody.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } - RequestStatus.WARNING, RequestStatus.ERROR -> { - Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) - } + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } - else -> { - logger.error("Kafka response: Unknown response code!") - Optional.empty() - } + else -> { + logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode) + Optional.empty() } - ) - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - RequestStatus.ERROR, - Optional.of("Cannot process Kafka response") - ) - } + } + ) eventPublisher.publishEvent(event) }, { - logger.error("No response key in Kafka response") + logger.error("No requestId in Kafka response") }) } - data class ResponseKey(val requestId: String) - data class ResponseBody( + @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String, @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any> ) + }
\ No newline at end of file |
