// LogStreamService.kt

package com.blizzard.content.filter.chatreplay.source


import com.blizzard.content.filter.chatreplay.config.CHATLOG_PREFIX

import com.blizzard.content.filter.chatreplay.config.METRICS_PREFIX

import com.blizzard.content.filter.chatreplay.config.SourceProperties

import com.blizzard.content.filter.chatreplay.config.flashbackInstant

import com.blizzard.content.filter.chatreplay.domain.ChatLog

import com.blizzard.content.filter.chatreplay.domain.hadOcurredAt

import io.micrometer.core.instrument.Counter

import io.micrometer.core.instrument.Gauge

import io.micrometer.core.instrument.MeterRegistry

import io.micrometer.core.instrument.Metrics

import io.micrometer.core.instrument.Tags

import org.jetbrains.annotations.NotNull

import org.slf4j.LoggerFactory

import org.springframework.beans.factory.annotation.Value

import org.springframework.boot.context.event.ApplicationReadyEvent

import org.springframework.context.event.EventListener

import org.springframework.stereotype.Service

import reactor.core.Disposable

import reactor.core.publisher.Flux

import reactor.core.publisher.Sinks

import reactor.core.scheduler.Schedulers

import reactor.util.retry.Retry

import java.time.Duration.ofMillis

import java.util.concurrent.ConcurrentLinkedQueue

import kotlin.random.Random.Default.nextDouble


/**
 * Buffers incoming [ChatLog]s in a time-ordered queue and re-emits them on a
 * multicast sink once their (flashback-shifted) timestamp has occurred.
 *
 * A 50ms [Flux.interval] "driver" (started on [ApplicationReadyEvent]) polls the
 * head of [logStorage] and publishes every log whose replay time has passed.
 * Consumers attach via [logStream]; producers feed the queue via [addAll].
 */
