summaryrefslogtreecommitdiff
path: root/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/kotlin')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt6
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt13
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt21
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt16
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt135
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt96
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt85
7 files changed, 211 insertions, 161 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index 6d0254e..309ff2d 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -20,7 +20,6 @@
package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
-import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
@@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.annotation.Order
@@ -70,10 +70,10 @@ class AppKafkaConfiguration {
@Bean
fun kafkaResponseProcessor(
- requestRepository: RequestRepository,
+ applicationEventPublisher: ApplicationEventPublisher,
objectMapper: ObjectMapper
): KafkaResponseProcessor {
- return KafkaResponseProcessor(requestRepository, objectMapper)
+ return KafkaResponseProcessor(applicationEventPublisher, objectMapper)
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index d903745..9448e29 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.config.KafkaTargetProperties
+import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
@@ -43,13 +44,13 @@ class KafkaMtbFileSender(
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
- MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
} else {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
+ MtbFileSender.Response(RequestStatus.ERROR)
}
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
- MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
}
}
@@ -72,13 +73,13 @@ class KafkaMtbFileSender(
if (result.get() != null) {
logger.debug("Sent deletion request via KafkaMtbFileSender")
- MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
} else {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
+ MtbFileSender.Response(RequestStatus.ERROR)
}
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
- MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
+ MtbFileSender.Response(RequestStatus.UNKNOWN)
}
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
index 6914ba5..de0efaa 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
@@ -20,22 +20,31 @@
package dev.dnpm.etl.processor.output
import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.springframework.http.HttpStatusCode
interface MtbFileSender {
fun send(request: MtbFileRequest): Response
fun send(request: DeleteRequest): Response
- data class Response(val status: ResponseStatus, val reason: String = "")
+ data class Response(val status: RequestStatus, val body: String = "")
data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile)
data class DeleteRequest(val requestId: String, val patientId: String)
- enum class ResponseStatus {
- SUCCESS,
- WARNING,
- ERROR,
- UNKNOWN
+}
+
+fun Int.asRequestStatus(): RequestStatus {
+ return when (this) {
+ 200 -> RequestStatus.SUCCESS
+ 201 -> RequestStatus.WARNING
+ in 400 .. 999 -> RequestStatus.ERROR
+ else -> RequestStatus.UNKNOWN
}
+}
+
+fun HttpStatusCode.asRequestStatus(): RequestStatus {
+ return this.value().asRequestStatus()
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
index 04c73ef..24cdc49 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
@@ -20,13 +20,13 @@
package dev.dnpm.etl.processor.output
import dev.dnpm.etl.processor.config.RestTargetProperties
+import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.web.client.RestClientException
import org.springframework.web.client.RestTemplate
-import org.springframework.web.util.UriComponentsBuilder
class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender {
@@ -46,21 +46,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
)
if (!response.statusCode.is2xxSuccessful) {
logger.warn("Error sending to remote system: {}", response.body)
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}")
+ return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
}
logger.debug("Sent file via RestMtbFileSender")
- return if (response.body?.contains("warning") == true) {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}")
- } else {
- MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
- }
+ return MtbFileSender.Response(response.statusCode.asRequestStatus())
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
+ return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
@@ -74,14 +70,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
String::class.java
)
logger.debug("Sent file via RestMtbFileSender")
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
+ return MtbFileSender.Response(RequestStatus.SUCCESS)
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
- return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
+ return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
index 6465e82..d2f8619 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt
@@ -31,8 +31,9 @@ import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith
import org.apache.commons.codec.binary.Base32
import org.apache.commons.codec.digest.DigestUtils
import org.slf4j.LoggerFactory
+import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
-import reactor.core.publisher.Sinks
+import java.time.Instant
import java.util.*
@Service
@@ -41,73 +42,54 @@ class RequestProcessor(
private val sender: MtbFileSender,
private val requestService: RequestService,
private val objectMapper: ObjectMapper,
- private val statisticsUpdateProducer: Sinks.Many<Any>
+ private val applicationEventPublisher: ApplicationEventPublisher
) {
private val logger = LoggerFactory.getLogger(RequestProcessor::class.java)
fun processMtbFile(mtbFile: MtbFile) {
+ val requestId = UUID.randomUUID().toString()
val pid = mtbFile.patient.id
mtbFile pseudonymizeWith pseudonymizeService
+ val request = MtbFileSender.MtbFileRequest(requestId, mtbFile)
+
+ requestService.save(
+ Request(
+ uuid = requestId,
+ patientId = request.mtbFile.patient.id,
+ pid = pid,
+ fingerprint = fingerprint(request.mtbFile),
+ status = RequestStatus.UNKNOWN,
+ type = RequestType.MTB_FILE
+ )
+ )
+
if (isDuplication(mtbFile)) {
- requestService.save(
- Request(
- patientId = mtbFile.patient.id,
- pid = pid,
- fingerprint = fingerprint(mtbFile),
- status = RequestStatus.DUPLICATION,
- type = RequestType.MTB_FILE,
- report = Report("Duplikat erkannt - keine Daten weitergeleitet")
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ RequestStatus.DUPLICATION
)
)
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
return
}
- val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile)
-
val responseStatus = sender.send(request)
- if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) {
- logger.info(
- "Sent file for Patient '{}' using '{}'",
- mtbFile.patient.id,
- sender.javaClass.simpleName
- )
- } else {
- logger.error(
- "Error sending file for Patient '{}' using '{}'",
- mtbFile.patient.id,
- sender.javaClass.simpleName
- )
- }
-
- val requestStatus = when (responseStatus.status) {
- MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR
- MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING
- MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS
- else -> RequestStatus.UNKNOWN
- }
- requestService.save(
- Request(
- uuid = request.requestId,
- patientId = request.mtbFile.patient.id,
- pid = pid,
- fingerprint = fingerprint(request.mtbFile),
- status = requestStatus,
- type = RequestType.MTB_FILE,
- report = when (requestStatus) {
- RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
- RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason)
- RequestStatus.UNKNOWN -> Report("Keine Informationen")
- else -> null
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ responseStatus.status,
+ when (responseStatus.status) {
+ RequestStatus.WARNING -> Optional.of(responseStatus.body)
+ else -> Optional.empty()
}
)
)
-
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
}
private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean {
@@ -126,55 +108,31 @@ class RequestProcessor(
try {
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)
- val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
- when (responseStatus.status) {
- MtbFileSender.ResponseStatus.SUCCESS -> {
- logger.info(
- "Sent delete for Patient '{}' using '{}'",
- patientPseudonym,
- sender.javaClass.simpleName
- )
- }
-
- MtbFileSender.ResponseStatus.ERROR -> {
- logger.error(
- "Error deleting data for Patient '{}' using '{}'",
- patientPseudonym,
- sender.javaClass.simpleName
- )
- }
-
- else -> {
- logger.error(
- "Unknown result on deleting data for Patient '{}' using '{}'",
- patientPseudonym,
- sender.javaClass.simpleName
- )
- }
- }
-
- val requestStatus = when (responseStatus.status) {
- MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR
- MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING
- MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS
- else -> RequestStatus.UNKNOWN
- }
-
requestService.save(
Request(
uuid = requestId,
patientId = patientPseudonym,
pid = patientId,
fingerprint = fingerprint(patientPseudonym),
- status = requestStatus,
- type = RequestType.DELETE,
- report = when (requestStatus) {
- RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
- RequestStatus.UNKNOWN -> Report("Keine Informationen")
- else -> null
+ status = RequestStatus.UNKNOWN,
+ type = RequestType.DELETE
+ )
+ )
+
+ val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
+
+ applicationEventPublisher.publishEvent(
+ ResponseEvent(
+ requestId,
+ Instant.now(),
+ responseStatus.status,
+ when (responseStatus.status) {
+ RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body)
+ else -> Optional.empty()
}
)
)
+
} catch (e: Exception) {
requestService.save(
Request(
@@ -188,7 +146,6 @@ class RequestProcessor(
)
)
}
- statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
}
private fun fingerprint(mtbFile: MtbFile): String {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
new file mode 100644
index 0000000..d7ad86f
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt
@@ -0,0 +1,96 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package dev.dnpm.etl.processor.services
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import dev.dnpm.etl.processor.monitoring.Report
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
+import org.slf4j.LoggerFactory
+import org.springframework.context.event.EventListener
+import org.springframework.stereotype.Service
+import reactor.core.publisher.Sinks
+import java.time.Instant
+import java.util.*
+
+@Service
+class ResponseProcessor(
+ private val requestRepository: RequestRepository,
+ private val statisticsUpdateProducer: Sinks.Many<Any>,
+ private val objectMapper: ObjectMapper
+) {
+
+ private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java)
+
+ @EventListener(classes = [ResponseEvent::class])
+ fun handleResponseEvent(event: ResponseEvent) {
+ requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({
+ it.processedAt = event.timestamp
+ it.status = event.status
+
+ when (event.status) {
+ RequestStatus.SUCCESS -> {
+ it.report = Report(
+ "Keine Probleme erkannt",
+ )
+ }
+
+ RequestStatus.WARNING -> {
+ it.report = Report(
+ "Warnungen über mangelhafte Daten",
+ objectMapper.writeValueAsString(event.body)
+ )
+ }
+
+ RequestStatus.ERROR -> {
+ it.report = Report(
+ "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
+ objectMapper.writeValueAsString(event.body)
+ )
+ }
+
+ RequestStatus.DUPLICATION -> {
+ it.report = Report(
+ "Duplikat erkannt"
+ )
+ }
+
+ else -> {
+ logger.error("Cannot process response: Unknown response code!")
+ return@ifPresentOrElse
+ }
+ }
+
+ requestRepository.save(it)
+
+ statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
+ }, {
+ logger.error("Response for unknown request '${event.requestUuid}'!")
+ })
+ }
+
+}
+
+data class ResponseEvent(
+ val requestUuid: String,
+ val timestamp: Instant,
+ val status: RequestStatus,
+ val body: Optional<String> = Optional.empty()
+) \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
index 1e9263d..ef880f4 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt
@@ -22,16 +22,18 @@ package dev.dnpm.etl.processor.services.kafka
import com.fasterxml.jackson.annotation.JsonAlias
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
-import dev.dnpm.etl.processor.monitoring.Report
-import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
+import dev.dnpm.etl.processor.output.asRequestStatus
+import dev.dnpm.etl.processor.services.ResponseEvent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
+import org.springframework.context.ApplicationEventPublisher
import org.springframework.kafka.listener.MessageListener
import java.time.Instant
+import java.util.*
class KafkaResponseProcessor(
- private val requestRepository: RequestRepository,
+ private val eventPublisher: ApplicationEventPublisher,
private val objectMapper: ObjectMapper
) : MessageListener<String, String> {
@@ -39,55 +41,44 @@ class KafkaResponseProcessor(
override fun onMessage(data: ConsumerRecord<String, String>) {
try {
- val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java)
- requestRepository.findByUuidEquals(responseKey.requestId).ifPresent {
+ Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java))
+ } catch (e: Exception) {
+ Optional.empty()
+ }.ifPresentOrElse({ responseKey ->
+ val event = try {
val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
+ ResponseEvent(
+ responseKey.requestId,
+ Instant.ofEpochMilli(data.timestamp()),
+ responseBody.statusCode.asRequestStatus(),
+ when (responseBody.statusCode.asRequestStatus()) {
+ RequestStatus.SUCCESS -> {
+ Optional.empty()
+ }
- when (responseBody.statusCode) {
- 200 -> {
- it.status = RequestStatus.SUCCESS
- it.processedAt = Instant.ofEpochMilli(data.timestamp())
- requestRepository.save(it)
- }
+ RequestStatus.WARNING, RequestStatus.ERROR -> {
+ Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
+ }
- 201 -> {
- it.status = RequestStatus.WARNING
- it.processedAt = Instant.ofEpochMilli(data.timestamp())
- it.report = Report(
- "Warnungen über mangelhafte Daten",
- objectMapper.writeValueAsString(responseBody.statusBody)
- )
- requestRepository.save(it)
+ else -> {
+ logger.error("Kafka response: Unknown response code!")
+ Optional.empty()
+ }
}
-
- 400, 422 -> {
- it.status = RequestStatus.ERROR
- it.processedAt = Instant.ofEpochMilli(data.timestamp())
- it.report = Report(
- "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
- objectMapper.writeValueAsString(responseBody.statusBody)
- )
- requestRepository.save(it)
- }
-
- in 900..999 -> {
- it.status = RequestStatus.ERROR
- it.processedAt = Instant.ofEpochMilli(data.timestamp())
- it.report = Report(
- "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich",
- objectMapper.writeValueAsString(responseBody.statusBody)
- )
- requestRepository.save(it)
- }
-
- else -> {
- logger.error("Cannot process Kafka response: Unknown response code!")
- }
- }
+ )
+ } catch (e: Exception) {
+ logger.error("Cannot process Kafka response", e)
+ ResponseEvent(
+ responseKey.requestId,
+ Instant.ofEpochMilli(data.timestamp()),
+ RequestStatus.ERROR,
+ Optional.of("Cannot process Kafka response")
+ )
}
- } catch (e: Exception) {
- logger.error("Cannot process Kafka response", e)
- }
+ eventPublisher.publishEvent(event)
+ }, {
+ logger.error("No response key in Kafka response")
+ })
}
data class ResponseKey(val requestId: String)