summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul-Christian Volkmer2024-01-04 11:56:40 +0100
committerGitHub2024-01-04 11:56:40 +0100
commit0083e759402602dd5953521cd574bf16cc97741a (patch)
tree90403d001813ba240e09ba8d39cad25b7004445f
parent8a6f9a6e02f99032085aa13a8c9ce783d87a4de1 (diff)
parentc892ff24613bbe600458aa8a0abfb2b96d1092e1 (diff)
Merge pull request #19 from CCC-MF/feat_17
feat #17: add request retry
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt15
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt4
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt9
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt48
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt59
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt60
-rw-r--r--src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt66
7 files changed, 208 insertions, 53 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
index b6bedf5..42632aa 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
@@ -32,8 +32,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
+import org.springframework.retry.policy.SimpleRetryPolicy
+import org.springframework.retry.support.RetryTemplate
+import org.springframework.retry.support.RetryTemplateBuilder
import org.springframework.scheduling.annotation.EnableScheduling
import reactor.core.publisher.Sinks
+import kotlin.time.Duration.Companion.seconds
+import kotlin.time.toJavaDuration
+
@Configuration
@EnableConfigurationProperties(
@@ -89,5 +95,14 @@ class AppConfiguration {
})
}
+ @Bean
+ fun retryTemplate(): RetryTemplate {
+ return RetryTemplateBuilder()
+ .notRetryOn(IllegalArgumentException::class.java)
+ .fixedBackoff(5.seconds.toJavaDuration())
+ .customPolicy(SimpleRetryPolicy(3))
+ .build()
+ }
+
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index c8fbdf5..15ed798 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -37,6 +37,7 @@ import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
+import org.springframework.retry.support.RetryTemplate
@Configuration
@EnableConfigurationProperties(
@@ -53,10 +54,11 @@ class AppKafkaConfiguration {
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
kafkaTargetProperties: KafkaTargetProperties,
+ retryTemplate: RetryTemplate,
objectMapper: ObjectMapper
): MtbFileSender {
logger.info("Selected 'KafkaMtbFileSender'")
- return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
+ return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
}
@Bean
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
index 2596e1c..64e91e7 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
@@ -30,6 +30,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.annotation.Order
+import org.springframework.retry.support.RetryTemplate
import org.springframework.web.client.RestTemplate
@Configuration
@@ -51,9 +52,13 @@ class AppRestConfiguration {
}
@Bean
- fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender {
+ fun restMtbFileSender(
+ restTemplate: RestTemplate,
+ restTargetProperties: RestTargetProperties,
+ retryTemplate: RetryTemplate
+ ): MtbFileSender {
logger.info("Selected 'RestMtbFileSender'")
- return RestMtbFileSender(restTemplate, restTargetProperties)
+ return RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
}
@Bean
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 5772faf..8c244b8 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -1,7 +1,7 @@
/*
* This file is part of ETL-Processor
*
- * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
@@ -26,10 +26,12 @@ import dev.dnpm.etl.processor.config.KafkaTargetProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.retry.support.RetryTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
private val kafkaTargetProperties: KafkaTargetProperties,
+ private val retryTemplate: RetryTemplate,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@@ -37,16 +39,18 @@ class KafkaMtbFileSender(
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
return try {
- val result = kafkaTemplate.send(
- kafkaTargetProperties.topic,
- key(request),
- objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
- )
- if (result.get() != null) {
- logger.debug("Sent file via KafkaMtbFileSender")
- MtbFileSender.Response(RequestStatus.UNKNOWN)
- } else {
- MtbFileSender.Response(RequestStatus.ERROR)
+ return retryTemplate.execute<MtbFileSender.Response, Exception> {
+ val result = kafkaTemplate.send(
+ kafkaTargetProperties.topic,
+ key(request),
+ objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
+ )
+ if (result.get() != null) {
+ logger.debug("Sent file via KafkaMtbFileSender")
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
+ } else {
+ MtbFileSender.Response(RequestStatus.ERROR)
+ }
}
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
@@ -65,17 +69,19 @@ class KafkaMtbFileSender(
.build()
return try {
- val result = kafkaTemplate.send(
- kafkaTargetProperties.topic,
- key(request),
- objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
- )
+ return retryTemplate.execute<MtbFileSender.Response, Exception> {
+ val result = kafkaTemplate.send(
+ kafkaTargetProperties.topic,
+ key(request),
+ objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
+ )
- if (result.get() != null) {
- logger.debug("Sent deletion request via KafkaMtbFileSender")
- MtbFileSender.Response(RequestStatus.UNKNOWN)
- } else {
- MtbFileSender.Response(RequestStatus.ERROR)
+ if (result.get() != null) {
+ logger.debug("Sent deletion request via KafkaMtbFileSender")
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
+ } else {
+ MtbFileSender.Response(RequestStatus.ERROR)
+ }
}
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
index 1c59f5c..5a4ae9e 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
@@ -1,7 +1,7 @@
/*
* This file is part of ETL-Processor
*
- * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
@@ -25,32 +25,39 @@ import org.slf4j.LoggerFactory
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
+import org.springframework.retry.support.RetryTemplate
import org.springframework.web.client.RestClientException
import org.springframework.web.client.RestTemplate
class RestMtbFileSender(
private val restTemplate: RestTemplate,
- private val restTargetProperties: RestTargetProperties
+ private val restTargetProperties: RestTargetProperties,
+ private val retryTemplate: RetryTemplate
) : MtbFileSender {
private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java)
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
try {
- val headers = HttpHeaders()
- headers.contentType = MediaType.APPLICATION_JSON
- val entityReq = HttpEntity(request.mtbFile, headers)
- val response = restTemplate.postForEntity(
- "${restTargetProperties.uri}/MTBFile",
- entityReq,
- String::class.java
- )
- if (!response.statusCode.is2xxSuccessful) {
- logger.warn("Error sending to remote system: {}", response.body)
- return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
+ return retryTemplate.execute<MtbFileSender.Response, Exception> {
+ val headers = HttpHeaders()
+ headers.contentType = MediaType.APPLICATION_JSON
+ val entityReq = HttpEntity(request.mtbFile, headers)
+ val response = restTemplate.postForEntity(
+ "${restTargetProperties.uri}/MTBFile",
+ entityReq,
+ String::class.java
+ )
+ if (!response.statusCode.is2xxSuccessful) {
+ logger.warn("Error sending to remote system: {}", response.body)
+ return@execute MtbFileSender.Response(
+ response.statusCode.asRequestStatus(),
+ "Status-Code: ${response.statusCode.value()}"
+ )
+ }
+ logger.debug("Sent file via RestMtbFileSender")
+ return@execute MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
}
- logger.debug("Sent file via RestMtbFileSender")
- return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
@@ -62,16 +69,18 @@ class RestMtbFileSender(
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
try {
- val headers = HttpHeaders()
- headers.contentType = MediaType.APPLICATION_JSON
- val entityReq = HttpEntity(null, headers)
- restTemplate.delete(
- "${restTargetProperties.uri}/Patient/${request.patientId}",
- entityReq,
- String::class.java
- )
- logger.debug("Sent file via RestMtbFileSender")
- return MtbFileSender.Response(RequestStatus.SUCCESS)
+ return retryTemplate.execute<MtbFileSender.Response, Exception> {
+ val headers = HttpHeaders()
+ headers.contentType = MediaType.APPLICATION_JSON
+ val entityReq = HttpEntity(null, headers)
+ restTemplate.delete(
+ "${restTargetProperties.uri}/Patient/${request.patientId}",
+ entityReq,
+ String::class.java
+ )
+ logger.debug("Sent file via RestMtbFileSender")
+ return@execute MtbFileSender.Response(RequestStatus.SUCCESS)
+ }
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
index 3ec9757..d0f7c30 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt
@@ -1,7 +1,7 @@
/*
* This file is part of ETL-Processor
*
- * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
@@ -35,6 +35,8 @@ import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.*
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
+import org.springframework.retry.policy.SimpleRetryPolicy
+import org.springframework.retry.support.RetryTemplateBuilder
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.ExecutionException
@@ -52,10 +54,12 @@ class KafkaMtbFileSenderTest {
@Mock kafkaTemplate: KafkaTemplate<String, String>
) {
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
+ val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build()
+
this.objectMapper = ObjectMapper()
this.kafkaTemplate = kafkaTemplate
- this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
+ this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
}
@ParameterizedTest
@@ -118,6 +122,58 @@ class KafkaMtbFileSenderTest {
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED)))
}
+ @ParameterizedTest
+ @MethodSource("requestWithResponseSource")
+ fun shouldRetryOnMtbFileKafkaSendError(testData: TestData) {
+ val kafkaTargetProperties = KafkaTargetProperties("testtopic")
+ val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
+ this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper)
+
+ doAnswer {
+ if (null != testData.exception) {
+ throw testData.exception
+ }
+ completedFuture(SendResult<String, String>(null, null))
+ }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
+
+ kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE)))
+
+ val expectedCount = when (testData.exception) {
+ // OK - No Retry
+ null -> times(1)
+ // Request failed - Retry max 3 times
+ else -> times(3)
+ }
+
+ verify(kafkaTemplate, expectedCount).send(anyString(), anyString(), anyString())
+ }
+
+ @ParameterizedTest
+ @MethodSource("requestWithResponseSource")
+ fun shouldRetryOnDeleteKafkaSendError(testData: TestData) {
+ val kafkaTargetProperties = KafkaTargetProperties("testtopic")
+ val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
+ this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper)
+
+ doAnswer {
+ if (null != testData.exception) {
+ throw testData.exception
+ }
+ completedFuture(SendResult<String, String>(null, null))
+ }.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
+
+ kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
+
+ val expectedCount = when (testData.exception) {
+ // OK - No Retry
+ null -> times(1)
+ // Request failed - Retry max 3 times
+ else -> times(3)
+ }
+
+ verify(kafkaTemplate, expectedCount).send(anyString(), anyString(), anyString())
+ }
+
companion object {
fun mtbFile(consentStatus: Consent.Status): MtbFile {
return if (consentStatus == Consent.Status.ACTIVE) {
diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt
index 0cad285..df19ddb 100644
--- a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt
+++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt
@@ -1,7 +1,7 @@
/*
* This file is part of ETL-Processor
*
- * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
@@ -28,6 +28,9 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
+import org.springframework.retry.policy.SimpleRetryPolicy
+import org.springframework.retry.support.RetryTemplateBuilder
+import org.springframework.test.web.client.ExpectedCount
import org.springframework.test.web.client.MockRestServiceServer
import org.springframework.test.web.client.match.MockRestRequestMatchers.method
import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo
@@ -44,10 +47,11 @@ class RestMtbFileSenderTest {
fun setup() {
val restTemplate = RestTemplate()
val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
+ val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build()
this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
- this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties)
+ this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
}
@ParameterizedTest
@@ -80,6 +84,64 @@ class RestMtbFileSenderTest {
assertThat(response.body).isEqualTo(requestWithResponse.response.body)
}
+ @ParameterizedTest
+ @MethodSource("mtbFileRequestWithResponseSource")
+ fun shouldRetryOnMtbFileHttpRequestError(requestWithResponse: RequestWithResponse) {
+ val restTemplate = RestTemplate()
+ val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
+ val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
+
+ this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
+ this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
+
+ val expectedCount = when (requestWithResponse.httpStatus) {
+ // OK - No Retry
+ HttpStatus.OK, HttpStatus.CREATED -> ExpectedCount.max(1)
+ // Request failed - Retry max 3 times
+ else -> ExpectedCount.max(3)
+ }
+
+ this.mockRestServiceServer.expect(expectedCount) {
+ method(HttpMethod.POST)
+ requestTo("/mtbfile")
+ }.andRespond {
+ withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it)
+ }
+
+ val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile))
+ assertThat(response.status).isEqualTo(requestWithResponse.response.status)
+ assertThat(response.body).isEqualTo(requestWithResponse.response.body)
+ }
+
+ @ParameterizedTest
+ @MethodSource("deleteRequestWithResponseSource")
+ fun shouldRetryOnDeleteHttpRequestError(requestWithResponse: RequestWithResponse) {
+ val restTemplate = RestTemplate()
+ val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
+ val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
+
+ this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
+ this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
+
+ val expectedCount = when (requestWithResponse.httpStatus) {
+ // OK - No Retry
+ HttpStatus.OK, HttpStatus.CREATED -> ExpectedCount.max(1)
+ // Request failed - Retry max 3 times
+ else -> ExpectedCount.max(3)
+ }
+
+ this.mockRestServiceServer.expect(expectedCount) {
+ method(HttpMethod.DELETE)
+ requestTo("/mtbfile")
+ }.andRespond {
+ withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it)
+ }
+
+ val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
+ assertThat(response.status).isEqualTo(requestWithResponse.response.status)
+ assertThat(response.body).isEqualTo(requestWithResponse.response.body)
+ }
+
companion object {
data class RequestWithResponse(
val httpStatus: HttpStatus,