more efficient MDC in Typed persistence, #24787

This commit is contained in:
Patrik Nordwall 2018-04-04 14:20:57 +02:00 committed by Johan Andrén
parent d986c7b6bb
commit 90541b20db
12 changed files with 154 additions and 113 deletions

View file

@ -45,6 +45,7 @@ public class ActorLoggingTest extends JUnitSuite {
@Test
public void loggingProvidesMDC() {
Behavior<Protocol> behavior = Behaviors.withMdc(
null,
(msg) -> {
Map<String, Object> mdc = new HashMap<>();
mdc.put("txId", msg.getTransactionId());

View file

@ -4,6 +4,8 @@
package akka.actor.typed.internal
import scala.collection.immutable.HashMap
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.internal.adapter.AbstractLogger
import akka.actor.typed.{ ActorContext, Behavior, ExtensibleBehavior, Signal }
@ -65,7 +67,7 @@ import akka.annotation.InternalApi
}
override def receive(ctx: ActorContext[T], msg: T): Behavior[T] = {
val mdc = staticMdc ++ mdcForMessage(msg)
val mdc = merge(staticMdc, mdcForMessage(msg))
ctx.asScala.log.asInstanceOf[AbstractLogger].mdc = mdc
val next =
try {
@ -76,6 +78,17 @@ import akka.annotation.InternalApi
wrapWithMdc(next, ctx)
}
private def merge(staticMdc: Map[String, Any], mdcForMessage: Map[String, Any]): Map[String, Any] = {
if (staticMdc.isEmpty) mdcForMessage
else if (mdcForMessage.isEmpty) staticMdc
else if (staticMdc.isInstanceOf[HashMap[String, Any]] && mdcForMessage.isInstanceOf[HashMap[String, Any]]) {
// merged is more efficient than ++
mdcForMessage.asInstanceOf[HashMap[String, Any]].merged(staticMdc.asInstanceOf[HashMap[String, Any]])(null)
} else {
staticMdc ++ mdcForMessage
}
}
override def receiveSignal(ctx: ActorContext[T], signal: Signal): Behavior[T] = {
val next = Behavior.interpretSignal(behavior, ctx, signal)
wrapWithMdc(next, ctx)

View file

@ -4,6 +4,7 @@
package akka.actor.typed.javadsl
import java.util.Collections
import java.util.function.{ Function JFunction }
import akka.actor.typed.{ ActorRef, Behavior, ExtensibleBehavior, Signal, SupervisorStrategy }
@ -12,7 +13,6 @@ import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.japi.function.{ Procedure2, Function2 JapiFunction2 }
import akka.japi.pf.PFBuilder
import akka.util.ConstantFun
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
/**
@ -294,7 +294,7 @@ object Behaviors {
*/
def withMdc[T](
mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]], behavior: Behavior[T]): Behavior[T] =
WithMdcBehavior[T](Map.empty, message mdcForMessage.apply(message).asScala.toMap, behavior)
withMdc(Collections.emptyMap[String, Any], mdcForMessage, behavior)
/**
* Static MDC (Mapped Diagnostic Context)
@ -306,7 +306,7 @@ object Behaviors {
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](staticMdc: java.util.Map[String, Any], behavior: Behavior[T]): Behavior[T] =
WithMdcBehavior[T](staticMdc.asScala.toMap, WithMdcBehavior.noMdcPerMessage, behavior)
withMdc(staticMdc, null, behavior)
/**
* Combination of static and per message MDC (Mapped Diagnostic Context).
@ -315,6 +315,8 @@ object Behaviors {
* are in both the static and the per message MDC the per message one overwrites the static one
* in the resulting log entries.
*
* * The `staticMdc` or `mdcForMessage` may be empty.
*
* @param staticMdc A static MDC applied for each message
* @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after
* each message processing by the inner behavior is done.
@ -326,10 +328,23 @@ object Behaviors {
def withMdc[T](
staticMdc: java.util.Map[String, Any],
mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]],
behavior: Behavior[T]): Behavior[T] =
behavior: Behavior[T]): Behavior[T] = {
def asScalaMap(m: java.util.Map[String, Any]): Map[String, Any] = {
if (m == null || m.isEmpty) Map.empty[String, Any]
else m.asScala.toMap
}
val mdcForMessageFun: T Map[String, Any] =
if (mdcForMessage == null) Map.empty
else {
message asScalaMap(mdcForMessage.apply(message))
}
WithMdcBehavior[T](
staticMdc.asScala.toMap,
message mdcForMessage.apply(message).asScala.toMap,
asScalaMap(staticMdc),
mdcForMessageFun,
behavior)
}
}

View file

@ -213,7 +213,7 @@ object Behaviors {
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](mdcForMessage: T Map[String, Any])(behavior: Behavior[T]): Behavior[T] =
WithMdcBehavior[T](Map.empty, mdcForMessage, behavior)
withMdc[T](Map.empty[String, Any], mdcForMessage)(behavior)
/**
* Static MDC (Mapped Diagnostic Context)
@ -225,7 +225,7 @@ object Behaviors {
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](staticMdc: Map[String, Any])(behavior: Behavior[T]): Behavior[T] =
WithMdcBehavior[T](staticMdc, WithMdcBehavior.noMdcPerMessage, behavior)
withMdc[T](staticMdc, (_: T) Map.empty[String, Any])(behavior)
/**
* Combination of static and per message MDC (Mapped Diagnostic Context).
@ -234,6 +234,8 @@ object Behaviors {
* are in both the static and the per message MDC the per message one overwrites the static one
* in the resulting log entries.
*
* The `staticMdc` or `mdcForMessage` may be empty.
*
* @param staticMdc A static MDC applied for each message
* @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after
* each message processing by the inner behavior is done.

View file

@ -29,19 +29,20 @@ private[akka] object EventsourcedBehavior {
object MDC {
// format: OFF
val AwaitingPermit = "get-permit"
val ReplayingSnapshot = "replay-snap"
val ReplayingEvents = "replay-evts"
val RunningCmds = "running-cmnds"
val PersistingEvents = "persist-evts"
val AwaitingPermit = "get-permit"
val ReplayingSnapshot = "replay-snap"
val ReplayingEvents = "replay-evts"
val RunningCmds = "running-cmnds"
val PersistingEvents = "persist-evts"
// format: ON
}
def withMdc(setup: EventsourcedSetup[_, _, _], phaseName: String)(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] =
Behaviors.withMdc(Map(
"persistenceId" setup.persistenceId,
"phase" phaseName
))(b)
def create(persistenceId: String, phaseName: String): Map[String, Any] = {
Map(
"persistenceId" persistenceId,
"phase" phaseName
)
}
}
/** Protocol used internally by the eventsourced behaviors, never exposed to user-land */
sealed trait InternalProtocol

View file

@ -37,45 +37,41 @@ private[persistence] object EventsourcedReplayingEvents {
private[persistence] final case class ReplayingState[State](
seqNr: Long,
state: State,
eventSeenInInterval: Boolean = false
eventSeenInInterval: Boolean,
toSeqNr: Long
)
def apply[C, E, S](
setup: EventsourcedSetup[C, E, S],
state: ReplayingState[S]
): Behavior[InternalProtocol] =
new EventsourcedReplayingEvents(setup).createBehavior(state)
new EventsourcedReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state)
}
@InternalApi
private[persistence] class EventsourcedReplayingEvents[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
import setup.context.log
import EventsourcedReplayingEvents.ReplayingState
def createBehavior(state: ReplayingState[S]): Behavior[InternalProtocol] = {
Behaviors.setup { _
startRecoveryTimer(setup.timers, setup.settings.recoveryEventTimeout)
replayEvents(state.seqNr + 1L, setup.recovery.toSequenceNr)
replayEvents(state.seqNr + 1L, state.toSeqNr)
withMdc(setup, MDC.ReplayingEvents) {
stay(state)
}
stay(state)
}
}
private def stay(state: ReplayingState[S]): Behavior[InternalProtocol] =
withMdc(setup, MDC.ReplayingEvents) {
Behaviors.receiveMessage[InternalProtocol] {
case JournalResponse(r) onJournalResponse(state, r)
case SnapshotterResponse(r) onSnapshotterResponse(r)
case RecoveryTickEvent(snap) onRecoveryTick(state, snap)
case cmd: IncomingCommand[C] onCommand(cmd)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}.receiveSignal(returnPermitOnStop)
}
Behaviors.receiveMessage[InternalProtocol] {
case JournalResponse(r) onJournalResponse(state, r)
case SnapshotterResponse(r) onSnapshotterResponse(r)
case RecoveryTickEvent(snap) onRecoveryTick(state, snap)
case cmd: IncomingCommand[C] onCommand(cmd)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}.receiveSignal(returnPermitOnStop)
private def onJournalResponse(
state: ReplayingState[S],
@ -95,7 +91,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
case NonFatal(ex) onRecoveryFailure(ex, repr.sequenceNr, Some(event))
}
case RecoverySuccess(highestSeqNr)
log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
cancelRecoveryTimer(setup.timers)
onRecoveryCompleted(state)

View file

@ -32,7 +32,7 @@ import akka.persistence.typed.internal.EventsourcedBehavior._
private[akka] object EventsourcedReplayingSnapshot {
def apply[C, E, S](setup: EventsourcedSetup[C, E, S]): Behavior[InternalProtocol] =
new EventsourcedReplayingSnapshot(setup).createBehavior()
new EventsourcedReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior()
}
@ -45,15 +45,13 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr)
withMdc(setup, MDC.ReplayingSnapshot) {
Behaviors.receiveMessage[InternalProtocol] {
case SnapshotterResponse(r) onSnapshotterResponse(r)
case JournalResponse(r) onJournalResponse(r)
case RecoveryTickEvent(snapshot) onRecoveryTick(snapshot)
case cmd: IncomingCommand[C] onCommand(cmd)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}.receiveSignal(returnPermitOnStop)
}
Behaviors.receiveMessage[InternalProtocol] {
case SnapshotterResponse(r) onSnapshotterResponse(r)
case JournalResponse(r) onJournalResponse(r)
case RecoveryTickEvent(snapshot) onRecoveryTick(snapshot)
case cmd: IncomingCommand[C] onCommand(cmd)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}.receiveSignal(returnPermitOnStop)
}
/**
@ -121,15 +119,9 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = {
cancelRecoveryTimer(setup.timers)
val rec = setup.recovery.copy(
toSequenceNr = toSnr,
fromSnapshot = SnapshotSelectionCriteria.None
)
EventsourcedReplayingEvents[C, E, S](
setup.copy(recovery = rec),
// setup.internalStash, // TODO move it out of setup
EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state)
setup,
EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr)
)
}

View file

@ -8,6 +8,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.MDC
/**
* INTERNAL API
@ -23,7 +24,7 @@ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
private[akka] object EventsourcedRequestingRecoveryPermit {
def apply[C, E, S](setup: EventsourcedSetup[C, E, S]): Behavior[InternalProtocol] =
new EventsourcedRequestingRecoveryPermit(setup).createBehavior()
new EventsourcedRequestingRecoveryPermit(setup.setMdc(MDC.AwaitingPermit)).createBehavior()
}
@ -35,18 +36,13 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val s
// request a permit, as only once we obtain one we can start replaying
requestRecoveryPermit()
Behaviors.withMdc(Map(
"persistenceId" setup.persistenceId,
"phase" "awaiting-permit"
)) {
Behaviors.receiveMessage[InternalProtocol] {
case InternalProtocol.RecoveryPermitGranted
becomeReplaying()
Behaviors.receiveMessage[InternalProtocol] {
case InternalProtocol.RecoveryPermitGranted
becomeReplaying()
case other
stash(other)
Behaviors.same
}
case other
stash(other)
Behaviors.same
}
}

View file

@ -55,7 +55,7 @@ private[akka] object EventsourcedRunning {
}
def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] =
new EventsourcedRunning(setup).handlingCommands(state)
new EventsourcedRunning(setup.setMdc(MDC.RunningCmds)).handlingCommands(state)
}
// ===============================================
@ -66,11 +66,11 @@ private[akka] object EventsourcedRunning {
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
import EventsourcedRunning.EventsourcedState
import EventsourcedBehavior.withMdc
private def log = setup.log
private def commandContext = setup.commandContext
private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds)
private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents)
def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = {
def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = {
@ -84,8 +84,10 @@ private[akka] object EventsourcedRunning {
effect: EffectImpl[E, S],
sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil
): Behavior[InternalProtocol] = {
if (log.isDebugEnabled)
log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
if (setup.log.isDebugEnabled)
setup.log.debug(
s"Handled command [{}], resulting effect: [{}], side effects: [{}]",
msg.getClass.getName, effect, sideEffects.size)
effect match {
case CompositeEffect(eff, currentSideEffects)
@ -146,11 +148,11 @@ private[akka] object EventsourcedRunning {
if (tags.isEmpty) event else Tagged(event, tags)
}
withMdc(setup, MDC.RunningCmds) {
Behaviors.receiveMessage[EventsourcedBehavior.InternalProtocol] {
case IncomingCommand(c: C @unchecked) onCommand(state, c)
case _ Behaviors.unhandled
}
setup.setMdc(runningCmdsMdc)
Behaviors.receiveMessage[EventsourcedBehavior.InternalProtocol] {
case IncomingCommand(c: C @unchecked) onCommand(state, c)
case _ Behaviors.unhandled
}
}
@ -163,9 +165,8 @@ private[akka] object EventsourcedRunning {
shouldSnapshotAfterPersist: Boolean,
sideEffects: immutable.Seq[ChainableEffect[_, S]]
): Behavior[InternalProtocol] = {
withMdc(setup, MDC.PersistingEvents) {
new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects)
}
setup.setMdc(persistingEventsMdc)
new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects)
}
class PersistingEvents(

View file

@ -29,7 +29,10 @@ private[akka] trait EventsourcedSettings {
def withSnapshotPluginId(id: String): EventsourcedSettings
}
object EventsourcedSettings {
/**
* INTERNAL API
*/
@InternalApi private[akka] object EventsourcedSettings {
def apply(system: ActorSystem[_]): EventsourcedSettings =
apply(system.settings.config)
@ -77,10 +80,14 @@ private[persistence] final case class EventsourcedSettingsImpl(
snapshotPluginId: String
) extends EventsourcedSettings {
def withJournalPluginId(id: String): EventsourcedSettings =
def withJournalPluginId(id: String): EventsourcedSettings = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = id)
def withSnapshotPluginId(id: String): EventsourcedSettings =
}
def withSnapshotPluginId(id: String): EventsourcedSettings = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = id)
}
private val journalConfig = EventsourcedSettings.journalConfigFor(config, journalPluginId)
val recoveryEventTimeout = journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis

