// OchRSocketService.kt

package com.blizzard.content.filter.chatreplay.sink


import com.blizzard.content.filter.chatreplay.config.EXECUTION
import com.blizzard.content.filter.chatreplay.config.ExternalServices
import com.blizzard.content.filter.chatreplay.config.INTERACTION
import com.blizzard.content.filter.chatreplay.config.METRICS_PREFIX
import com.blizzard.content.filter.chatreplay.config.MODE
import com.blizzard.content.filter.chatreplay.config.SERVICETARGET
import com.blizzard.content.filter.chatreplay.config.TRANSFORM
import com.blizzard.content.filter.chatreplay.config.TRANSPORT
import com.blizzard.content.filter.chatreplay.domain.TransformResponse
import com.blizzard.content.filter.chatreplay.domain.failedTransformResponse
import com.blizzard.content.filter.chatreplay.domain.log
import com.blizzard.content.filter.chatreplay.domain.parseId
import com.blizzard.content.filter.chatreplay.metrics.toMap
import com.blizzard.content.filter.chatreplay.metrics.toTags
import com.blizzard.content.filter.chatreplay.source.LogStreamService
import com.newrelic.api.agent.NewRelic
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Metrics
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Tags
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.awaitSingle
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.core.NestedRuntimeException
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.messaging.rsocket.retrieveFlow
import org.springframework.stereotype.Service
import org.springframework.util.MimeTypeUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.retry.Retry
import java.time.Duration.ofMillis
import kotlin.random.Random


/** WebSocket path of the RSocket endpoint; appended to `host:port` when the channel connection is logged. */
const val PATH_RSOCKET: String = "/rsocket-api"

/** Route used for the REQUEST_CHANNEL transform interaction. */
const val ROUTE_TRANSFORM_CHANNEL: String = "replace-channel-controller"

/** Route name for a request-response transform interaction (presumably used by another client — not referenced in this file). */
const val ROUTE_TRANSFORM_REQUEST_RESPONSE: String = "replace-request-response-controller"


/**
 * Streams chat-log text from [LogStreamService] to the OCH content-filter service over an RSocket
 * REQUEST_CHANNEL (route [ROUTE_TRANSFORM_CHANNEL]) and counts every [TransformResponse] it receives.
 *
 * The connection is opened only after [ApplicationReadyEvent]. Both the upstream log stream and the
 * response flow are retried indefinitely, each with an exponential backoff whose per-attempt delay is capped.
 *
 * - [https://rsocket.io]
 * - [https://docs.spring.io/spring-boot/docs/2.4.3/reference/html/spring-boot-features.html#boot-features-rsocket]
 * - [https://docs.spring.io/spring/docs/5.2.9.RELEASE/spring-framework-reference/web-reactive.html#rsocket-spring]
 *
 * See file rest-client/metrics.http
 */
