summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt8
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt11
2 files changed, 12 insertions, 7 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
index 4ac9f2d..d53eb7e 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.http.MediaType
import org.springframework.kafka.listener.MessageListener
+import java.nio.charset.Charset
class KafkaInputListener(
private val requestProcessor: RequestProcessor,
@@ -49,19 +50,16 @@ class KafkaInputListener(
}
}
- private fun guessMimeType(record: ConsumerRecord<String, String>): String {
+ private fun guessMimeType(record: ConsumerRecord<String, String>): String? {
if (record.headers().headers("contentType").toList().isEmpty()) {
// Fallback if no contentType set (old behavior)
return MediaType.APPLICATION_JSON_VALUE
}
- return record.headers().headers("contentType")?.firstOrNull()?.value().contentToString()
+ return record.headers().headers("contentType")?.firstOrNull()?.value()?.toString(Charset.forName("UTF-8"))
}
private fun handleDnpmV2Message(record: ConsumerRecord<String, String>) {
- // Do not handle DNPM-V2 for now
- logger.warn("Ignoring MTB File in DNPM V2 format: Not implemented yet")
-
val mtbFile = objectMapper.readValue(record.value(), Mtb::class.java)
val patientId = PatientId(mtbFile.patient.id)
val firstRequestIdHeader = record.headers().headers("requestId")?.firstOrNull()
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
index a047f74..7f07766 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt
@@ -244,7 +244,14 @@ class KafkaInputListenerTest {
}
@Test
- fun shouldNotProcessDnpmV2Request() {
+ fun shouldProcessDnpmV2Request() {
+ whenever(consentEvaluator.check(any())).thenReturn(
+ ConsentEvaluation(
+ TtpConsentStatus.BROAD_CONSENT_GIVEN,
+ false
+ )
+ )
+
val mtbFile = Mtb.builder()
.patient(Patient.builder().id("DUMMY_12345678").build())
.metadata(
@@ -285,7 +292,7 @@ class KafkaInputListenerTest {
Optional.empty()
)
)
- verify(requestProcessor, times(0)).processDeletion(
+ verify(requestProcessor, times(1)).processDeletion(
anyValueClass(), anyValueClass(), eq(
TtpConsentStatus.UNKNOWN_CHECK_FILE
)