/* * This file is part of ETL-Processor * * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken * Copyright (c) 2025-2026 Paul-Christian Volkmer, Datenintegrationszentrum Philipps-Universität Marburg and Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published * by the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package dev.dnpm.etl.processor.output import dev.dnpm.etl.processor.CustomMediaType import dev.dnpm.etl.processor.PatientPseudonym import dev.dnpm.etl.processor.RequestId import dev.dnpm.etl.processor.config.KafkaProperties import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.pcvolkmer.mv64e.mtb.* import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource import org.mockito.Mock import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.* import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.SendResult import org.springframework.retry.policy.SimpleRetryPolicy import org.springframework.retry.support.RetryTemplateBuilder import tools.jackson.databind.json.JsonMapper import java.time.Instant import java.util.* import java.util.concurrent.CompletableFuture.completedFuture import java.util.concurrent.ExecutionException @ExtendWith(MockitoExtension::class) class KafkaMtbFileSenderTest { @Nested inner class BwhcV1Record { private lateinit var kafkaTemplate: KafkaTemplate private lateinit var kafkaMtbFileSender: KafkaMtbFileSender private lateinit var jsonMapper: JsonMapper @BeforeEach fun setup(@Mock kafkaTemplate: KafkaTemplate) { val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build() this.jsonMapper = JsonMapper.builder().build() this.kafkaTemplate = kafkaTemplate this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, jsonMapper) } @ParameterizedTest @MethodSource("dev.dnpm.etl.processor.output.KafkaMtbFileSenderTest#requestWithResponseSource") fun shouldSendDeleteRequestAndReturnExpectedState(testData: TestData) { doAnswer { if (null != testData.exception) { throw testData.exception } completedFuture(SendResult(testProducerRecord(), testProducerRecordWithMetadata())) } .whenever(kafkaTemplate) .send(any>()) val response = kafkaMtbFileSender.send(DeleteRequest(TEST_REQUEST_ID, TEST_PATIENT_PSEUDONYM)) assertThat(response.status).isEqualTo(testData.requestStatus) } @ParameterizedTest @MethodSource("dev.dnpm.etl.processor.output.KafkaMtbFileSenderTest#requestWithResponseSource") fun shouldRetryOnDeleteKafkaSendError(testData: TestData) { val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build() this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaProperties, retryTemplate, this.jsonMapper) doAnswer { if (null != testData.exception) { throw testData.exception } completedFuture(SendResult(testProducerRecord(), testProducerRecordWithMetadata())) } .whenever(kafkaTemplate) .send(any>()) kafkaMtbFileSender.send(DeleteRequest(TEST_REQUEST_ID, TEST_PATIENT_PSEUDONYM)) val expectedCount = when (testData.exception) { // OK - No Retry null -> times(1) // Request failed - Retry max 3 times else -> times(3) } verify(kafkaTemplate, expectedCount).send(any>()) } } @Nested inner class DnpmV2Record { private lateinit var kafkaTemplate: KafkaTemplate private lateinit var kafkaMtbFileSender: KafkaMtbFileSender private lateinit var jsonMapper: JsonMapper @BeforeEach fun setup(@Mock kafkaTemplate: KafkaTemplate) { val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build() this.jsonMapper = JsonMapper() this.kafkaTemplate = kafkaTemplate this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, jsonMapper) } @ParameterizedTest @MethodSource("dev.dnpm.etl.processor.output.KafkaMtbFileSenderTest#requestWithResponseSource") fun shouldSendMtbFileRequestAndReturnExpectedState(testData: TestData) { doAnswer { if (null != testData.exception) { throw testData.exception } completedFuture(SendResult(testProducerRecord(), testProducerRecordWithMetadata())) } .whenever(kafkaTemplate) .send(any>()) val response = kafkaMtbFileSender.send(DnpmV2MtbFileRequest(TEST_REQUEST_ID, dnpmV2MtbFile())) assertThat(response.status).isEqualTo(testData.requestStatus) } @Test fun shouldSendMtbFileRequestWithCorrectKeyAndHeaderAndBody() { doAnswer { completedFuture(SendResult(testProducerRecord(), testProducerRecordWithMetadata())) } .whenever(kafkaTemplate) .send(any>()) kafkaMtbFileSender.send(DnpmV2MtbFileRequest(TEST_REQUEST_ID, dnpmV2MtbFile())) val captor = argumentCaptor>() verify(kafkaTemplate, times(1)).send(captor.capture()) assertThat(captor.firstValue.key()).isNotNull assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}") assertThat(captor.firstValue.headers().headers("contentType")).isNotNull assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value()) .isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray()) assertThat(captor.firstValue.headers().headers("requestId")).isNotNull assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value()) .isEqualTo(TEST_REQUEST_ID.value.toByteArray()) assertThat(captor.firstValue.headers().headers("requestMethod")).isNotNull assertThat(captor.firstValue.headers().headers("requestMethod")?.firstOrNull()?.value()) .isEqualTo("POST".toByteArray()) assertThat(captor.firstValue.value()).isNotNull assertThat(captor.firstValue.value()) .isEqualTo(jsonMapper.writeValueAsString(dnmpV2kafkaRecordData(TEST_REQUEST_ID))) } @Test fun shouldSendDeleteRequestWithCorrectKeyAndHeaderAndBody() { doAnswer { completedFuture(SendResult(testProducerRecord(), testProducerRecordWithMetadata())) } .whenever(kafkaTemplate) .send(any>()) kafkaMtbFileSender.send(DeleteRequest(TEST_REQUEST_ID, PatientPseudonym("PID"))) val captor = argumentCaptor>() verify(kafkaTemplate, times(1)).send(captor.capture()) assertThat(captor.firstValue.key()).isNotNull assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}") assertThat(captor.firstValue.headers().headers("contentType")).isNotNull assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value()) .isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray()) assertThat(captor.firstValue.headers().headers("requestId")).isNotNull assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value()) .isEqualTo(TEST_REQUEST_ID.value.toByteArray()) assertThat(captor.firstValue.headers().headers("requestMethod")).isNotNull assertThat(captor.firstValue.headers().headers("requestMethod")?.firstOrNull()?.value()) .isEqualTo("DELETE".toByteArray()) assertThat(captor.firstValue.value()).isNotNull } @ParameterizedTest @MethodSource("dev.dnpm.etl.processor.output.KafkaMtbFileSenderTest#requestWithResponseSource") fun shouldRetryOnMtbFileKafkaSendError(testData: TestData) { val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build() this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaProperties, retryTemplate, this.jsonMapper) doAnswer { if (null != testData.exception) { throw testData.exception } completedFuture(SendResult(testProducerRecord(), testProducerRecordWithMetadata())) } .whenever(kafkaTemplate) .send(any>()) kafkaMtbFileSender.send(DnpmV2MtbFileRequest(TEST_REQUEST_ID, dnpmV2MtbFile())) val expectedCount = when (testData.exception) { // OK - No Retry null -> times(1) // Request failed - Retry max 3 times else -> times(3) } verify(kafkaTemplate, expectedCount).send(any>()) } } fun testProducerRecord(): ProducerRecord { return ProducerRecord("testtopic", "testkey", "testvalue") } fun testProducerRecordWithMetadata(): RecordMetadata { return RecordMetadata( TopicPartition("testtopic", 0), 0, 0, Instant.now().epochSecond, 0, 0 ) } companion object { val TEST_REQUEST_ID = RequestId("TestId") val TEST_PATIENT_PSEUDONYM = PatientPseudonym("PID") fun dnpmV2MtbFile(): Mtb { return Mtb().apply { this.patient = dev.pcvolkmer.mv64e.mtb.Patient().apply { this.id = "PID" this.birthDate = Date.from(Instant.now()) this.gender = GenderCoding().apply { this.code = GenderCodingCode.MALE } } this.episodesOfCare = listOf( MtbEpisodeOfCare().apply { this.id = "1" this.patient = Reference().apply { this.id = "PID" } this.period = PeriodDate().apply { this.start = Date.from(Instant.now()) } } ) } } fun dnmpV2kafkaRecordData(requestId: RequestId): Mtb { return DnpmV2MtbFileRequest(requestId, dnpmV2MtbFile()).content } data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null) @JvmStatic fun requestWithResponseSource(): Set { return setOf( TestData(RequestStatus.UNKNOWN), TestData(RequestStatus.ERROR, InterruptedException("Test interrupted")), TestData( RequestStatus.ERROR, ExecutionException(RuntimeException("Test execution aborted")), ), ) } } }