summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt12
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt2
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt54
-rw-r--r--src/main/resources/application-dev.yml2
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt12
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt54
6 files changed, 80 insertions, 56 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index e7f9769..5772faf 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -40,7 +40,7 @@ class KafkaMtbFileSender(
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
- objectMapper.writeValueAsString(request.mtbFile)
+ objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
@@ -68,7 +68,7 @@ class KafkaMtbFileSender(
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
- objectMapper.writeValueAsString(dummyMtbFile)
+ objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
)
if (result.get() != null) {
@@ -85,12 +85,12 @@ class KafkaMtbFileSender(
private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
- "\"eid\": \"${request.mtbFile.episode.id}\", " +
- "\"requestId\": \"${request.requestId}\"}"
+ "\"eid\": \"${request.mtbFile.episode.id}\"}"
}
private fun key(request: MtbFileSender.DeleteRequest): String {
- return "{\"pid\": \"${request.patientId}\", " +
- "\"requestId\": \"${request.requestId}\"}"
+ return "{\"pid\": \"${request.patientId}\"}"
}
+
+ data class Data(val requestId: String, val content: MtbFile)
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
index 677443a..4048348 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
@@ -71,7 +71,7 @@ class ResponseProcessor(
}
else -> {
- logger.error("Cannot process response: Unknown response code!")
+ logger.error("Cannot process response: Unknown response!")
return@ifPresentOrElse
}
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
index 68e3611..a29010f 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
@@ -41,50 +41,40 @@ class KafkaResponseProcessor(
override fun onMessage(data: ConsumerRecord<String, String>) {
try {
- Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java))
+ Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java))
} catch (e: Exception) {
+ logger.error("Cannot process Kafka response", e)
Optional.empty()
- }.ifPresentOrElse({ responseKey ->
- val event = try {
- val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
- ResponseEvent(
- responseKey.requestId,
- Instant.ofEpochMilli(data.timestamp()),
- responseBody.statusCode.asRequestStatus(),
- when (responseBody.statusCode.asRequestStatus()) {
- RequestStatus.SUCCESS -> {
- Optional.empty()
- }
+ }.ifPresentOrElse({ responseBody ->
+ val event = ResponseEvent(
+ responseBody.requestId,
+ Instant.ofEpochMilli(data.timestamp()),
+ responseBody.statusCode.asRequestStatus(),
+ when (responseBody.statusCode.asRequestStatus()) {
+ RequestStatus.SUCCESS -> {
+ Optional.empty()
+ }
- RequestStatus.WARNING, RequestStatus.ERROR -> {
- Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
- }
+ RequestStatus.WARNING, RequestStatus.ERROR -> {
+ Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
+ }
- else -> {
- logger.error("Kafka response: Unknown response code!")
- Optional.empty()
- }
+ else -> {
+ logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode)
+ Optional.empty()
}
- )
- } catch (e: Exception) {
- logger.error("Cannot process Kafka response", e)
- ResponseEvent(
- responseKey.requestId,
- Instant.ofEpochMilli(data.timestamp()),
- RequestStatus.ERROR,
- Optional.of("Cannot process Kafka response")
- )
- }
+ }
+ )
eventPublisher.publishEvent(event)
}, {
- logger.error("No response key in Kafka response")
+ logger.error("No requestId in Kafka response")
})
}
- data class ResponseKey(val requestId: String)
-
data class ResponseBody(
+ @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
@JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
@JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
)
+
} \ No newline at end of file
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index b1cc2fc..a60cd8a 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -11,7 +11,7 @@ app:
# otherwise connection will not be available
kafka:
topic: test
- response-topic: test-response
+ response-topic: test_response
servers: kafka:9092
server:
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
index 14bdd5d..3ec9757 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
@@ -97,9 +97,9 @@ class KafkaMtbFileSenderTest {
val captor = argumentCaptor<String>()
verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
assertThat(captor.firstValue).isNotNull
- assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}")
+ assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}")
assertThat(captor.secondValue).isNotNull
- assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE)))
+ assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE)))
}
@Test
@@ -113,9 +113,9 @@ class KafkaMtbFileSenderTest {
val captor = argumentCaptor<String>()
verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
assertThat(captor.firstValue).isNotNull
- assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}")
+ assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}")
assertThat(captor.secondValue).isNotNull
- assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED)))
+ assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED)))
}
companion object {
@@ -154,6 +154,10 @@ class KafkaMtbFileSenderTest {
}.build()
}
+ fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data {
+ return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus))
+ }
+
data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null)
@JvmStatic
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt
index 6d83146..95bf41b 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt
@@ -46,7 +46,7 @@ class KafkaResponseProcessorTest {
private lateinit var kafkaResponseProcessor: KafkaResponseProcessor
private fun createKafkaRecord(
- requestId: String? = null,
+ requestId: String,
statusCode: Int = 200,
statusBody: Map<String, Any>? = mapOf()
): ConsumerRecord<String, String> {
@@ -54,15 +54,11 @@ class KafkaResponseProcessorTest {
"test-topic",
0,
0,
- if (requestId == null) {
- null
- } else {
- this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId))
- },
+ null,
if (statusBody == null) {
""
} else {
- this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody))
+ this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody))
}
)
}
@@ -78,17 +74,51 @@ class KafkaResponseProcessorTest {
}
@Test
- fun shouldNotProcessRecordsWithoutValidKey() {
- this.kafkaResponseProcessor.onMessage(createKafkaRecord(null, 200))
+ fun shouldNotProcessRecordsWithoutRequestIdInBody() {
+ val record = ConsumerRecord<String, String>(
+ "test-topic",
+ 0,
+ 0,
+ null,
+ """
+ {
+ "statusCode": 200,
+ "statusBody": {}
+ }
+ """.trimIndent()
+ )
+
+ this.kafkaResponseProcessor.onMessage(record)
+
+ verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
+ }
+
+ @Test
+ fun shouldProcessRecordsWithAliasNames() {
+ val record = ConsumerRecord<String, String>(
+ "test-topic",
+ 0,
+ 0,
+ null,
+ """
+ {
+ "request_id": "test0123456789",
+ "status_code": 200,
+ "status_body": {}
+ }
+ """.trimIndent()
+ )
- verify(eventPublisher, never()).publishEvent(any())
+ this.kafkaResponseProcessor.onMessage(record)
+
+ verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
}
@Test
- fun shouldNotProcessRecordsWithoutValidBody() {
+ fun shouldNotProcessRecordsWithoutValidStatusBody() {
this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null))
- verify(eventPublisher, never()).publishEvent(any())
+ verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
}
@ParameterizedTest