From 90541b20db5a243a8a13eda9b3e728aa532b2bf1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 4 Apr 2018 14:20:57 +0200 Subject: [PATCH] more efficient MDC in Typed persistence, #24787 --- .../actor/typed/javadsl/ActorLoggingTest.java | 1 + .../typed/internal/WithMdcBehavior.scala | 15 +++- .../akka/actor/typed/javadsl/Behaviors.scala | 27 ++++++-- .../akka/actor/typed/scaladsl/Behaviors.scala | 6 +- .../typed/internal/EventsourcedBehavior.scala | 23 ++++--- .../EventsourcedReplayingEvents.scala | 30 ++++---- .../EventsourcedReplayingSnapshot.scala | 28 +++----- ...EventsourcedRequestingRecoveryPermit.scala | 20 +++--- .../typed/internal/EventsourcedRunning.scala | 29 ++++---- .../typed/internal/EventsourcedSettings.scala | 13 +++- .../typed/internal/EventsourcedSetup.scala | 68 ++++++++++++------- .../internal/PersistentBehaviorImpl.scala | 7 +- 12 files changed, 154 insertions(+), 113 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorLoggingTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorLoggingTest.java index 6e31bcee1a..b3daf6c608 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorLoggingTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorLoggingTest.java @@ -45,6 +45,7 @@ public class ActorLoggingTest extends JUnitSuite { @Test public void loggingProvidesMDC() { Behavior behavior = Behaviors.withMdc( + null, (msg) -> { Map mdc = new HashMap<>(); mdc.put("txId", msg.getTransactionId()); diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehavior.scala index 28f0e39b6b..c86e21e621 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehavior.scala @@ -4,6 +4,8 @@ package akka.actor.typed.internal +import scala.collection.immutable.HashMap + import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.internal.adapter.AbstractLogger import akka.actor.typed.{ ActorContext, Behavior, ExtensibleBehavior, Signal } @@ -65,7 +67,7 @@ import akka.annotation.InternalApi } override def receive(ctx: ActorContext[T], msg: T): Behavior[T] = { - val mdc = staticMdc ++ mdcForMessage(msg) + val mdc = merge(staticMdc, mdcForMessage(msg)) ctx.asScala.log.asInstanceOf[AbstractLogger].mdc = mdc val next = try { @@ -76,6 +78,17 @@ import akka.annotation.InternalApi wrapWithMdc(next, ctx) } + private def merge(staticMdc: Map[String, Any], mdcForMessage: Map[String, Any]): Map[String, Any] = { + if (staticMdc.isEmpty) mdcForMessage + else if (mdcForMessage.isEmpty) staticMdc + else if (staticMdc.isInstanceOf[HashMap[String, Any]] && mdcForMessage.isInstanceOf[HashMap[String, Any]]) { + // merged is more efficient than ++ + mdcForMessage.asInstanceOf[HashMap[String, Any]].merged(staticMdc.asInstanceOf[HashMap[String, Any]])(null) + } else { + staticMdc ++ mdcForMessage + } + } + override def receiveSignal(ctx: ActorContext[T], signal: Signal): Behavior[T] = { val next = Behavior.interpretSignal(behavior, ctx, signal) wrapWithMdc(next, ctx) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index 100470ca71..942a9020da 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -4,6 +4,7 @@ package akka.actor.typed.javadsl +import java.util.Collections import java.util.function.{ Function ⇒ JFunction } import akka.actor.typed.{ ActorRef, Behavior, ExtensibleBehavior, Signal, SupervisorStrategy } @@ -12,7 +13,6 @@ import akka.annotation.{ ApiMayChange, DoNotInherit } import akka.japi.function.{ Procedure2, Function2 ⇒ JapiFunction2 } import akka.japi.pf.PFBuilder import akka.util.ConstantFun - import scala.collection.JavaConverters._ import scala.reflect.ClassTag /** @@ -294,7 +294,7 @@ object Behaviors { */ def withMdc[T]( mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]], behavior: Behavior[T]): Behavior[T] = - WithMdcBehavior[T](Map.empty, message ⇒ mdcForMessage.apply(message).asScala.toMap, behavior) + withMdc(Collections.emptyMap[String, Any], mdcForMessage, behavior) /** * Static MDC (Mapped Diagnostic Context) @@ -306,7 +306,7 @@ object Behaviors { * See also [[akka.actor.typed.Logger.withMdc]] */ def withMdc[T](staticMdc: java.util.Map[String, Any], behavior: Behavior[T]): Behavior[T] = - WithMdcBehavior[T](staticMdc.asScala.toMap, WithMdcBehavior.noMdcPerMessage, behavior) + withMdc(staticMdc, null, behavior) /** * Combination of static and per message MDC (Mapped Diagnostic Context). @@ -315,6 +315,8 @@ object Behaviors { * are in both the static and the per message MDC the per message one overwrites the static one * in the resulting log entries. * + * * The `staticMdc` or `mdcForMessage` may be empty. + * * @param staticMdc A static MDC applied for each message * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. @@ -326,10 +328,23 @@ object Behaviors { def withMdc[T]( staticMdc: java.util.Map[String, Any], mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]], - behavior: Behavior[T]): Behavior[T] = + behavior: Behavior[T]): Behavior[T] = { + + def asScalaMap(m: java.util.Map[String, Any]): Map[String, Any] = { + if (m == null || m.isEmpty) Map.empty[String, Any] + else m.asScala.toMap + } + + val mdcForMessageFun: T ⇒ Map[String, Any] = + if (mdcForMessage == null) Map.empty + else { + message ⇒ asScalaMap(mdcForMessage.apply(message)) + } + WithMdcBehavior[T]( - staticMdc.asScala.toMap, - message ⇒ mdcForMessage.apply(message).asScala.toMap, + asScalaMap(staticMdc), + mdcForMessageFun, behavior) + } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index 5e9f3c6b28..1d37c68a3c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -213,7 +213,7 @@ object Behaviors { * See also [[akka.actor.typed.Logger.withMdc]] */ def withMdc[T](mdcForMessage: T ⇒ Map[String, Any])(behavior: Behavior[T]): Behavior[T] = - WithMdcBehavior[T](Map.empty, mdcForMessage, behavior) + withMdc[T](Map.empty[String, Any], mdcForMessage)(behavior) /** * Static MDC (Mapped Diagnostic Context) @@ -225,7 +225,7 @@ object Behaviors { * See also [[akka.actor.typed.Logger.withMdc]] */ def withMdc[T](staticMdc: Map[String, Any])(behavior: Behavior[T]): Behavior[T] = - WithMdcBehavior[T](staticMdc, WithMdcBehavior.noMdcPerMessage, behavior) + withMdc[T](staticMdc, (_: T) ⇒ Map.empty[String, Any])(behavior) /** * Combination of static and per message MDC (Mapped Diagnostic Context). @@ -234,6 +234,8 @@ object Behaviors { * are in both the static and the per message MDC the per message one overwrites the static one * in the resulting log entries. * + * The `staticMdc` or `mdcForMessage` may be empty. + * * @param staticMdc A static MDC applied for each message * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala index b2a4cfcc67..8015ee78ef 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala @@ -29,19 +29,20 @@ private[akka] object EventsourcedBehavior { object MDC { // format: OFF - val AwaitingPermit = "get-permit" - val ReplayingSnapshot = "replay-snap" - val ReplayingEvents = "replay-evts" - val RunningCmds = "running-cmnds" - val PersistingEvents = "persist-evts" + val AwaitingPermit = "get-permit" + val ReplayingSnapshot = "replay-snap" + val ReplayingEvents = "replay-evts" + val RunningCmds = "running-cmnds" + val PersistingEvents = "persist-evts" // format: ON - } - def withMdc(setup: EventsourcedSetup[_, _, _], phaseName: String)(b: Behavior[InternalProtocol]): Behavior[InternalProtocol] = - Behaviors.withMdc(Map( - "persistenceId" → setup.persistenceId, - "phase" → phaseName - ))(b) + def create(persistenceId: String, phaseName: String): Map[String, Any] = { + Map( + "persistenceId" → persistenceId, + "phase" → phaseName + ) + } + } /** Protocol used internally by the eventsourced behaviors, never exposed to user-land */ sealed trait InternalProtocol diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala index 58ad9d0530..e976dc81de 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala @@ -37,45 +37,41 @@ private[persistence] object EventsourcedReplayingEvents { private[persistence] final case class ReplayingState[State]( seqNr: Long, state: State, - eventSeenInInterval: Boolean = false + eventSeenInInterval: Boolean, + toSeqNr: Long ) def apply[C, E, S]( setup: EventsourcedSetup[C, E, S], state: ReplayingState[S] ): Behavior[InternalProtocol] = - new EventsourcedReplayingEvents(setup).createBehavior(state) + new EventsourcedReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state) } @InternalApi private[persistence] class EventsourcedReplayingEvents[C, E, S](override val setup: EventsourcedSetup[C, E, S]) extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { - import setup.context.log import EventsourcedReplayingEvents.ReplayingState def createBehavior(state: ReplayingState[S]): Behavior[InternalProtocol] = { Behaviors.setup { _ ⇒ startRecoveryTimer(setup.timers, setup.settings.recoveryEventTimeout) - replayEvents(state.seqNr + 1L, setup.recovery.toSequenceNr) + replayEvents(state.seqNr + 1L, state.toSeqNr) - withMdc(setup, MDC.ReplayingEvents) { - stay(state) - } + stay(state) } } private def stay(state: ReplayingState[S]): Behavior[InternalProtocol] = - withMdc(setup, MDC.ReplayingEvents) { - Behaviors.receiveMessage[InternalProtocol] { - case JournalResponse(r) ⇒ onJournalResponse(state, r) - case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) - case RecoveryTickEvent(snap) ⇒ onRecoveryTick(state, snap) - case cmd: IncomingCommand[C] ⇒ onCommand(cmd) - case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit - }.receiveSignal(returnPermitOnStop) - } + Behaviors.receiveMessage[InternalProtocol] { + case JournalResponse(r) ⇒ onJournalResponse(state, r) + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case RecoveryTickEvent(snap) ⇒ onRecoveryTick(state, snap) + case cmd: IncomingCommand[C] ⇒ onCommand(cmd) + case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit + }.receiveSignal(returnPermitOnStop) private def onJournalResponse( state: ReplayingState[S], @@ -95,7 +91,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set case NonFatal(ex) ⇒ onRecoveryFailure(ex, repr.sequenceNr, Some(event)) } case RecoverySuccess(highestSeqNr) ⇒ - log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr) + setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr) cancelRecoveryTimer(setup.timers) onRecoveryCompleted(state) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala index b1cfadc330..93063b48ad 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala @@ -32,7 +32,7 @@ import akka.persistence.typed.internal.EventsourcedBehavior._ private[akka] object EventsourcedReplayingSnapshot { def apply[C, E, S](setup: EventsourcedSetup[C, E, S]): Behavior[InternalProtocol] = - new EventsourcedReplayingSnapshot(setup).createBehavior() + new EventsourcedReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior() } @@ -45,15 +45,13 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr) - withMdc(setup, MDC.ReplayingSnapshot) { - Behaviors.receiveMessage[InternalProtocol] { - case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) - case JournalResponse(r) ⇒ onJournalResponse(r) - case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot) - case cmd: IncomingCommand[C] ⇒ onCommand(cmd) - case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit - }.receiveSignal(returnPermitOnStop) - } + Behaviors.receiveMessage[InternalProtocol] { + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) + case JournalResponse(r) ⇒ onJournalResponse(r) + case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot) + case cmd: IncomingCommand[C] ⇒ onCommand(cmd) + case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit + }.receiveSignal(returnPermitOnStop) } /** @@ -121,15 +119,9 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = { cancelRecoveryTimer(setup.timers) - val rec = setup.recovery.copy( - toSequenceNr = toSnr, - fromSnapshot = SnapshotSelectionCriteria.None - ) - EventsourcedReplayingEvents[C, E, S]( - setup.copy(recovery = rec), - // setup.internalStash, // TODO move it out of setup - EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state) + setup, + EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr) ) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala index 3a0cf305cb..7d54ead639 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala @@ -8,6 +8,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol +import akka.persistence.typed.internal.EventsourcedBehavior.MDC /** * INTERNAL API @@ -23,7 +24,7 @@ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol private[akka] object EventsourcedRequestingRecoveryPermit { def apply[C, E, S](setup: EventsourcedSetup[C, E, S]): Behavior[InternalProtocol] = - new EventsourcedRequestingRecoveryPermit(setup).createBehavior() + new EventsourcedRequestingRecoveryPermit(setup.setMdc(MDC.AwaitingPermit)).createBehavior() } @@ -35,18 +36,13 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val s // request a permit, as only once we obtain one we can start replaying requestRecoveryPermit() - Behaviors.withMdc(Map( - "persistenceId" → setup.persistenceId, - "phase" → "awaiting-permit" - )) { - Behaviors.receiveMessage[InternalProtocol] { - case InternalProtocol.RecoveryPermitGranted ⇒ - becomeReplaying() + Behaviors.receiveMessage[InternalProtocol] { + case InternalProtocol.RecoveryPermitGranted ⇒ + becomeReplaying() - case other ⇒ - stash(other) - Behaviors.same - } + case other ⇒ + stash(other) + Behaviors.same } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index ce96be8693..6d0216d3f7 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -55,7 +55,7 @@ private[akka] object EventsourcedRunning { } def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = - new EventsourcedRunning(setup).handlingCommands(state) + new EventsourcedRunning(setup.setMdc(MDC.RunningCmds)).handlingCommands(state) } // =============================================== @@ -66,11 +66,11 @@ private[akka] object EventsourcedRunning { extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { import EventsourcedRunning.EventsourcedState - import EventsourcedBehavior.withMdc - - private def log = setup.log private def commandContext = setup.commandContext + private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds) + private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents) + def handlingCommands(state: EventsourcedState[S]): Behavior[InternalProtocol] = { def onCommand(state: EventsourcedState[S], cmd: C): Behavior[InternalProtocol] = { @@ -84,8 +84,10 @@ private[akka] object EventsourcedRunning { effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil ): Behavior[InternalProtocol] = { - if (log.isDebugEnabled) - log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size) + if (setup.log.isDebugEnabled) + setup.log.debug( + s"Handled command [{}], resulting effect: [{}], side effects: [{}]", + msg.getClass.getName, effect, sideEffects.size) effect match { case CompositeEffect(eff, currentSideEffects) ⇒ @@ -146,11 +148,11 @@ private[akka] object EventsourcedRunning { if (tags.isEmpty) event else Tagged(event, tags) } - withMdc(setup, MDC.RunningCmds) { - Behaviors.receiveMessage[EventsourcedBehavior.InternalProtocol] { - case IncomingCommand(c: C @unchecked) ⇒ onCommand(state, c) - case _ ⇒ Behaviors.unhandled - } + setup.setMdc(runningCmdsMdc) + + Behaviors.receiveMessage[EventsourcedBehavior.InternalProtocol] { + case IncomingCommand(c: C @unchecked) ⇒ onCommand(state, c) + case _ ⇒ Behaviors.unhandled } } @@ -163,9 +165,8 @@ private[akka] object EventsourcedRunning { shouldSnapshotAfterPersist: Boolean, sideEffects: immutable.Seq[ChainableEffect[_, S]] ): Behavior[InternalProtocol] = { - withMdc(setup, MDC.PersistingEvents) { - new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects) - } + setup.setMdc(persistingEventsMdc) + new PersistingEvents(state, numberOfEvents, shouldSnapshotAfterPersist, sideEffects) } class PersistingEvents( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala index 88dd44bca2..e1c98dd1df 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala @@ -29,7 +29,10 @@ private[akka] trait EventsourcedSettings { def withSnapshotPluginId(id: String): EventsourcedSettings } -object EventsourcedSettings { +/** + * INTERNAL API + */ +@InternalApi private[akka] object EventsourcedSettings { def apply(system: ActorSystem[_]): EventsourcedSettings = apply(system.settings.config) @@ -77,10 +80,14 @@ private[persistence] final case class EventsourcedSettingsImpl( snapshotPluginId: String ) extends EventsourcedSettings { - def withJournalPluginId(id: String): EventsourcedSettings = + def withJournalPluginId(id: String): EventsourcedSettings = { + require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") copy(journalPluginId = id) - def withSnapshotPluginId(id: String): EventsourcedSettings = + } + def withSnapshotPluginId(id: String): EventsourcedSettings = { + require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") copy(snapshotPluginId = id) + } private val journalConfig = EventsourcedSettings.journalConfigFor(config, journalPluginId) val recoveryEventTimeout = journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala index 79ede40df4..eaab929054 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala @@ -4,50 +4,41 @@ package akka.persistence.typed.internal +import akka.actor.typed.Logger import akka.actor.{ ActorRef, ExtendedActorSystem } import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler } import akka.annotation.InternalApi import akka.persistence._ +import akka.persistence.typed.internal.EventsourcedBehavior.MDC import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } import akka.persistence.typed.scaladsl.PersistentBehaviors import akka.util.Collections.EmptyImmutableSeq +import akka.util.OptionVal /** * INTERNAL API: Carry state for the Persistent behavior implementation behaviors */ @InternalApi -private[persistence] final case class EventsourcedSetup[C, E, S]( - context: ActorContext[InternalProtocol], - timers: TimerScheduler[InternalProtocol], - persistenceId: String, - initialState: S, - commandHandler: PersistentBehaviors.CommandHandler[C, E, S], - eventHandler: (S, E) ⇒ S, - writerIdentity: WriterIdentity, - recoveryCompleted: (ActorContext[C], S) ⇒ Unit, - tagger: E ⇒ Set[String], - snapshotWhen: (S, E, Long) ⇒ Boolean, - recovery: Recovery, +private[persistence] final class EventsourcedSetup[C, E, S]( + val context: ActorContext[InternalProtocol], + val timers: TimerScheduler[InternalProtocol], + val persistenceId: String, + val initialState: S, + val commandHandler: PersistentBehaviors.CommandHandler[C, E, S], + val eventHandler: (S, E) ⇒ S, + val writerIdentity: WriterIdentity, + val recoveryCompleted: (ActorContext[C], S) ⇒ Unit, + val tagger: E ⇒ Set[String], + val snapshotWhen: (S, E, Long) ⇒ Boolean, + val recovery: Recovery, var holdingRecoveryPermit: Boolean, - settings: EventsourcedSettings, - internalStash: StashBuffer[InternalProtocol] + val settings: EventsourcedSettings, + val internalStash: StashBuffer[InternalProtocol] ) { import akka.actor.typed.scaladsl.adapter._ - def withJournalPluginId(id: String): EventsourcedSetup[C, E, S] = { - require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") - copy(settings = settings.withJournalPluginId(id)) - } - - def withSnapshotPluginId(id: String): EventsourcedSetup[C, E, S] = { - require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") - copy(settings = settings.withSnapshotPluginId(id)) - } - def commandContext: ActorContext[C] = context.asInstanceOf[ActorContext[C]] - def log = context.log - val persistence: Persistence = Persistence(context.system.toUntyped) val journal: ActorRef = persistence.journalFor(settings.journalPluginId) @@ -61,5 +52,30 @@ private[persistence] final case class EventsourcedSetup[C, E, S]( def selfUntyped = context.self.toUntyped + private var mdc: Map[String, Any] = Map.empty + private var _log: OptionVal[Logger] = OptionVal.Some(context.log) // changed when mdc is changed + def log: Logger = { + _log match { + case OptionVal.Some(l) ⇒ l + case OptionVal.None ⇒ + // lazy init if mdc changed + val l = context.log.withMdc(mdc) + _log = OptionVal.Some(l) + l + } + } + + def setMdc(newMdc: Map[String, Any]): EventsourcedSetup[C, E, S] = { + mdc = newMdc + // mdc is changed often, for each persisted event, but logging is rare, so lazy init of Logger + _log = OptionVal.None + this + } + + def setMdc(phaseName: String): EventsourcedSetup[C, E, S] = { + setMdc(MDC.create(persistenceId, phaseName)) + this + } + } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index 5a593abfc3..c6b8058364 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -31,7 +31,9 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒ Behaviors.withTimers { timers ⇒ val settings = EventsourcedSettings(ctx.system) - val setup = EventsourcedSetup( + .withJournalPluginId(journalPluginId.getOrElse("")) + .withSnapshotPluginId(snapshotPluginId.getOrElse("")) + val setup = new EventsourcedSetup( ctx, timers, persistenceId, @@ -46,8 +48,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( holdingRecoveryPermit = false, settings = settings, internalStash = StashBuffer(settings.stashCapacity) - ).withJournalPluginId(journalPluginId.getOrElse("")) - .withSnapshotPluginId(snapshotPluginId.getOrElse("")) + ) EventsourcedRequestingRecoveryPermit(setup) }