@Service
class LogStreamService(
    meterRegistry: MeterRegistry,
    private val sourceProperties: SourceProperties,
    @Value("\${git.commit.id}")
    private val commitId: String
) {

    private val log = LoggerFactory.getLogger(javaClass)
    private val driverLog = LoggerFactory.getLogger("${javaClass.name}.Driver")

    // Held so the interval subscription could be disposed on shutdown.
    // NOTE(review): nothing currently disposes it — consider a @PreDestroy hook.
    private var logDriver: Disposable? = null

    // Head of the queue is expected to be the oldest (smallest-timestamp) log.
    private val logStorage = ConcurrentLinkedQueue<ChatLog>()
    private val driverScheduler = Schedulers.newBoundedElastic(2, 1000, "driver")
    private val multiScheduler = Schedulers.newBoundedElastic(10, 1000, "multic")

    // Best-effort multicast: emissions may be dropped when there are no subscribers.
    private val logSink: Sinks.Many<ChatLog> = Sinks.many().multicast().directBestEffort()

    private val publishCounter: Counter = meterRegistry.counter("$CHATLOG_PREFIX.logstream.published")

    init {
        Gauge.builder("$CHATLOG_PREFIX.fetch.queuesize", logStorage::size)
            .description("Count of chatlogs collected, waiting to be processed.")
            .register(meterRegistry)

        // Report a restart with a random increment in 20-120 so consecutive
        // restarts remain distinguishable on a counter that only ever goes up.
        meterRegistry
            .counter("$METRICS_PREFIX.application.restart", Tags.of("commitid", commitId))
            .increment((nextDouble(0.0, 100.0) + 20.0).also {
                log.info("Reporting restart: $it $commitId")
            })
    }

    /** Starts the emission driver once the application context is fully up. */
    @EventListener
    fun onApplicationEvent(@Suppress("UNUSED_PARAMETER") readyEvent: ApplicationReadyEvent) {
        // See function comments
        startDriver()
    }

    /**
     * This flux interval drives sink emission: every 50ms, drain each queued log
     * whose replay timestamp has occurred into [logSink].
     */
    private fun startDriver() {
        Metrics.counter("$METRICS_PREFIX.logstream.driver.start").increment()

        logDriver = Flux.interval(ofMillis(50))
            .publishOn(driverScheduler)
            .retryWhen(Retry.backoff(Long.MAX_VALUE, ofMillis(100))
                .doBeforeRetry {
                    driverLog.error("Driver Retrying attempt [${it.totalRetries()}] due to ${it.failure()} : ${it.failure().message}")
                    Metrics.counter("$METRICS_PREFIX.logstream.driver.retry").increment()
                })
            .subscribe {
                // Single driver subscription: peek()/poll() here are not raced by
                // any other consumer of the queue.
                while (logStorage.peek()?.hadOccurred() == true) {
                    // NOTE(review): the handler returns true (= retry the emission)
                    // unconditionally; with directBestEffort and zero subscribers
                    // this can spin — confirm intended back-pressure behavior.
                    logSink.emitNext(logStorage.poll()) { _, _ -> true }
                    publishCounter.increment()
                }
            }

        driverLog.info("ChatLog driver started.")
    }

    /**
     * Returns the shared stream of replayed chat logs.
     *
     * Subscribers receive logs emitted after they subscribe (best-effort
     * multicast); delivery happens on [multiScheduler].
     */
    @Synchronized
    fun logStream(): Flux<ChatLog> {
        return logSink
            .asFlux()
            .retryWhen(Retry.backoff(Long.MAX_VALUE, ofMillis(100))
                .doBeforeRetry {
                    log.error("Retrying attempt [${it.totalRetries()}] due to ${it.failure()} : ${it.failure().message}")
                    Metrics.counter("$CHATLOG_PREFIX.logstream.retry").increment()
                })
            .publishOn(multiScheduler)
    }

    /**
     * Adds a batch of logs to the replay queue, repairing ordering when the
     * batch arrives entirely "before" the current queue head.
     *
     * @param incomingLogs logs to enqueue; an empty collection is logged and ignored.
     */
    @Synchronized
    fun addAll(@NotNull incomingLogs: Collection<ChatLog>) {
        // If, somehow, incomingLogs are empty
        if (incomingLogs.isEmpty()) {
            log.error("No chatLogs received to add. Queue size: ${logStorage.size}")
            return
        }

        val sortedIncomingLogs = incomingLogs.sorted()

        /*
        In the infrequent case where incoming logs (which should already be in the queue) are
        just now arriving (ie on app startup, if the first query takes significantly longer than the second),
        then the arrangement will resemble this example:
                       last() |=========> peek() first()[0]
          logStorage:             9 8 7 6 5
          incomingLogs: 4 3 2 1 0          ^
        Earlier logs are being inserted "after" later logs.
        Time will need to pass to "9" before logs with timestamp "0" will be processed.
        Unless the queue is checked and sorted first.
        */
        // peek() instead of isNotEmpty()+first(): the driver thread drains the
        // queue concurrently (it is not covered by @Synchronized), so first()
        // could throw NoSuchElementException after a passing isNotEmpty() check.
        val head = logStorage.peek()
        if (head != null && sortedIncomingLogs.last().timestamp < head.timestamp) {
            val totalBefore = incomingLogs.size + logStorage.size
            log.warn("Inverted insertion order detected in logStorage. Correcting...")
            val fromLogStorage = logStorage.sorted()
            logStorage.clear()
            logStorage.addAll((fromLogStorage + sortedIncomingLogs).sorted())
            // Sizes can legitimately diverge if the driver polled mid-rebuild.
            when (logStorage.size) {
                totalBefore -> log.info("Correction completed. Sizes match")
                else -> log.error("Correction completed. Sizes do not match.")
            }
        } else {
            logStorage.addAll(sortedIncomingLogs)
        }

        log.info("Added ${incomingLogs.size} chatLogs. New queue size: ${logStorage.size}")
    }

    /** True when this log's timestamp has passed relative to the flashback-shifted "now". */
    private fun ChatLog.hadOccurred() = hadOcurredAt(sourceProperties.flashbackInstant())

}