Allow internal custom implementations of DeferredBehavior

Pass around context for stashing
This commit is contained in:
Johannes Rudolph 2018-03-01 11:25:49 +01:00 committed by Konrad `ktoso` Malawski
parent fa05695548
commit a9293b3df2
7 changed files with 48 additions and 44 deletions

View file

@ -179,18 +179,17 @@ object Behavior {
* Not placed in internal.BehaviorImpl because Behavior is sealed.
*/
@InternalApi
@DoNotInherit
private[akka] class DeferredBehavior[T](val factory: SAC[T] Behavior[T]) extends Behavior[T] {
/** start the deferred behavior */
@throws(classOf[Exception])
def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala)
override def toString: String = s"Deferred(${LineNumbers(factory)})"
private[akka] abstract class DeferredBehavior[T] extends Behavior[T] {
def apply(ctx: ActorContext[T]): Behavior[T]
}
object DeferredBehavior {
/** INTERNAL API */
@InternalApi
private[akka] object DeferredBehavior {
def apply[T](factory: SAC[T] Behavior[T]) =
new DeferredBehavior[T](factory)
new DeferredBehavior[T] {
def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala)
override def toString: String = s"Deferred(${LineNumbers(factory)})"
}
}
/**

View file

@ -85,7 +85,7 @@ private[akka] class EventsourcedRecoveringEvents[Command, Event, State](
def onCommand(cmd: Command): Behavior[Any] = {
// during recovery, stash all incoming commands
stash(cmd)
stash(context, cmd)
same
}
@ -111,7 +111,7 @@ private[akka] class EventsourcedRecoveringEvents[Command, Event, State](
onRecoveryFailure(cause, event = None)
case other
stash(other)
stash(context, other)
Behaviors.same
}
} catch {

View file

@ -80,7 +80,7 @@ final class EventsourcedRecoveringSnapshot[Command, Event, State](
def onCommand(cmd: Command): Behavior[Any] = {
// during recovery, stash all incoming commands
stash(cmd)
stash(context, cmd)
Behavior.same
}
@ -124,7 +124,7 @@ final class EventsourcedRecoveringSnapshot[Command, Event, State](
onRecoveryFailure(cause, event = None)
case other
stash(other)
stash(context, other)
same
}
} catch {

View file

@ -80,7 +80,7 @@ private[akka] final class EventsourcedRequestingRecoveryPermit[Command, Event, S
becomeRecovering()
case other
stash(other)
stash(context, other)
Behaviors.same
}
}

View file

@ -115,7 +115,7 @@ class EventsourcedRunning[Command, Event, State](
def name = "PersistingEvents"
final override def onCommand(c: Command): Behavior[Any] = {
stash(c)
stash(context, c)
same
}

View file

@ -21,7 +21,6 @@ private[akka] trait EventsourcedStashManagement {
protected def log: LoggingAdapter
protected def extension: Persistence
protected def context: ActorContext[Any]
protected val internalStash: StashBuffer[Any]
@ -43,7 +42,7 @@ private[akka] trait EventsourcedStashManagement {
other // the other strategies are supported
}
protected def stash(msg: Any): Unit = {
protected def stash(ctx: ActorContext[Any], msg: Any): Unit = {
if (logLevel != OffLevel) log.log(logLevel, "Stashing message: {}", msg)
try internalStash.stash(msg) catch {
@ -51,7 +50,7 @@ private[akka] trait EventsourcedStashManagement {
internalStashOverflowStrategy match {
case DiscardToDeadLetterStrategy
val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow?
context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped))
ctx.system.deadLetters.tell(DeadLetter(msg, snd, ctx.self.toUntyped))
case ReplyToStrategy(response)
throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!")
@ -65,7 +64,7 @@ private[akka] trait EventsourcedStashManagement {
protected def tryUnstash(ctx: ActorContext[Any], behavior: Behavior[Any]): Behavior[Any] = {
if (internalStash.nonEmpty) {
log.debug("Unstashing message: {}", internalStash.head.getClass)
internalStash.unstash(context, behavior, 1, ConstantFun.scalaIdentityFunction)
internalStash.unstash(ctx, behavior, 1, ConstantFun.scalaIdentityFunction)
} else behavior
}

View file

@ -1,5 +1,7 @@
package akka.persistence.typed.internal
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.internal.TimerSchedulerImpl
import akka.actor.typed.scaladsl.ActorContext
@ -26,30 +28,7 @@ private[persistence] class PersistentBehaviorImpl[Command, Event, State] private
snapshotPluginId: String,
snapshotWhen: (State, Event, Long) Boolean,
recovery: Recovery
) extends DeferredBehavior[Command](ctx
TimerSchedulerImpl.wrapWithTimers[Command] { timers
val callbacks = EventsourcedCallbacks[Command, Event, State](
initialState,
commandHandler,
eventHandler,
snapshotWhen,
recoveryCompleted,
tagger
)
val pluginIds = EventsourcedPluginIds(
journalPluginId,
snapshotPluginId
)
new EventsourcedRequestingRecoveryPermit(
persistenceId,
ctx.asInstanceOf[ActorContext[Any]], // sorry
timers.asInstanceOf[TimerScheduler[Any]], // sorry
recovery,
callbacks,
pluginIds
).narrow[Command]
}(ctx)) with PersistentBehavior[Command, Event, State] {
) extends PersistentBehavior[Command, Event, State] {
def this(
persistenceId: String,
@ -70,6 +49,33 @@ private[persistence] class PersistentBehaviorImpl[Command, Event, State] private
)
}
override def apply(ctx: typed.ActorContext[Command]): Behavior[Command] = {
DeferredBehavior[Command](ctx
TimerSchedulerImpl.wrapWithTimers[Command] { timers
val callbacks = EventsourcedCallbacks[Command, Event, State](
initialState,
commandHandler,
eventHandler,
snapshotWhen,
recoveryCompleted,
tagger
)
val pluginIds = EventsourcedPluginIds(
journalPluginId,
snapshotPluginId
)
new EventsourcedRequestingRecoveryPermit(
persistenceId,
ctx.asInstanceOf[ActorContext[Any]], // sorry
timers.asInstanceOf[TimerScheduler[Any]], // sorry
recovery,
callbacks,
pluginIds
).narrow[Command]
}(ctx))
}
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.