@Service
@ConditionalOnProperty("chat-replay.subscribers.rsocket-service.enable")
class OchRSocketService(
    private val rSocketReqesterBuilder: RSocketRequester.Builder,
    private val logStreamService: LogStreamService,
    private val registry: MeterRegistry,
    externalProperties: ExternalServices,
) {

    private val cff = externalProperties.och.contentFilterFunctions

    private val log = LoggerFactory.getLogger(OchRSocketService::class.java)

    private val localTags = Tags.of(
        Tag.of(EXECUTION, "remote"),
        Tag.of(SERVICETARGET, "och"),
        Tag.of(TRANSPORT, "rsocket"),
        Tag.of(MODE, "streamlistener"),
        Tag.of(INTERACTION, "request-channel"),
    )

    private val transformedCounter = Metrics.counter(TRANSFORM, localTags.and("transformed", "true"))
    private val untransformedCounter = Metrics.counter(TRANSFORM, localTags.and("transformed", "false"))
    private val retryCounter = Metrics.counter("$METRICS_PREFIX.rsocket.request-channel.retry", localTags)
    private val logStreamRetryCounter = Metrics.counter("$METRICS_PREFIX.rsocket.request-channel.logstream.retry", localTags)

    // Both handlers pass the throwable to the logger so uncaught coroutine failures keep their stack trace.
    private val rSocketScope: CoroutineScope = CoroutineScope(
        SupervisorJob() + Dispatchers.IO + CoroutineExceptionHandler { _, e ->
            log.error("Exception from rSocketScope handler: ${e.message}", e)
        },
    )

    private val handlingScope: CoroutineScope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineExceptionHandler { _, e ->
            log.error("Exception from handlingScope handler: ${e.message}", e)
        },
    )

    // Starts empty; replaced with a real requester on the first launchChannelConnection() call.
    private var requesterMono = Mono.empty<RSocketRequester>()

    /** Opens the REQUEST_CHANNEL once the application context is fully started. */
    @EventListener(condition = "@environment.getProperty('chat-replay.subscribers.rsocket-service.enable')")
    fun onApplicationEvent(readyEvent: ApplicationReadyEvent) {
        log.info("ApplicationReady: ${javaClass.simpleName}")
        launchChannelConnection()
    }

    /** Builds a WebSocket-transport [RSocketRequester] (JSON data) for the configured content-filter endpoint. */
    private fun requesterMono(): Mono<RSocketRequester> {
        log.info("Caching RSocketRequester for REQUEST_CHANNEL")
        val requester = rSocketReqesterBuilder
            .rsocketConnector { it.configureConnector(log, localTags, registry) }
            .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
            .websocket(resolveWsUri(cff).also { log.info("RSocket REQUEST_CHANNEL connecting to $it") })
        return Mono.just(requester)
    }

    /**
     * Sends the log stream over the channel and collects responses in [handlingScope].
     *
     * Upstream log-stream failures are retried by Reactor; channel/response failures are retried by the
     * Flow `retryWhen`. Both use capped exponential backoff so an outage neither spins nor stalls forever.
     */
    private fun launchChannelConnection() =
        rSocketScope.launch {
            log.info("Starting RSocket Channel connection...")
            /* If we initialize this connection before app is ready, we can "steal" the
               connection from the previously running instance. And if this app fails to start up,
               no one is happy.
             */
            if (requesterMono.awaitFirstOrNull() == null) {
                requesterMono = requesterMono()
            }

            requesterMono.awaitSingle()
                .route(ROUTE_TRANSFORM_CHANNEL)
                .data(
                    logStreamService.logStream()
                        .retryWhen(
                            Retry.backoff(Long.MAX_VALUE, ofMillis(INITIAL_RETRY_BACKOFF_MILLIS))
                                // Without a cap the backoff grows without bound over repeated failures.
                                .maxBackoff(ofMillis(MAX_RETRY_BACKOFF_MILLIS))
                                // Reset the backoff once the stream emits again, so a later failure starts fast.
                                .transientErrors(true)
                                .doBeforeRetry {
                                    log.error("Retrying [logStream] attempt [${it.totalRetries()}] due to ${it.failure()} : ${it.failure().message}")
                                    logStreamRetryCounter.increment()
                                },
                        )
                        .map { chatLog -> chatLog.text ?: "" }
                        .addReactiveMetrics(),
                )
                // End of RSocket config, beginning of Kotlin Suspending Flow processing
                .retrieveFlow<TransformResponse>()
                .retryWhen { cause, attempt ->
                    log.error("Retrying attempt [$attempt] due to $cause : ${cause.message}")
                    retryCounter.increment()
                    // Back off before re-subscribing; previously this retried immediately in a tight loop.
                    delay(channelRetryDelayMillis(attempt))
                    true
                }
                .flowOn(Dispatchers.IO)
                .onEach(::incrementCounters)
                .onEach { if (log.isDebugEnabled) log.debug("Handling : ${it.parseId()} ${it.log()}") }
                .transform { response ->
                    // Only per-element processing may sit inside the try. emit() stays outside so downstream
                    // and cancellation exceptions propagate (Flow exception transparency).
                    val keep = try {
                        //occasionallyThrowException(/* To demonstrate error-handling */)
                        true
                    } catch (e: Exception) {
                        recordError(e, "Error: ${e.message}")
                        false // Skip, continue
                    }
                    if (keep) emit(response)
                }
                .catch /* Some other exception */ { e ->
                    recordError(e, "Actual exception: ${e.message}")
                    emit(failedTransformResponse)
                }
                .onStart { log.info("RSocket REQUEST_CHANNEL connected to ${cff.host}:${cff.port}$PATH_RSOCKET") }
                .onCompletion { log.info("Stream complete") }
                .onEmpty { log.info("onEmpty") }
                .launchIn(handlingScope)
        }

    /**
     * Delay before re-subscribing the channel after a failure.
     *
     * The delay is [INITIAL_RETRY_BACKOFF_MILLIS] doubled once per 0-based [attempt], capped at
     * [MAX_RETRY_BACKOFF_MILLIS]. The shift is clamped so it cannot overflow.
     */
    private fun channelRetryDelayMillis(attempt: Long): Long =
        (INITIAL_RETRY_BACKOFF_MILLIS shl attempt.coerceIn(0L, 16L).toInt()).coerceAtMost(MAX_RETRY_BACKOFF_MILLIS)

    /** Demo hook: throws a [RuntimeException] roughly one call in ten. */
    private fun occasionallyThrowException() {
        if (Random.nextInt(10) == 0) throw RuntimeException("Random Exception")
    }

    /** Increments the transformed or untransformed counter, depending on [TransformResponse.wasModified]. */
    private fun incrementCounters(response: TransformResponse) {
        (if (response.wasModified) transformedCounter else untransformedCounter).increment()
    }

    /**
     * Logs [t] (with its stack trace) under message [s]. Also increments a `$TRANSFORM.error` counter and
     * reports the error to New Relic, both tagged with the exception class/message (and, for
     * [NestedRuntimeException], its most specific cause).
     */
    private fun recordError(t: Throwable, s: String) {
        // s already embeds t.message; pass t itself instead of repeating the message.
        log.error(s, t)
        val contextTags: MutableMap<String, String> = mutableMapOf(
            "class" to t.javaClass.simpleName,
            "message" to "${t.message}",
        )
        if (t is NestedRuntimeException) {
            contextTags += mapOf(
                "cause.class" to t.mostSpecificCause.javaClass.simpleName,
                "cause.message" to "${t.mostSpecificCause.message}",
            )
        }
        // NOTE(review): exception messages as metric tags can create unbounded tag cardinality — consider dropping them from the counter.
        Metrics.counter("$TRANSFORM.error", localTags + contextTags.toTags()).increment()
        NewRelic.noticeError(t, localTags.toMap() + contextTags)
    }

    /**
     * Names this publisher [TRANSFORM], tags it with every tag in `localTags`, and enables Reactor publisher metrics.
     *
     * - [https://projectreactor.io/docs/core/milestone/reference/#_publisher_metrics]
     * - [https://projectreactor.io/docs/core/milestone/reference/#metrics]
     *
     * These metrics result in metric names of:
     *
     * name + ".onNext.delay"
     * name + ".requested"
     * name + ".subscribed"
     */
    private fun <T : Any> Flux<T>.addReactiveMetrics(): Flux<T> =
        localTags.fold(name(TRANSFORM)) { flux, tag -> flux.tag(tag.key, tag.value) }.metrics()

    private companion object {
        /** First retry delay for both the log-stream and the channel retries. */
        const val INITIAL_RETRY_BACKOFF_MILLIS = 100L

        /** Upper bound on any single retry delay. */
        const val MAX_RETRY_BACKOFF_MILLIS = 10_000L
    }
}