View file

@ -4,50 +4,41 @@
package akka.persistence.typed.internal
import akka.actor.typed.Logger
import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.MDC
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.util.Collections.EmptyImmutableSeq
import akka.util.OptionVal
/**
* INTERNAL API: Carry state for the Persistent behavior implementation behaviors
*/
@InternalApi
private[persistence] final case class EventsourcedSetup[C, E, S](
context: ActorContext[InternalProtocol],
timers: TimerScheduler[InternalProtocol],
persistenceId: String,
initialState: S,
commandHandler: PersistentBehaviors.CommandHandler[C, E, S],
eventHandler: (S, E) S,
writerIdentity: WriterIdentity,
recoveryCompleted: (ActorContext[C], S) Unit,
tagger: E Set[String],
snapshotWhen: (S, E, Long) Boolean,
recovery: Recovery,
private[persistence] final class EventsourcedSetup[C, E, S](
val context: ActorContext[InternalProtocol],
val timers: TimerScheduler[InternalProtocol],
val persistenceId: String,
val initialState: S,
val commandHandler: PersistentBehaviors.CommandHandler[C, E, S],
val eventHandler: (S, E) S,
val writerIdentity: WriterIdentity,
val recoveryCompleted: (ActorContext[C], S) Unit,
val tagger: E Set[String],
val snapshotWhen: (S, E, Long) Boolean,
val recovery: Recovery,
var holdingRecoveryPermit: Boolean,
settings: EventsourcedSettings,
internalStash: StashBuffer[InternalProtocol]
val settings: EventsourcedSettings,
val internalStash: StashBuffer[InternalProtocol]
) {
import akka.actor.typed.scaladsl.adapter._
def withJournalPluginId(id: String): EventsourcedSetup[C, E, S] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(settings = settings.withJournalPluginId(id))
}
def withSnapshotPluginId(id: String): EventsourcedSetup[C, E, S] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(settings = settings.withSnapshotPluginId(id))
}
def commandContext: ActorContext[C] = context.asInstanceOf[ActorContext[C]]
def log = context.log
val persistence: Persistence = Persistence(context.system.toUntyped)
val journal: ActorRef = persistence.journalFor(settings.journalPluginId)
@ -61,5 +52,30 @@ private[persistence] final case class EventsourcedSetup[C, E, S](
def selfUntyped = context.self.toUntyped
private var mdc: Map[String, Any] = Map.empty
private var _log: OptionVal[Logger] = OptionVal.Some(context.log) // changed when mdc is changed
def log: Logger = {
_log match {
case OptionVal.Some(l) l
case OptionVal.None
// lazy init if mdc changed
val l = context.log.withMdc(mdc)
_log = OptionVal.Some(l)
l
}
}
def setMdc(newMdc: Map[String, Any]): EventsourcedSetup[C, E, S] = {
mdc = newMdc
// mdc is changed often, for each persisted event, but logging is rare, so lazy init of Logger
_log = OptionVal.None
this
}
def setMdc(phaseName: String): EventsourcedSetup[C, E, S] = {
setMdc(MDC.create(persistenceId, phaseName))
this
}
}

View file

@ -31,7 +31,9 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx
Behaviors.withTimers { timers
val settings = EventsourcedSettings(ctx.system)
val setup = EventsourcedSetup(
.withJournalPluginId(journalPluginId.getOrElse(""))
.withSnapshotPluginId(snapshotPluginId.getOrElse(""))
val setup = new EventsourcedSetup(
ctx,
timers,
persistenceId,
@ -46,8 +48,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
holdingRecoveryPermit = false,
settings = settings,
internalStash = StashBuffer(settings.stashCapacity)
).withJournalPluginId(journalPluginId.getOrElse(""))
.withSnapshotPluginId(snapshotPluginId.getOrElse(""))
)
EventsourcedRequestingRecoveryPermit(setup)
}