summaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt15
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt5
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt6
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt12
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt16
-rw-r--r--src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt2
-rw-r--r--src/main/resources/templates/configs.html6
7 files changed, 50 insertions, 12 deletions
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
index 8d71b62..83cc568 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt
@@ -95,11 +95,6 @@ class AppConfiguration {
}
@Bean
- fun statisticsUpdateProducer(): Sinks.Many<Any> {
- return Sinks.many().multicast().directBestEffort()
- }
-
- @Bean
fun transformationService(
objectMapper: ObjectMapper,
configProperties: AppConfigProperties
@@ -119,5 +114,15 @@ class AppConfiguration {
.build()
}
+ @Bean
+ fun statisticsUpdateProducer(): Sinks.Many<Any> {
+ return Sinks.many().multicast().directBestEffort()
+ }
+
+ @Bean
+ fun configsUpdateProducer(): Sinks.Many<Boolean> {
+ return Sinks.many().multicast().directBestEffort()
+ }
+
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
index 15ed798..68b86b2 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt
@@ -38,6 +38,7 @@ import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.retry.support.RetryTemplate
+import reactor.core.publisher.Sinks
@Configuration
@EnableConfigurationProperties(
@@ -81,8 +82,8 @@ class AppKafkaConfiguration {
}
@Bean
- fun connectionCheckService(consumerFactory: ConsumerFactory<String, String>): ConnectionCheckService {
- return KafkaConnectionCheckService(consumerFactory.createConsumer())
+ fun connectionCheckService(consumerFactory: ConsumerFactory<String, String>, configsUpdateProducer: Sinks.Many<Boolean>): ConnectionCheckService {
+ return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer)
}
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
index 64e91e7..eea5724 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt
@@ -32,6 +32,7 @@ import org.springframework.context.annotation.Configuration
import org.springframework.core.annotation.Order
import org.springframework.retry.support.RetryTemplate
import org.springframework.web.client.RestTemplate
+import reactor.core.publisher.Sinks
@Configuration
@EnableConfigurationProperties(
@@ -64,9 +65,10 @@ class AppRestConfiguration {
@Bean
fun connectionCheckService(
restTemplate: RestTemplate,
- restTargetProperties: RestTargetProperties
+ restTargetProperties: RestTargetProperties,
+ configsUpdateProducer: Sinks.Many<Boolean>
): ConnectionCheckService {
- return RestConnectionCheckService(restTemplate, restTargetProperties)
+ return RestConnectionCheckService(restTemplate, restTargetProperties, configsUpdateProducer)
}
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt
index d109326..54f25b3 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt
@@ -24,9 +24,11 @@ import dev.dnpm.etl.processor.config.RestTargetProperties
import jakarta.annotation.PostConstruct
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.errors.TimeoutException
+import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.http.HttpStatus
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.web.client.RestTemplate
+import reactor.core.publisher.Sinks
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@@ -37,7 +39,9 @@ interface ConnectionCheckService {
}
class KafkaConnectionCheckService(
- private val consumer: Consumer<String, String>
+ private val consumer: Consumer<String, String>,
+ @Qualifier("configsUpdateProducer")
+ private val configsUpdateProducer: Sinks.Many<Boolean>
) : ConnectionCheckService {
private var connectionAvailable: Boolean = false
@@ -51,6 +55,7 @@ class KafkaConnectionCheckService(
} catch (e: TimeoutException) {
false
}
+ configsUpdateProducer.emitNext(connectionAvailable, Sinks.EmitFailureHandler.FAIL_FAST)
}
override fun connectionAvailable(): Boolean {
@@ -61,7 +66,9 @@ class KafkaConnectionCheckService(
class RestConnectionCheckService(
private val restTemplate: RestTemplate,
- private val restTargetProperties: RestTargetProperties
+ private val restTargetProperties: RestTargetProperties,
+ @Qualifier("configsUpdateProducer")
+ private val configsUpdateProducer: Sinks.Many<Boolean>
) : ConnectionCheckService {
private var connectionAvailable: Boolean = false
@@ -77,6 +84,7 @@ class RestConnectionCheckService(
} catch (e: Exception) {
false
}
+ configsUpdateProducer.emitNext(connectionAvailable, Sinks.EmitFailureHandler.FAIL_FAST)
}
override fun connectionAvailable(): Boolean {
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt
index 18bf8d7..be291ea 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt
@@ -23,14 +23,21 @@ import dev.dnpm.etl.processor.monitoring.ConnectionCheckService
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.pseudonym.Generator
import dev.dnpm.etl.processor.services.TransformationService
+import org.springframework.beans.factory.annotation.Qualifier
+import org.springframework.http.MediaType
+import org.springframework.http.codec.ServerSentEvent
import org.springframework.stereotype.Controller
import org.springframework.ui.Model
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Sinks
@Controller
@RequestMapping(path = ["configs"])
class ConfigController(
+ @Qualifier("configsUpdateProducer")
+ private val configsUpdateProducer: Sinks.Many<Boolean>,
private val transformationService: TransformationService,
private val pseudonymGenerator: Generator,
private val mtbFileSender: MtbFileSender,
@@ -58,4 +65,13 @@ class ConfigController(
return "configs/connectionAvailable"
}
+ @GetMapping(path = ["events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
+ fun events(): Flux<ServerSentEvent<Any>> {
+ return configsUpdateProducer.asFlux().map {
+ ServerSentEvent.builder<Any>()
+ .event("connection-available").id("none").data("")
+ .build()
+ }
+ }
+
} \ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
index 546909e..74ae238 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt
@@ -22,6 +22,7 @@ package dev.dnpm.etl.processor.web
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
+import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.http.MediaType
import org.springframework.http.codec.ServerSentEvent
import org.springframework.web.bind.annotation.GetMapping
@@ -38,6 +39,7 @@ import java.time.temporal.ChronoUnit
@RestController
@RequestMapping(path = ["/statistics"])
class StatisticsRestController(
+ @Qualifier("statisticsUpdateProducer")
private val statisticsUpdateProducer: Sinks.Many<Any>,
private val requestRepository: RequestRepository
) {
diff --git a/src/main/resources/templates/configs.html b/src/main/resources/templates/configs.html
index 1d6dd20..3c3d744 100644
--- a/src/main/resources/templates/configs.html
+++ b/src/main/resources/templates/configs.html
@@ -37,7 +37,10 @@
</table>
</section>
- <section th:insert="~{configs/connectionAvailable.html}" th:hx-get="@{/configs?connectionAvailable}" hx-trigger="every 20s"></section>
+ <section hx-ext="sse" th:sse-connect="@{/configs/events}">
+ <div th:insert="~{configs/connectionAvailable.html}" th:hx-get="@{/configs?connectionAvailable}" hx-trigger="sse:connection-available">
+ </div>
+ </section>
<section>
<h2><span th:if="${not transformations.isEmpty()}">✅</span><span th:if="${transformations.isEmpty()}">⛔</span> Transformationen</h2>
@@ -84,5 +87,6 @@
</section>
</main>
<script th:src="@{/webjars/htmx.org/dist/htmx.min.js}"></script>
+ <script th:src="@{/webjars/htmx.org/dist/ext/sse.js}"></script>
</body>
</html> \ No newline at end of file