summaryrefslogtreecommitdiff
path: root/src/main/kotlin
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2023-08-16 15:25:46 +0200
committerGitHub2023-08-16 15:25:46 +0200
commit8dc82225a4cd45a315fac3efe4d76513e6d536fc (patch)
treef4a4e6e3b3f7029a04041ef0d2d66602c4cbfdb2 /src/main/kotlin
parent2eb5cc61b9809523d51d1fa7af7a1afc1fdb7f0c (diff)
Issue #7: Send and expect requestId in record body, not in record key (#8)
Diffstat (limited to 'src/main/kotlin')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt12
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt2
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt54
3 files changed, 29 insertions, 39 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index e7f9769..5772faf 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -40,7 +40,7 @@ class KafkaMtbFileSender(
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
- objectMapper.writeValueAsString(request.mtbFile)
+ objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
@@ -68,7 +68,7 @@ class KafkaMtbFileSender(
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
- objectMapper.writeValueAsString(dummyMtbFile)
+ objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
)
if (result.get() != null) {
@@ -85,12 +85,12 @@ class KafkaMtbFileSender(
private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
- "\"eid\": \"${request.mtbFile.episode.id}\", " +
- "\"requestId\": \"${request.requestId}\"}"
+ "\"eid\": \"${request.mtbFile.episode.id}\"}"
}
private fun key(request: MtbFileSender.DeleteRequest): String {
- return "{\"pid\": \"${request.patientId}\", " +
- "\"requestId\": \"${request.requestId}\"}"
+ return "{\"pid\": \"${request.patientId}\"}"
}
+
+ data class Data(val requestId: String, val content: MtbFile)
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
index 677443a..4048348 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
@@ -71,7 +71,7 @@ class ResponseProcessor(
}
else -> {
- logger.error("Cannot process response: Unknown response code!")
+ logger.error("Cannot process response: Unknown response!")
return@ifPresentOrElse
}
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
index 68e3611..a29010f 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
@@ -41,50 +41,40 @@ class KafkaResponseProcessor(
override fun onMessage(data: ConsumerRecord<String, String>) {
try {
- Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java))
+ Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java))
} catch (e: Exception) {
+ logger.error("Cannot process Kafka response", e)
Optional.empty()
- }.ifPresentOrElse({ responseKey ->
- val event = try {
- val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
- ResponseEvent(
- responseKey.requestId,
- Instant.ofEpochMilli(data.timestamp()),
- responseBody.statusCode.asRequestStatus(),
- when (responseBody.statusCode.asRequestStatus()) {
- RequestStatus.SUCCESS -> {
- Optional.empty()
- }
+ }.ifPresentOrElse({ responseBody ->
+ val event = ResponseEvent(
+ responseBody.requestId,
+ Instant.ofEpochMilli(data.timestamp()),
+ responseBody.statusCode.asRequestStatus(),
+ when (responseBody.statusCode.asRequestStatus()) {
+ RequestStatus.SUCCESS -> {
+ Optional.empty()
+ }
- RequestStatus.WARNING, RequestStatus.ERROR -> {
- Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
- }
+ RequestStatus.WARNING, RequestStatus.ERROR -> {
+ Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
+ }
- else -> {
- logger.error("Kafka response: Unknown response code!")
- Optional.empty()
- }
+ else -> {
+ logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode)
+ Optional.empty()
}
- )
- } catch (e: Exception) {
- logger.error("Cannot process Kafka response", e)
- ResponseEvent(
- responseKey.requestId,
- Instant.ofEpochMilli(data.timestamp()),
- RequestStatus.ERROR,
- Optional.of("Cannot process Kafka response")
- )
- }
+ }
+ )
eventPublisher.publishEvent(event)
}, {
- logger.error("No response key in Kafka response")
+ logger.error("No requestId in Kafka response")
})
}
- data class ResponseKey(val requestId: String)
-
data class ResponseBody(
+ @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
@JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
@JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
)
+
} \ No newline at end of file