From 1a640ff9dff1cc182c4ffc1d00dff370e42a25de Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 18:15:20 +0200 Subject: Decouple request and response processing --- .../etl/processor/services/RequestProcessorTest.kt | 128 +++++++++++++++---- .../processor/services/ResponseProcessorTest.kt | 142 +++++++++++++++++++++ .../services/kafka/KafkaResponseProcessorTest.kt | 119 +++++++++++++++++ 3 files changed, 362 insertions(+), 27 deletions(-) create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt (limited to 'src/test/kotlin/dev/dnpm') diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index 8552bbb..f9d8182 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -37,7 +37,7 @@ import org.mockito.Mockito.* import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor -import reactor.core.publisher.Sinks +import org.springframework.context.ApplicationEventPublisher import java.time.Instant import java.util.* @@ -48,7 +48,7 @@ class RequestProcessorTest { private lateinit var pseudonymizeService: PseudonymizeService private lateinit var sender: MtbFileSender private lateinit var requestService: RequestService - private lateinit var statisticsUpdateProducer: Sinks.Many + private lateinit var applicationEventPublisher: ApplicationEventPublisher private lateinit var requestProcessor: RequestProcessor @@ -57,11 +57,12 @@ class RequestProcessorTest { @Mock pseudonymizeService: PseudonymizeService, @Mock sender: RestMtbFileSender, @Mock requestService: RequestService, + @Mock applicationEventPublisher: ApplicationEventPublisher ) { this.pseudonymizeService = pseudonymizeService this.sender = sender this.requestService = requestService - this.statisticsUpdateProducer = Sinks.many().multicast().directBestEffort() + this.applicationEventPublisher = applicationEventPublisher val objectMapper = ObjectMapper() @@ -70,12 +71,12 @@ class RequestProcessorTest { sender, requestService, objectMapper, - statisticsUpdateProducer + applicationEventPublisher ) } @Test - fun testShouldDetectMtbFileDuplicationAndSaveRequestStatus() { + fun testShouldSendMtbFileDuplicationAndSaveUnknownRequestStatusAtFirst() { doAnswer { Request( id = 1L, @@ -126,11 +127,66 @@ class RequestProcessorTest { val requestCaptor = argumentCaptor() verify(requestService, times(1)).save(requestCaptor.capture()) assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN) } @Test - fun testShouldSendMtbFileAndSaveSuccessRequestStatus() { + fun testShouldDetectMtbFileDuplicationAndSendDuplicationEvent() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + } + + @Test + fun testShouldSendMtbFileAndSendSuccessEvent() { doAnswer { Request( id = 1L, @@ -149,7 +205,7 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(status = RequestStatus.SUCCESS) }.`when`(sender).send(any()) doAnswer { @@ -182,14 +238,14 @@ class RequestProcessorTest { this.requestProcessor.processMtbFile(mtbFile) - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) } @Test - fun testShouldSendMtbFileAndSaveErrorRequestStatus() { + fun testShouldSendMtbFileAndSendErrorEvent() { doAnswer { Request( id = 1L, @@ -208,7 +264,7 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(status = RequestStatus.ERROR) }.`when`(sender).send(any()) doAnswer { @@ -241,20 +297,20 @@ class RequestProcessorTest { this.requestProcessor.processMtbFile(mtbFile) - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) } @Test - fun testShouldSendDeleteRequestAndSaveSuccessRequestStatus() { + fun testShouldSendDeleteRequestAndSaveUnknownRequestStatusAtFirst() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(status = RequestStatus.UNKNOWN) }.`when`(sender).send(any()) this.requestProcessor.processDeletion("TEST_12345678901") @@ -262,25 +318,43 @@ class RequestProcessorTest { val requestCaptor = argumentCaptor() verify(requestService, times(1)).save(requestCaptor.capture()) assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN) } @Test - fun testShouldSendDeleteRequestAndSaveErrorRequestStatus() { + fun testShouldSendDeleteRequestAndSendSuccessEvent() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(status = RequestStatus.SUCCESS) }.`when`(sender).send(any()) this.requestProcessor.processDeletion("TEST_12345678901") - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + } + + @Test + fun testShouldSendDeleteRequestAndSendErrorEvent() { + doAnswer { + "PSEUDONYM" + }.`when`(pseudonymizeService).patientPseudonym(anyString()) + + doAnswer { + MtbFileSender.Response(status = RequestStatus.ERROR) + }.`when`(sender).send(any()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) } @Test diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt new file mode 100644 index 0000000..cfb1111 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt @@ -0,0 +1,142 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.* +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@ExtendWith(MockitoExtension::class) +class ResponseProcessorTest { + + private lateinit var requestRepository: RequestRepository + private lateinit var statisticsUpdateProducer: Sinks.Many + + private lateinit var responseProcessor: ResponseProcessor + + private val testRequest = Request( + 1L, + "TestID1234", + "PSEUDONYM-A", + "1", + "dummyfingerprint", + RequestType.MTB_FILE, + RequestStatus.UNKNOWN + ) + + @BeforeEach + fun setup( + @Mock requestRepository: RequestRepository, + @Mock statisticsUpdateProducer: Sinks.Many + ) { + val objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) + + this.requestRepository = requestRepository + this.statisticsUpdateProducer = statisticsUpdateProducer + + this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer, objectMapper) + } + + @Test + fun shouldNotSaveStatusForUnknownRequest() { + doAnswer { + Optional.empty() + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + RequestStatus.SUCCESS + ) + + this.responseProcessor.handleResponseEvent(event) + + verify(requestRepository, never()).save(any()) + } + + @Test + fun shouldNotSaveStatusWithUnknownState() { + doAnswer { + Optional.of(testRequest) + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + RequestStatus.UNKNOWN + ) + + this.responseProcessor.handleResponseEvent(event) + + verify(requestRepository, never()).save(any()) + } + + @ParameterizedTest + @MethodSource("requestStatusSource") + fun shouldSaveStatusForKnownRequest(requestStatus: RequestStatus) { + doAnswer { + Optional.of(testRequest) + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + requestStatus + ) + + this.responseProcessor.handleResponseEvent(event) + + val captor = argumentCaptor() + verify(requestRepository, times(1)).save(captor.capture()) + assertThat(captor.firstValue).isNotNull + assertThat(captor.firstValue.status).isEqualTo(requestStatus) + } + + companion object { + + @JvmStatic + fun requestStatusSource(): Set { + return setOf( + RequestStatus.SUCCESS, + RequestStatus.WARNING, + RequestStatus.ERROR, + RequestStatus.DUPLICATION + ) + } + + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt new file mode 100644 index 0000000..0f524ca --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -0,0 +1,119 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.services.ResponseEvent +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.never +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import org.springframework.context.ApplicationEventPublisher +import org.springframework.http.HttpStatus + +@ExtendWith(MockitoExtension::class) +class KafkaResponseProcessorTest { + + private lateinit var eventPublisher: ApplicationEventPublisher + private lateinit var objectMapper: ObjectMapper + + private lateinit var kafkaResponseProcessor: KafkaResponseProcessor + + private fun createkafkaRecord( + requestId: String? = null, + statusCode: Int = 200, + statusBody: Map? = mapOf() + ): ConsumerRecord { + return ConsumerRecord( + "test-topic", + 0, + 0, + if (requestId == null) { + null + } else { + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId)) + }, + if (statusBody == null) { + "" + } else { + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody)) + } + ) + } + + @BeforeEach + fun setup( + @Mock eventPublisher: ApplicationEventPublisher + ) { + this.eventPublisher = eventPublisher + this.objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) + + this.kafkaResponseProcessor = KafkaResponseProcessor(eventPublisher, objectMapper) + } + + @Test + fun shouldNotProcessRecordsWithoutValidKey() { + this.kafkaResponseProcessor.onMessage(createkafkaRecord(null, 200)) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @Test + fun shouldNotProcessRecordsWithoutValidBody() { + this.kafkaResponseProcessor.onMessage(createkafkaRecord(requestId = "TestID1234", statusBody = null)) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @ParameterizedTest + @MethodSource("statusCodeSource") + fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) { + this.kafkaResponseProcessor.onMessage(createkafkaRecord("TestID1234", statusCode)) + verify(eventPublisher, times(1)).publishEvent(any()) + } + + companion object { + + @JvmStatic + fun statusCodeSource(): Set { + return setOf( + HttpStatus.OK, + HttpStatus.CREATED, + HttpStatus.BAD_REQUEST, + HttpStatus.NOT_FOUND, + HttpStatus.UNPROCESSABLE_ENTITY, + HttpStatus.INTERNAL_SERVER_ERROR + ) + .map { it.value() } + .toSet() + } + + } + +} \ No newline at end of file -- cgit v1.2.3