diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 96f6c8c2c4..3788ca63f3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -179,18 +179,17 @@ object Behavior { * Not placed in internal.BehaviorImpl because Behavior is sealed. */ @InternalApi - @DoNotInherit - private[akka] class DeferredBehavior[T](val factory: SAC[T] ⇒ Behavior[T]) extends Behavior[T] { - - /** start the deferred behavior */ - @throws(classOf[Exception]) - def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala) - - override def toString: String = s"Deferred(${LineNumbers(factory)})" + private[akka] abstract class DeferredBehavior[T] extends Behavior[T] { + def apply(ctx: ActorContext[T]): Behavior[T] } - object DeferredBehavior { + /** INTERNAL API */ + @InternalApi + private[akka] object DeferredBehavior { def apply[T](factory: SAC[T] ⇒ Behavior[T]) = - new DeferredBehavior[T](factory) + new DeferredBehavior[T] { + def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala) + override def toString: String = s"Deferred(${LineNumbers(factory)})" + } } /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala index f09483187d..1fb850d89b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringEvents.scala @@ -85,7 +85,7 @@ private[akka] class EventsourcedRecoveringEvents[Command, Event, State]( def onCommand(cmd: Command): Behavior[Any] = { // during recovery, stash all incoming commands - stash(cmd) + stash(context, cmd) same } @@ -111,7 +111,7 @@ private[akka] class EventsourcedRecoveringEvents[Command, Event, State]( onRecoveryFailure(cause, event = None) case other ⇒ - stash(other) + stash(context, other) Behaviors.same } } catch { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala index 232d81e535..543bb352fa 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRecoveringSnapshot.scala @@ -80,7 +80,7 @@ final class EventsourcedRecoveringSnapshot[Command, Event, State]( def onCommand(cmd: Command): Behavior[Any] = { // during recovery, stash all incoming commands - stash(cmd) + stash(context, cmd) Behavior.same } @@ -124,7 +124,7 @@ final class EventsourcedRecoveringSnapshot[Command, Event, State]( onRecoveryFailure(cause, event = None) case other ⇒ - stash(other) + stash(context, other) same } } catch { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala index 6b1d7517da..c129588df2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala @@ -80,7 +80,7 @@ private[akka] final class EventsourcedRequestingRecoveryPermit[Command, Event, S becomeRecovering() case other ⇒ - stash(other) + stash(context, other) Behaviors.same } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index f19a0ed886..33bfb166fa 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -115,7 +115,7 @@ class EventsourcedRunning[Command, Event, State]( def name = "PersistingEvents" final override def onCommand(c: Command): Behavior[Any] = { - stash(c) + stash(context, c) same } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala index 416ce45c74..9c2781d0a1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala @@ -21,7 +21,6 @@ private[akka] trait EventsourcedStashManagement { protected def log: LoggingAdapter protected def extension: Persistence - protected def context: ActorContext[Any] protected val internalStash: StashBuffer[Any] @@ -43,7 +42,7 @@ private[akka] trait EventsourcedStashManagement { other // the other strategies are supported } - protected def stash(msg: Any): Unit = { + protected def stash(ctx: ActorContext[Any], msg: Any): Unit = { if (logLevel != OffLevel) log.log(logLevel, "Stashing message: {}", msg) try internalStash.stash(msg) catch { @@ -51,7 +50,7 @@ private[akka] trait EventsourcedStashManagement { internalStashOverflowStrategy match { case DiscardToDeadLetterStrategy ⇒ val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow? - context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped)) + ctx.system.deadLetters.tell(DeadLetter(msg, snd, ctx.self.toUntyped)) case ReplyToStrategy(response) ⇒ throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!") @@ -65,7 +64,7 @@ private[akka] trait EventsourcedStashManagement { protected def tryUnstash(ctx: ActorContext[Any], behavior: Behavior[Any]): Behavior[Any] = { if (internalStash.nonEmpty) { log.debug("Unstashing message: {}", internalStash.head.getClass) - internalStash.unstash(context, behavior, 1, ConstantFun.scalaIdentityFunction) + internalStash.unstash(ctx, behavior, 1, ConstantFun.scalaIdentityFunction) } else behavior } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index 5180a718b2..be22586f55 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -1,5 +1,7 @@ package akka.persistence.typed.internal +import akka.actor.typed +import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.internal.TimerSchedulerImpl import akka.actor.typed.scaladsl.ActorContext @@ -26,30 +28,7 @@ private[persistence] class PersistentBehaviorImpl[Command, Event, State] private snapshotPluginId: String, snapshotWhen: (State, Event, Long) ⇒ Boolean, recovery: Recovery -) extends DeferredBehavior[Command](ctx ⇒ - TimerSchedulerImpl.wrapWithTimers[Command] { timers ⇒ - val callbacks = EventsourcedCallbacks[Command, Event, State]( - initialState, - commandHandler, - eventHandler, - snapshotWhen, - recoveryCompleted, - tagger - ) - val pluginIds = EventsourcedPluginIds( - journalPluginId, - snapshotPluginId - ) - new EventsourcedRequestingRecoveryPermit( - persistenceId, - ctx.asInstanceOf[ActorContext[Any]], // sorry - timers.asInstanceOf[TimerScheduler[Any]], // sorry - recovery, - callbacks, - pluginIds - ).narrow[Command] - - }(ctx)) with PersistentBehavior[Command, Event, State] { +) extends PersistentBehavior[Command, Event, State] { def this( persistenceId: String, @@ -70,6 +49,33 @@ private[persistence] class PersistentBehaviorImpl[Command, Event, State] private ) } + override def apply(ctx: typed.ActorContext[Command]): Behavior[Command] = { + DeferredBehavior[Command](ctx ⇒ + TimerSchedulerImpl.wrapWithTimers[Command] { timers ⇒ + val callbacks = EventsourcedCallbacks[Command, Event, State]( + initialState, + commandHandler, + eventHandler, + snapshotWhen, + recoveryCompleted, + tagger + ) + val pluginIds = EventsourcedPluginIds( + journalPluginId, + snapshotPluginId + ) + new EventsourcedRequestingRecoveryPermit( + persistenceId, + ctx.asInstanceOf[ActorContext[Any]], // sorry + timers.asInstanceOf[TimerScheduler[Any]], // sorry + recovery, + callbacks, + pluginIds + ).narrow[Command] + + }(ctx)) + } + /** * The `callback` function is called to notify the actor that the recovery process * is finished.