=per native typed implementation of Eventsourced=>PersistendBehavior

timeout is explicitly a message of Command

persitAll and chainable side effects work well

more tests pasing

additional sanity check that mutable behaviors work as expected

unstashing needs to "loop through" the AdapterActor otherwise Stopped
won't work

solve unstashing/stop issue, by not randomly init()ing, but unstashing

snapshotting works

all tests green

rebased

nicer log source

remove IncomingCommand wrapper, we dont need it

no need for shared counter

remove not needed methods and state

more state cleanup, using Behaviors.same

reminder that we DO need that same alias, since stash does not work with
the Behavior.same

introduce config for stash buffer

stopping now works after persisting

compile fix

cleanup

reduced number of adapter styles needed for co-existence of persistence

final cleanup done, less passing around 40 objects, carriers provided
now
This commit is contained in:
Konrad Malawski 2018-01-23 19:32:11 +09:00 committed by Konrad `ktoso` Malawski
parent 907c6a6931
commit 70e225b734
43 changed files with 1696 additions and 532 deletions

View file

@ -3,8 +3,8 @@
*/
package akka.actor.typed
import akka.actor.typed.scaladsl.{ Behaviors SActor }
import akka.actor.typed.javadsl.{ ActorContext JActorContext, Behaviors JActor }
import akka.actor.typed.scaladsl.{ Behaviors SBehaviors }
import akka.actor.typed.javadsl.{ ActorContext JActorContext, Behaviors JBehaviors }
import akka.japi.function.{ Function F1e, Function2 F2, Procedure2 P2 }
import akka.japi.pf.{ FI, PFBuilder }
import java.util.function.{ Function F1 }
@ -137,16 +137,16 @@ object BehaviorSpec {
}
def mkFull(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = {
SActor.immutable[Command] {
SBehaviors.immutable[Command] {
case (ctx, GetSelf)
monitor ! Self(ctx.self)
SActor.same
SBehaviors.same
case (_, Miss)
monitor ! Missed
SActor.unhandled
SBehaviors.unhandled
case (_, Ignore)
monitor ! Ignored
SActor.same
SBehaviors.same
case (_, Ping)
monitor ! Pong
mkFull(monitor, state)
@ -155,13 +155,13 @@ object BehaviorSpec {
mkFull(monitor, state.next)
case (_, GetState())
monitor ! state
SActor.same
case (_, Stop) SActor.stopped
case (_, _) SActor.unhandled
SBehaviors.same
case (_, Stop) SBehaviors.stopped
case (_, _) SBehaviors.unhandled
} onSignal {
case (_, signal)
monitor ! GotSignal(signal)
SActor.same
SBehaviors.same
}
}
/*
@ -345,16 +345,16 @@ class FullBehaviorSpec extends TypedAkkaSpec with Messages with BecomeWithLifecy
class ImmutableBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = {
SActor.immutable[Command] {
SBehaviors.immutable[Command] {
case (ctx, GetSelf)
monitor ! Self(ctx.self)
SActor.same
SBehaviors.same
case (_, Miss)
monitor ! Missed
SActor.unhandled
SBehaviors.unhandled
case (_, Ignore)
monitor ! Ignored
SActor.same
SBehaviors.same
case (_, Ping)
monitor ! Pong
behv(monitor, state)
@ -363,13 +363,13 @@ class ImmutableBehaviorSpec extends Messages with BecomeWithLifecycle with Stopp
behv(monitor, state.next)
case (_, GetState())
monitor ! state
SActor.same
case (_, Stop) SActor.stopped
case (_, _: AuxPing) SActor.unhandled
SBehaviors.same
case (_, Stop) SBehaviors.stopped
case (_, _: AuxPing) SBehaviors.unhandled
} onSignal {
case (_, signal)
monitor ! GotSignal(signal)
SActor.same
SBehaviors.same
}
}
}
@ -379,18 +379,18 @@ class ImmutableWithSignalScalaBehaviorSpec extends TypedAkkaSpec with Messages w
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
SActor.immutable[Command] {
SBehaviors.immutable[Command] {
(ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.same
SBehaviors.same
case Miss
monitor ! Missed
SActor.unhandled
SBehaviors.unhandled
case Ignore
monitor ! Ignored
SActor.same
SBehaviors.same
case Ping
monitor ! Pong
behv(monitor, state)
@ -399,14 +399,14 @@ class ImmutableWithSignalScalaBehaviorSpec extends TypedAkkaSpec with Messages w
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
SBehaviors.same
case Stop SBehaviors.stopped
case _: AuxPing SBehaviors.unhandled
}
} onSignal {
case (_, sig)
monitor ! GotSignal(sig)
SActor.same
SBehaviors.same
}
}
@ -415,17 +415,17 @@ class ImmutableScalaBehaviorSpec extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
SActor.immutable[Command] { (ctx, msg)
SBehaviors.immutable[Command] { (ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.same
SBehaviors.same
case Miss
monitor ! Missed
SActor.unhandled
SBehaviors.unhandled
case Ignore
monitor ! Ignored
SActor.same
SBehaviors.same
case Ping
monitor ! Pong
behv(monitor, state)
@ -434,9 +434,9 @@ class ImmutableScalaBehaviorSpec extends Messages with Become with Stoppable {
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
SBehaviors.same
case Stop SBehaviors.stopped
case _: AuxPing SBehaviors.unhandled
}
}
}
@ -446,8 +446,8 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event]): Behavior[Command] =
SActor.mutable[Command] { ctx
new SActor.MutableBehavior[Command] {
SBehaviors.mutable[Command] { ctx
new SBehaviors.MutableBehavior[Command] {
private var state: State = StateA
override def onMessage(msg: Command): Behavior[Command] = {
@ -457,10 +457,10 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable {
this
case Miss
monitor ! Missed
SActor.unhandled
SBehaviors.unhandled
case Ignore
monitor ! Ignored
SActor.same // this or same works the same way
SBehaviors.same // this or same works the same way
case Ping
monitor ! Pong
this
@ -471,8 +471,8 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable {
case GetState()
monitor ! state
this
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
case Stop SBehaviors.stopped
case _: AuxPing SBehaviors.unhandled
}
}
}
@ -481,7 +481,7 @@ class MutableScalaBehaviorSpec extends Messages with Become with Stoppable {
class WidenedScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with Siphon {
import SActor.BehaviorDecorators
import SBehaviors.BehaviorDecorators
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Command]("widenedListener")
@ -494,7 +494,7 @@ class DeferredScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Done]("deferredListener")
(SActor.setup(_ {
(SBehaviors.setup(_ {
inbox.ref ! Done
super.behavior(monitor)._1
}), inbox)
@ -507,30 +507,30 @@ class DeferredScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec {
class TapScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Either[Signal, Command]]("tapListener")
(SActor.tap((_, msg) inbox.ref ! Right(msg), (_, sig) inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox)
(SBehaviors.tap((_, msg) inbox.ref ! Right(msg), (_, sig) inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox)
}
}
class RestarterScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
SActor.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) null
SBehaviors.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) null
}
}
class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
JActor.immutable(
JBehaviors.immutable(
fc((ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.same
SBehaviors.same
case Miss
monitor ! Missed
SActor.unhandled
SBehaviors.unhandled
case Ignore
monitor ! Ignored
SActor.same
SBehaviors.same
case Ping
monitor ! Pong
behv(monitor, state)
@ -539,31 +539,31 @@ class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecy
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
SBehaviors.same
case Stop SBehaviors.stopped
case _: AuxPing SBehaviors.unhandled
}),
fs((_, sig) {
monitor ! GotSignal(sig)
SActor.same
SBehaviors.same
}))
}
class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
JActor.immutable {
JBehaviors.immutable {
fc((ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.same
SBehaviors.same
case Miss
monitor ! Missed
SActor.unhandled
SBehaviors.unhandled
case Ignore
monitor ! Ignored
SActor.same
SBehaviors.same
case Ping
monitor ! Pong
behv(monitor, state)
@ -572,9 +572,9 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable {
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.same
case Stop SActor.stopped
case _: AuxPing SActor.unhandled
SBehaviors.same
case Stop SBehaviors.stopped
case _: AuxPing SBehaviors.unhandled
})
}
}
@ -582,7 +582,7 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable {
class WidenedJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with Siphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Command]("widenedListener")
JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x {
JBehaviors.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x {
inbox.ref ! x
x
})))) inbox
@ -594,7 +594,7 @@ class DeferredJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Done]("deferredListener")
(JActor.setup(df(_ {
(JBehaviors.setup(df(_ {
inbox.ref ! Done
super.behavior(monitor)._1
})), inbox)
@ -607,7 +607,7 @@ class DeferredJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec {
class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Either[Signal, Command]]("tapListener")
(JActor.tap(
(JBehaviors.tap(
pc((_, msg) inbox.ref ! Right(msg)),
ps((_, sig) inbox.ref ! Left(sig)),
super.behavior(monitor)._1), inbox)
@ -616,7 +616,7 @@ class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse
class RestarterJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
JActor.supervise(super.behavior(monitor)._1)
JBehaviors.supervise(super.behavior(monitor)._1)
.onFailure(classOf[Exception], SupervisorStrategy.restart) null
}
}

View file

@ -5,6 +5,7 @@ package akka.actor.typed
import akka.Done
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.adapter._
import akka.testkit.EventFilter
import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
@ -23,6 +24,12 @@ object WatchSpec {
case (_, Stop) Behaviors.stopped
}
val mutableTerminatorBehavior = new MutableBehavior[Stop.type] {
override def onMessage(msg: Stop.type) = msg match {
case Stop Behaviors.stopped
}
}
sealed trait Message
sealed trait CustomTerminationMessage extends Message
case object CustomTerminationMessage extends CustomTerminationMessage

View file

@ -10,7 +10,7 @@ import akka.actor.{ InvalidMessageException, Props }
import akka.actor.typed.scaladsl.Behaviors
import akka.{ Done, NotUsed, actor untyped }
import akka.testkit._
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.Behavior.UntypedPropsBehavior
import scala.concurrent.Await
@ -301,8 +301,9 @@ class AdapterSpec extends AkkaSpec {
"spawn untyped behavior anonymously" in {
val probe = TestProbe()
val untypedBehavior: Behavior[String] = new UntypedBehavior[String] {
override private[akka] def untypedProps: Props = untypedForwarder(probe.ref)
val untypedBehavior: Behavior[String] = new UntypedPropsBehavior[String] {
override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props =
untypedForwarder(probe.ref)
}
val ref = system.spawnAnonymous(untypedBehavior)
ref ! "hello"
@ -311,8 +312,9 @@ class AdapterSpec extends AkkaSpec {
"spawn untyped behavior" in {
val probe = TestProbe()
val untypedBehavior: Behavior[String] = new UntypedBehavior[String] {
override private[akka] def untypedProps: Props = untypedForwarder(probe.ref)
val untypedBehavior: Behavior[String] = new UntypedPropsBehavior[String] {
override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props =
untypedForwarder(probe.ref)
}
val ref = system.spawn(untypedBehavior, "test")
ref ! "hello"

View file

@ -150,7 +150,7 @@ object Behavior {
}
/**
* INTERNAL API.
* INTERNAL API
*/
@InternalApi
private[akka] object UnhandledBehavior extends Behavior[Nothing] {
@ -158,14 +158,13 @@ object Behavior {
}
/**
* INTERNAL API.
* INTERNAL API
* Used to create untyped props from behaviours, or directly returning an untyped props that implements this behavior.
*/
@InternalApi
private[akka] abstract class UntypedBehavior[T] extends Behavior[T] {
/**
* INTERNAL API
*/
@InternalApi private[akka] def untypedProps: akka.actor.Props
private[akka] abstract class UntypedPropsBehavior[T] extends Behavior[T] {
/** INTERNAL API */
@InternalApi private[akka] def untypedProps(props: Props): akka.actor.Props
}
/**
@ -180,7 +179,8 @@ object Behavior {
* Not placed in internal.BehaviorImpl because Behavior is sealed.
*/
@InternalApi
private[akka] final case class DeferredBehavior[T](factory: SAC[T] Behavior[T]) extends Behavior[T] {
@DoNotInherit
private[akka] class DeferredBehavior[T](val factory: SAC[T] Behavior[T]) extends Behavior[T] {
/** start the deferred behavior */
@throws(classOf[Exception])
@ -188,6 +188,10 @@ object Behavior {
override def toString: String = s"Deferred(${LineNumbers(factory)})"
}
object DeferredBehavior {
def apply[T](factory: SAC[T] Behavior[T]) =
new DeferredBehavior[T](factory)
}
/**
* INTERNAL API.
@ -299,12 +303,12 @@ object Behavior {
def interpretSignal[T](behavior: Behavior[T], ctx: ActorContext[T], signal: Signal): Behavior[T] =
interpret(behavior, ctx, signal)
private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] =
private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] = {
behavior match {
case null throw new InvalidMessageException("[null] is not an allowed message")
case SameBehavior | UnhandledBehavior
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
case _: UntypedBehavior[_]
case _: UntypedPropsBehavior[_]
throw new IllegalArgumentException(s"cannot wrap behavior [$behavior] in " +
"Behaviors.setup, Behaviors.supervise or similar")
case d: DeferredBehavior[_] throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter")
@ -318,6 +322,7 @@ object Behavior {
}
start(possiblyDeferredResult, ctx)
}
}
/**
* INTERNAL API
@ -331,10 +336,8 @@ object Behavior {
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
else {
val nextB = messages.next() match {
case sig: Signal
Behavior.interpretSignal(b2, ctx, sig)
case msg
Behavior.interpretMessage(b2, ctx, msg)
case sig: Signal Behavior.interpretSignal(b2, ctx, sig)
case msg Behavior.interpretMessage(b2, ctx, msg)
}
interpretOne(Behavior.canonicalize(nextB, b, ctx)) // recursive
}

View file

@ -18,7 +18,6 @@ import scala.util.Try
import akka.annotation.InternalApi
import akka.util.OptionVal
import akka.event.LoggingAdapter
import akka.util.Timeout
/**

View file

@ -61,7 +61,9 @@ import scala.reflect.ClassTag
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] =
onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg)
override def toString = s"Immutable(${LineNumbers(onMessage)})"
}

View file

@ -86,7 +86,7 @@ import akka.util.ConstantFun
}
}
override def forEach(f: Consumer[T]): Unit = foreach(f.accept)
override def forEach(f: Consumer[T]): Unit = foreach(f.accept(_))
override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] =
unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T])
@ -98,8 +98,8 @@ import akka.util.ConstantFun
numberOfMessages: Int, wrap: T T): Behavior[T] = {
val iter = new Iterator[T] {
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
override def next(): T = StashBufferImpl.this.dropHead()
}.take(numberOfMessages).map(wrap)
override def next(): T = wrap(StashBufferImpl.this.dropHead())
}.take(numberOfMessages)
val ctx = scaladslCtx.asInstanceOf[ActorContext[T]]
Behavior.interpretMessages[T](behavior, ctx, iter)
}
@ -108,5 +108,7 @@ import akka.util.ConstantFun
numberOfMessages: Int, wrap: JFunction[T, T]): Behavior[T] =
unstash(ctx.asScala, behavior, numberOfMessages, x wrap.apply(x))
override def toString: String =
s"StashBuffer($size/$capacity)"
}

View file

@ -26,11 +26,13 @@ import scala.reflect.ClassTag
final case class TimerMsg(key: Any, generation: Int, owner: AnyRef)
def withTimers[T](factory: TimerSchedulerImpl[T] Behavior[T]): Behavior[T] = {
scaladsl.Behaviors.setup[T] { ctx
val timerScheduler = new TimerSchedulerImpl[T](ctx)
val behavior = factory(timerScheduler)
timerScheduler.intercept(behavior)
}
scaladsl.Behaviors.setup[T](wrapWithTimers(factory))
}
def wrapWithTimers[T](factory: TimerSchedulerImpl[T] Behavior[T])(ctx: ActorContext[T]): Behavior[T] = {
val timerScheduler = new TimerSchedulerImpl[T](ctx)
val behavior = factory(timerScheduler)
timerScheduler.intercept(behavior)
}
}

View file

@ -6,7 +6,7 @@ package internal
package adapter
import akka.actor.ExtendedActorSystem
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.Behavior.UntypedPropsBehavior
import akka.annotation.InternalApi
import akka.util.OptionVal
import akka.{ ConfigurationException, actor a }
@ -125,9 +125,10 @@ import scala.concurrent.duration._
def spawnAnonymous[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], props: Props): ActorRef[T] = {
behavior match {
case b: UntypedBehavior[_]
case b: UntypedPropsBehavior[_]
// TODO dispatcher from props
ActorRefAdapter(ctx.actorOf(b.untypedProps))
ActorRefAdapter(ctx.actorOf(b.untypedProps(props)))
case _
try {
Behavior.validateAsInitial(behavior)
@ -141,9 +142,10 @@ import scala.concurrent.duration._
def spawn[T](ctx: akka.actor.ActorContext, behavior: Behavior[T], name: String, props: Props): ActorRef[T] = {
behavior match {
case b: UntypedBehavior[_]
case b: UntypedPropsBehavior[_]
// TODO dispatcher from props
ActorRefAdapter(ctx.actorOf(b.untypedProps, name))
ActorRefAdapter(ctx.actorOf(b.untypedProps(props), name))
case _
try {
Behavior.validateAsInitial(behavior)

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import akka.actor.typed.internal.ActorRefImpl

View file

@ -273,7 +273,6 @@ trait ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
* @tparam Req The request protocol, what the other actor accepts
* @tparam Res The response protocol, what the other actor sends back
*/
def ask[Req, Res](
otherActor: ActorRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
def ask[Req, Res](otherActor: ActorRef[Req])(createRequest: ActorRef[Res] Req)(mapResponse: Try[Res] T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
}

View file

@ -99,7 +99,7 @@ object Behaviors {
@throws(classOf[Exception])
override final def receiveSignal(ctx: akka.actor.typed.ActorContext[T], msg: Signal): Behavior[T] =
onSignal.applyOrElse(msg, { case _ Behavior.unhandled }: PartialFunction[Signal, Behavior[T]])
onSignal.applyOrElse(msg, { case msg Behavior.unhandled }: PartialFunction[Signal, Behavior[T]])
/**
* Override this method to process an incoming [[akka.actor.typed.Signal]] and return the next behavior.

View file

@ -4,7 +4,7 @@
package akka.actor.typed
package scaladsl
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.Behavior.UntypedPropsBehavior
import akka.actor.typed.internal.adapter._
/**
@ -38,8 +38,8 @@ package object adapter {
def spawnAnonymous[T](behavior: Behavior[T], props: Props = Props.empty): ActorRef[T] = {
behavior match {
case b: UntypedBehavior[_]
ActorRefAdapter(sys.actorOf(b.untypedProps))
case b: UntypedPropsBehavior[_]
ActorRefAdapter(sys.actorOf(b.untypedProps(props)))
case _
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props)))
}
@ -47,8 +47,8 @@ package object adapter {
def spawn[T](behavior: Behavior[T], name: String, props: Props = Props.empty): ActorRef[T] = {
behavior match {
case b: UntypedBehavior[_]
ActorRefAdapter(sys.actorOf(b.untypedProps, name))
case b: UntypedPropsBehavior[_]
ActorRefAdapter(sys.actorOf(b.untypedProps(props), name))
case _
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), props), name))
}

View file

@ -27,6 +27,8 @@ import akka.japi.{ Pair ⇒ JPair }
def scalaAnyToNone[A, B]: A Option[B] = none
def scalaAnyTwoToNone[A, B, C]: (A, B) Option[C] = two2none
def scalaAnyTwoToUnit[A, B]: (A, B) Unit = two2unit
def scalaAnyThreeToFalse[A, B, C]: (A, B, C) Boolean = three2false
def javaAnyToNone[A, B]: A Option[B] = none
def nullFun[T] = _nullFun.asInstanceOf[Any T]
@ -44,4 +46,8 @@ import akka.japi.{ Pair ⇒ JPair }
private val two2none = (_: Any, _: Any) None
private val two2unit = (_: Any, _: Any) ()
private val three2false = (_: Any, _: Any, _: Any) false
}

View file

@ -14,7 +14,7 @@ import akka.actor.{ InternalActorRef, Scheduler }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.Behavior.UntypedPropsBehavior
import akka.actor.typed.Props
import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter
@ -142,8 +142,8 @@ import akka.japi.function.{ Function ⇒ JFunction }
val untypedEntityPropsFactory: String akka.actor.Props = { entityId
behavior(entityId) match {
case u: UntypedBehavior[_] u.untypedProps // PersistentBehavior
case b PropsAdapter(b, entityProps)
case u: UntypedPropsBehavior[_] u.untypedProps(Props.empty) // PersistentBehavior
case b PropsAdapter(b, entityProps)
}
}

View file

@ -10,7 +10,7 @@ import akka.actor.typed.TypedAkkaSpecWithShutdown
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehaviors }
import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
import com.typesafe.config.ConfigFactory

View file

@ -9,7 +9,7 @@ import java.util.function.{ Function ⇒ JFunction }
import akka.actor.{ ExtendedActorSystem, InvalidActorNameException }
import akka.annotation.InternalApi
import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonManager OldSingletonManager }
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.Behavior.UntypedPropsBehavior
import akka.cluster.typed.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings }
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.adapter._
@ -40,8 +40,8 @@ private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) ex
val managerName = managerNameFor(singletonName)
// start singleton on this node
val untypedProps = behavior match {
case u: UntypedBehavior[_] u.untypedProps // PersistentBehavior
case _ PropsAdapter(behavior, props)
case u: UntypedPropsBehavior[_] u.untypedProps(props) // PersistentBehavior
case _ PropsAdapter(behavior, props)
}
try {
untypedSystem.systemActorOf(

View file

@ -5,8 +5,7 @@
package akka.cluster.typed
import akka.actor.typed.{ ActorRef, Behavior, Props, TypedAkkaSpecWithShutdown }
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors.{ CommandHandler, Effect }
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehaviors }
import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
import com.typesafe.config.ConfigFactory

View file

@ -62,8 +62,10 @@ and can be used to create various effects such as:
External side effects can be performed after successful persist with the `andThen` function e.g
@scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`].
In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying
the event is passed as parameter to the `andThen` function.
In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying
the event is passed as parameter to the `andThen` function. All `andThen*` registered callbacks
are executed after successful execution of the persist statement (or immediately, in case of `none` and `unhandled`).
### Event handler

View file

@ -0,0 +1,11 @@
akka.persistence.typed {
# default stash buffer size for incoming messages to persistent actors
stash-buffer-size = 1024
# enables automatic logging of messages stashed automatically by an PersistentBehavior,
# this may happen while it receives commands while it is recovering events or while it is persisting events
# Set to a log-level (debug, info, warn, error) to log stashed messages on given log level;
log-stashing = off
}

View file

@ -3,17 +3,15 @@
*/
package akka.persistence.typed.internal
import akka.persistence.typed.{ javadsl j }
import akka.persistence.typed.javadsl
import akka.persistence.typed.scaladsl
import scala.collection.{ immutable im }
import akka.annotation.InternalApi
import akka.persistence.typed.scaladsl.PersistentBehaviors.{ ChainableEffect, Effect }
import akka.annotation.{ DoNotInherit, InternalApi }
/**
* INTERNAL API
*/
/** INTERNAL API */
@InternalApi
private[akka] abstract class EffectImpl[+Event, State] extends j.Effect[Event, State] with Effect[Event, State] {
private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Event, State] with scaladsl.Effect[Event, State] {
/* All events that will be persisted in this effect */
override def events: im.Seq[Event] = Nil
@ -21,19 +19,14 @@ private[akka] abstract class EffectImpl[+Event, State] extends j.Effect[Event, S
override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = Nil
}
/**
* INTERNAL API
*/
/** INTERNAL API */
@InternalApi
private[akka] object CompositeEffect {
def apply[Event, State](effect: EffectImpl[Event, State], sideEffects: ChainableEffect[Event, State]): EffectImpl[Event, State] = {
CompositeEffect[Event, State](
effect,
sideEffects :: Nil
)
}
def apply[Event, State](effect: EffectImpl[Event, State], sideEffects: ChainableEffect[Event, State]): EffectImpl[Event, State] =
CompositeEffect[Event, State](effect, sideEffects :: Nil)
}
/** INTERNAL API */
@InternalApi
private[akka] final case class CompositeEffect[Event, State](
persistingEffect: EffectImpl[Event, State],
@ -43,25 +36,39 @@ private[akka] final case class CompositeEffect[Event, State](
override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = _sideEffects.asInstanceOf[im.Seq[ChainableEffect[E, State]]]
override def toString: String =
s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})"
}
/** INTERNAL API */
@InternalApi
private[akka] case object PersistNothing extends EffectImpl[Nothing, Nothing]
/** INTERNAL API */
@InternalApi
private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[Event, State] {
override def events = event :: Nil
}
/** INTERNAL API */
@InternalApi
private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State]
/** INTERNAL API */
@InternalApi
private[akka] case class SideEffect[Event, State](effect: State Unit) extends ChainableEffect[Event, State]
/** INTERNAL API */
@InternalApi
private[akka] case object Stop extends ChainableEffect[Nothing, Nothing]
/** INTERNAL API */
@InternalApi
private[akka] case object Unhandled extends EffectImpl[Nothing, Nothing]
/**
* Not for user extension
*/
@DoNotInherit
abstract class ChainableEffect[Event, State] extends EffectImpl[Event, State]

View file

@ -0,0 +1,102 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.persistence.typed.internal
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.StoppedBehavior
import akka.actor.typed.scaladsl.{ ActorContext, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.{ LogSource, Logging }
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.{ JournalProtocol, Persistence, RecoveryPermitter, SnapshotProtocol }
import akka.{ actor a }
/** INTERNAL API */
@InternalApi
private[akka] object EventsourcedBehavior {
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip)
private[akka] val instanceIdCounter = new AtomicInteger(1)
@InternalApi private[akka] object WriterIdentity {
def newIdentity(): WriterIdentity = {
val instanceId: Int = EventsourcedBehavior.instanceIdCounter.getAndIncrement()
val writerUuid: String = UUID.randomUUID.toString
WriterIdentity(instanceId, writerUuid)
}
}
private[akka] final case class WriterIdentity(instanceId: Int, writerUuid: String)
/** Protocol used internally by the eventsourced behaviors, never exposed to user-land */
private[akka] sealed trait EventsourcedProtocol
private[akka] case object RecoveryPermitGranted extends EventsourcedProtocol
private[akka] final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends EventsourcedProtocol
private[akka] final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends EventsourcedProtocol
private[akka] final case class RecoveryTickEvent(snapshot: Boolean) extends EventsourcedProtocol
private[akka] final case class ReceiveTimeout(timeout: akka.actor.ReceiveTimeout) extends EventsourcedProtocol
implicit object PersistentBehaviorLogSource extends LogSource[EventsourcedBehavior[_, _, _]] {
override def genString(b: EventsourcedBehavior[_, _, _]): String = {
val behaviorShortName = b match {
case _: EventsourcedRunning[_, _, _] "running"
case _: EventsourcedRecoveringEvents[_, _, _] "recover-events"
case _: EventsourcedRecoveringSnapshot[_, _, _] "recover-snap"
case _: EventsourcedRequestingRecoveryPermit[_, _, _] "awaiting-permit"
}
s"PersistentBehavior[id:${b.persistenceId}][${b.context.self.path}][$behaviorShortName]"
}
}
}
/** INTERNAL API */
@InternalApi
private[akka] trait EventsourcedBehavior[Command, Event, State] {
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
protected def context: ActorContext[Any]
protected def timers: TimerScheduler[Any]
type C = Command
type AC = ActorContext[C]
type E = Event
type S = State
// used for signaling intent in type signatures
type SeqNr = Long
def persistenceId: String
protected def callbacks: EventsourcedCallbacks[Command, Event, State]
protected def initialState: State = callbacks.initialState
protected def commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State] = callbacks.commandHandler
protected def eventHandler: (State, Event) State = callbacks.eventHandler
protected def snapshotWhen: (State, Event, SeqNr) Boolean = callbacks.snapshotWhen
protected def tagger: Event Set[String] = callbacks.tagger
protected def pluginIds: EventsourcedPluginIds
protected final def journalPluginId: String = pluginIds.journalPluginId
protected final def snapshotPluginId: String = pluginIds.snapshotPluginId
// ------ common -------
protected lazy val extension = Persistence(context.system.toUntyped)
protected lazy val journal: a.ActorRef = extension.journalFor(journalPluginId)
protected lazy val snapshotStore: a.ActorRef = extension.snapshotStoreFor(snapshotPluginId)
protected lazy val selfUntyped: a.ActorRef = context.self.toUntyped
protected lazy val selfUntypedAdapted: a.ActorRef = context.messageAdapter[Any] {
case res: JournalProtocol.Response JournalResponse(res)
case RecoveryPermitter.RecoveryPermitGranted RecoveryPermitGranted
case res: SnapshotProtocol.Response SnapshotterResponse(res)
case cmd: Command @unchecked cmd // if it was wrong, we'll realise when trying to onMessage the cmd
}.toUntyped
}

View file

@ -0,0 +1,20 @@
package akka.persistence.typed.internal
import akka.actor.typed.scaladsl.ActorContext
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
/** INTERNAL API: Used to carry user callbacks between behaviors of an Persistent Behavior */
private[akka] final case class EventsourcedCallbacks[Command, Event, State](
initialState: State,
commandHandler: CommandHandler[Command, Event, State],
eventHandler: (State, Event) State,
snapshotWhen: (State, Event, Long) Boolean,
recoveryCompleted: (ActorContext[Command], State) Unit,
tagger: Event Set[String]
)
/** INTERNAL API: Used to carry settings between behaviors of an Persistent Behavior */
private[akka] final case class EventsourcedPluginIds(
journalPluginId: String,
snapshotPluginId: String
)

View file

@ -0,0 +1,222 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
import akka.persistence.typed.scaladsl.PersistentBehaviors._
import akka.util.Helpers._
import scala.util.control.NonFatal
/**
* INTERNAL API
*
* Third (of four) behavior of an PersistentBehavior.
*
* In this behavior we finally start replaying events, beginning from the last applied sequence number
* (i.e. the one up-until-which the snapshot recovery has brought us).
*
* Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed
* and control is given to the user's handlers to drive the actors behavior from there.
*
*/
@InternalApi
private[akka] class EventsourcedRecoveringEvents[Command, Event, State](
val persistenceId: String,
override val context: ActorContext[Any],
override val timers: TimerScheduler[Any],
override val internalStash: StashBuffer[Any],
val recovery: Recovery,
private var sequenceNr: Long,
val writerIdentity: WriterIdentity,
private var state: State,
val callbacks: EventsourcedCallbacks[Command, Event, State],
val pluginIds: EventsourcedPluginIds
) extends MutableBehavior[Any]
with EventsourcedBehavior[Command, Event, State]
with EventsourcedStashManagement {
import Behaviors.same
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
protected val log = Logging(context.system.toUntyped, this)
// -------- initialize --------
startRecoveryTimer()
replayEvents(sequenceNr + 1L, recovery.toSequenceNr)
// ---- end of initialize ----
private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
// ----------
def snapshotSequenceNr: Long = sequenceNr
private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
private def setLastSequenceNr(value: Long): Unit =
sequenceNr = value
// ----------
// FIXME it's a bit of a pain to have those lazy vals, change everything to constructor parameters
lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "recovery-tick"
private def startRecoveryTimer(): Unit = timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey)
private var eventSeenInInterval = false
def onCommand(cmd: Command): Behavior[Any] = {
// during recovery, stash all incoming commands
stash(cmd)
same
}
def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try {
response match {
case ReplayedMessage(repr)
eventSeenInInterval = true
updateLastSequenceNr(repr)
// TODO we need some state adapters here?
val newState = eventHandler(state, repr.payload.asInstanceOf[Event])
state = newState
same
case RecoverySuccess(highestSeqNr)
log.debug("Recovery successful, recovered until sequenceNr: {}", highestSeqNr)
cancelRecoveryTimer()
setLastSequenceNr(highestSeqNr)
try onRecoveryCompleted(state)
catch { case NonFatal(ex) onRecoveryFailure(ex, Some(state)) }
case ReplayMessagesFailure(cause)
onRecoveryFailure(cause, event = None)
case other
stash(other)
Behaviors.same
}
} catch {
case NonFatal(e)
cancelRecoveryTimer()
onRecoveryFailure(e, None)
}
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = {
log.warning("Unexpected [{}] from SnapshotStore, already in recovering events state.", Logging.simpleName(response))
Behaviors.same // ignore the response
}
/**
* Called whenever a message replay fails. By default it logs the error.
*
* The actor is always stopped after this method has been invoked.
*
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
*/
protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = {
returnRecoveryPermit("on recovery failure: " + cause.getMessage)
cancelRecoveryTimer()
event match {
case Some(evt)
log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr)
Behaviors.stopped
case None
log.error(cause, "Persistence failure when replaying events. Last known sequence number [{}]", persistenceId, sequenceNr)
Behaviors.stopped
}
}
protected def onRecoveryCompleted(state: State): Behavior[Any] = {
try {
returnRecoveryPermit("recovery completed successfully")
callbacks.recoveryCompleted(commandContext, state)
val running = new EventsourcedRunning[Command, Event, State](
persistenceId,
context,
timers,
internalStash,
sequenceNr,
writerIdentity,
state,
callbacks,
pluginIds
)
tryUnstash(context, running)
} finally {
cancelRecoveryTimer()
}
}
protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] =
if (!snapshot) {
if (!eventSeenInInterval) {
cancelRecoveryTimer()
val msg = s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr"
onRecoveryFailure(new RecoveryTimedOut(msg), event = None) // TODO allow users to hook into this?
} else {
eventSeenInInterval = false
same
}
} else {
// snapshot timeout, but we're already in the events recovery phase
Behavior.unhandled
}
// ----------
override def onMessage(msg: Any): Behavior[Any] = {
msg match {
// TODO explore crazy hashcode hack to make this match quicker...?
case JournalResponse(r) onJournalResponse(r)
case RecoveryTickEvent(snapshot) onRecoveryTick(snapshot = snapshot)
case SnapshotterResponse(r) onSnapshotterResponse(r)
case c: Command @unchecked onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly
}
}
// ----------
// ---------- journal interactions ---------
private def replayEvents(fromSeqNr: SeqNr, toSeqNr: SeqNr): Unit = {
log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
// reply is sent to `selfUntypedAdapted`, it is important to target that one
journal ! ReplayMessages(fromSeqNr, toSeqNr, recovery.replayMax, persistenceId, selfUntypedAdapted)
}
private def returnRecoveryPermit(reason: String): Unit = {
log.debug("Returning recovery permit, reason: " + reason)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped)
}
override def toString = s"EventsourcedRecoveringEvents($persistenceId)"
}

View file

@ -0,0 +1,217 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.SnapshotProtocol.{ LoadSnapshot, LoadSnapshotFailed, LoadSnapshotResult }
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
import akka.util.Helpers._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
/**
* INTERNAL API
*
* Second (of four) behavior of an PersistentBehavior.
*
* In this behavior the recovery process is initiated.
* We try to obtain a snapshot from the configured snapshot store,
* and if it exists, we use it instead of the `initialState`.
*
* Once snapshot recovery is done (or no snapshot was selected),
* recovery of events continues in [[EventsourcedRecoveringEvents]].
*/
@InternalApi
final class EventsourcedRecoveringSnapshot[Command, Event, State](
val persistenceId: String,
override val context: ActorContext[Any],
override val timers: TimerScheduler[Any],
override val internalStash: StashBuffer[Any],
val recovery: Recovery,
val writerIdentity: WriterIdentity,
val callbacks: EventsourcedCallbacks[Command, Event, State],
val pluginIds: EventsourcedPluginIds
) extends MutableBehavior[Any]
with EventsourcedBehavior[Command, Event, State]
with EventsourcedStashManagement {
import Behaviors.same
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
protected val log = Logging(context.system.toUntyped, this)
// -------- initialize --------
startRecoveryTimer()
loadSnapshot(persistenceId, recovery.fromSnapshot, recovery.toSequenceNr)
// ---- end of initialize ----
val commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
// ----------
protected var awaitingSnapshot: Boolean = true
// ----------
private var lastSequenceNr: Long = 0L
def snapshotSequenceNr: Long = lastSequenceNr
// ----------
lazy val timeout = extension.journalConfigFor(journalPluginId).getMillisDuration("recovery-event-timeout")
// protect against snapshot stalling forever because of journal overloaded and such
private val RecoveryTickTimerKey = "recovery-tick"
private def startRecoveryTimer(): Unit = {
timers.startPeriodicTimer(RecoveryTickTimerKey, RecoveryTickEvent(snapshot = false), timeout)
}
private def cancelRecoveryTimer(): Unit = timers.cancel(RecoveryTickTimerKey)
def onCommand(cmd: Command): Behavior[Any] = {
// during recovery, stash all incoming commands
stash(cmd)
Behavior.same
}
def onJournalResponse(response: JournalProtocol.Response): Behavior[Any] = try {
throw new Exception("Should not talk to journal yet! But got: " + response)
} catch {
case NonFatal(cause)
returnRecoveryPermitOnlyOnFailure(cause)
throw cause
}
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = try {
response match {
case LoadSnapshotResult(sso, toSnr)
var state: S = initialState
val re: Try[SeqNr] = Try {
sso match {
case Some(SelectedSnapshot(metadata, snapshot))
state = snapshot.asInstanceOf[State]
metadata.sequenceNr
case None
0 // from the start please
}
}
re match {
case Success(seqNr)
lastSequenceNr = seqNr
replayMessages(state, toSnr)
case Failure(cause)
// FIXME better exception type
val ex = new RuntimeException(s"Failed to recover state for [$persistenceId] from snapshot offer.", cause)
onRecoveryFailure(ex, event = None) // FIXME the failure logs has bad messages... FIXME
}
case LoadSnapshotFailed(cause)
cancelRecoveryTimer()
onRecoveryFailure(cause, event = None)
case other
stash(other)
same
}
} catch {
case NonFatal(cause)
returnRecoveryPermitOnlyOnFailure(cause)
throw cause
}
private def replayMessages(state: State, toSnr: SeqNr): Behavior[Any] = {
cancelRecoveryTimer()
val rec = recovery.copy(toSequenceNr = toSnr, fromSnapshot = SnapshotSelectionCriteria.None) // TODO introduce new types
new EventsourcedRecoveringEvents[Command, Event, State](
persistenceId,
context,
timers,
internalStash,
rec,
lastSequenceNr,
writerIdentity,
state,
callbacks,
pluginIds
)
}
/**
* Called whenever a message replay fails. By default it logs the error.
*
* The actor is always stopped after this method has been invoked.
*
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception was thrown there
*/
protected def onRecoveryFailure(cause: Throwable, event: Option[Any]): Behavior[Any] = {
cancelRecoveryTimer()
event match {
case Some(evt)
log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " +
"persistenceId [{}].", evt.getClass.getName, lastSequenceNr, persistenceId)
Behaviors.stopped
case None
log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " +
"Last known sequence number [{}]", persistenceId, lastSequenceNr)
Behaviors.stopped
}
}
protected def onRecoveryTick(snapshot: Boolean): Behavior[Any] =
// we know we're in snapshotting mode
if (snapshot) onRecoveryFailure(new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within $timeout"), event = None)
else same // ignore, since we received the snapshot already
// ----------
override def onMessage(msg: Any): Behavior[Any] = {
msg match {
// TODO explore crazy hashcode hack to make this match quicker...?
case SnapshotterResponse(r) onSnapshotterResponse(r)
case JournalResponse(r) onJournalResponse(r)
case RecoveryTickEvent(snapshot) onRecoveryTick(snapshot = snapshot)
case c: Command @unchecked onCommand(c.asInstanceOf[Command]) // explicit cast to fail eagerly
}
}
// ----------
// ---------- journal interactions ---------
/**
* Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
* to the running [[PersistentActor]].
*/
private def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = {
snapshotStore.tell(LoadSnapshot(persistenceId, criteria, toSequenceNr), selfUntypedAdapted)
}
private def returnRecoveryPermitOnlyOnFailure(cause: Throwable): Unit = {
log.debug("Returning recovery permit, on failure because: " + cause.getMessage)
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, selfUntyped)
}
override def toString = s"EventsourcedRecoveringSnapshot($persistenceId)"
}

View file

@ -0,0 +1,96 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
/**
* INTERNAL API
*
* First (of four) behaviour of an PersistentBehaviour.
*
* Requests a permit to start recovering this actor; this is tone to avoid
* hammering the journal with too many concurrently recovering actors.
*/
@InternalApi
private[akka] final class EventsourcedRequestingRecoveryPermit[Command, Event, State](
val persistenceId: String,
override val context: ActorContext[Any],
override val timers: TimerScheduler[Any],
val recovery: Recovery,
val callbacks: EventsourcedCallbacks[Command, Event, State],
val pluginIds: EventsourcedPluginIds
) extends MutableBehavior[Any]
with EventsourcedBehavior[Command, Event, State]
with EventsourcedStashManagement {
import akka.actor.typed.scaladsl.adapter._
// has to be lazy, since we want to obtain the persistenceId
protected lazy val log = Logging(context.system.toUntyped, this)
override protected val internalStash: StashBuffer[Any] = {
val stashSize = context.system.settings.config
.getInt("akka.persistence.typed.stash-buffer-size")
StashBuffer[Any](stashSize)
}
// --- initialization ---
// only once we have a permit, we can become active:
requestRecoveryPermit()
val writerIdentity: WriterIdentity = WriterIdentity.newIdentity()
// --- end of initialization ---
// ----------
def becomeRecovering(): Behavior[Any] = {
log.debug(s"Initializing snapshot recovery: {}", recovery)
new EventsourcedRecoveringSnapshot(
persistenceId,
context,
timers,
internalStash,
recovery,
writerIdentity,
callbacks,
pluginIds
)
}
// ----------
override def onMessage(msg: Any): Behavior[Any] = {
msg match {
case RecoveryPermitter.RecoveryPermitGranted
log.debug("Awaiting permit, received: RecoveryPermitGranted")
becomeRecovering()
case other
stash(other)
Behaviors.same
}
}
// ---------- journal interactions ---------
private def requestRecoveryPermit(): Unit = {
// IMPORTANT to use selfUntyped, and not an adapter, since recovery permitter watches/unwatches those refs (and adapters are new refs)
extension.recoveryPermitter.tell(RecoveryPermitter.RequestRecoveryPermit, selfUntyped)
}
override def toString = s"EventsourcedRequestingRecoveryPermit($persistenceId)"
}

View file

@ -0,0 +1,365 @@
/**
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.StoppedBehavior
import akka.actor.typed.scaladsl.Behaviors.MutableBehavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.Eventsourced.{ PendingHandlerInvocation, StashingHandlerInvocation }
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.journal.Tagged
import akka.persistence.typed.internal.EventsourcedBehavior.WriterIdentity
import scala.annotation.tailrec
import scala.collection.immutable
/**
* INTERNAL API
*
* Fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior.
*
* In this phase recovery has completed successfully and we continue handling incoming commands,
* as well as persisting new events as dictated by the user handlers.
*
* This behavior operates in two phases:
* - HandlingCommands - where the command handler is invoked for incoming commands
* - PersistingEvents - where incoming commands are stashed until persistence completes
*
* This is implemented as such to avoid creating many EventsourcedRunning instances,
* which perform the Persistence extension lookup on creation and similar things (config lookup)
*
*/
@InternalApi
class EventsourcedRunning[Command, Event, State](
val persistenceId: String,
override val context: ActorContext[Any],
override val timers: TimerScheduler[Any],
override val internalStash: StashBuffer[Any],
private var sequenceNr: Long,
val writerIdentity: WriterIdentity,
private var state: State,
val callbacks: EventsourcedCallbacks[Command, Event, State],
val pluginIds: EventsourcedPluginIds
) extends MutableBehavior[Any]
with EventsourcedBehavior[Command, Event, State]
with EventsourcedStashManagement { same
import EventsourcedBehavior._
import akka.actor.typed.scaladsl.adapter._
protected val log = Logging(context.system.toUntyped, this)
private def commandContext: ActorContext[Command] = context.asInstanceOf[ActorContext[Command]]
// ----------
// Holds callbacks for persist calls (note that we do not implement persistAsync currently)
private def hasNoPendingInvocations: Boolean = pendingInvocations.isEmpty
private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
// ----------
private def snapshotSequenceNr: Long = sequenceNr
private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
if (persistent.sequenceNr > sequenceNr) sequenceNr = persistent.sequenceNr
private def nextSequenceNr(): Long = {
sequenceNr += 1L
sequenceNr
}
// ----------
private def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[Any] = {
response match {
case SaveSnapshotSuccess(meta)
log.debug("Save snapshot successful: " + meta)
same
case SaveSnapshotFailure(meta, ex)
log.error(ex, "Save snapshot failed! " + meta)
same // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
}
}
// ----------
trait EventsourcedRunningPhase {
def name: String
def onCommand(c: Command): Behavior[Any]
def onJournalResponse(response: JournalProtocol.Response): Behavior[Any]
}
object HandlingCommands extends EventsourcedRunningPhase {
def name = "HandlingCommands"
final override def onCommand(command: Command): Behavior[Any] = {
val effect = commandHandler(commandContext, state, command)
applyEffects(command, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
}
final override def onJournalResponse(response: Response): Behavior[Any] = {
// should not happen, what would it reply?
throw new RuntimeException("Received message which should not happen in Running state!")
}
}
object PersistingEventsNoSideEffects extends PersistingEvents(Nil)
sealed class PersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]) extends EventsourcedRunningPhase {
def name = "PersistingEvents"
final override def onCommand(c: Command): Behavior[Any] = {
stash(c)
same
}
final override def onJournalResponse(response: Response): Behavior[Any] = {
log.debug("Received Journal response: {}", response)
response match {
case WriteMessageSuccess(p, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == writerIdentity.instanceId) {
updateLastSequenceNr(p)
popApplyHandler(p.payload)
onWriteMessageComplete()
tryUnstash(context, applySideEffects(sideEffects))
} else same
case WriteMessageRejected(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == writerIdentity.instanceId) {
updateLastSequenceNr(p)
onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop
tryUnstash(context, applySideEffects(sideEffects))
} else same
case WriteMessageFailure(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == writerIdentity.instanceId) {
onWriteMessageComplete()
onPersistFailureThenStop(cause, p.payload, p.sequenceNr)
} else same
case WriteMessagesSuccessful
// ignore
same
case WriteMessagesFailed(_)
// ignore
same // it will be stopped by the first WriteMessageFailure message; not applying side effects
case _: LoopMessageSuccess
// ignore, should never happen as there is no persistAsync in typed
same
}
}
private def onWriteMessageComplete(): Unit =
tryBecomeHandlingCommands()
private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
log.error(
cause,
"Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
event.getClass.getName, seqNr, persistenceId, cause.getMessage)
}
private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[Any] = {
log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
event.getClass.getName, seqNr, persistenceId)
// FIXME see #24479 for reconsidering the stopping behaviour
Behaviors.stopped
}
}
// the active phase switches between PersistingEvents and HandlingCommands;
// we do this via a var instead of behaviours to keep allocations down as this will be flip/flaping on every Persist effect
private[this] var phase: EventsourcedRunningPhase = HandlingCommands
override def onMessage(msg: Any): Behavior[Any] = {
msg match {
// TODO explore crazy hashcode hack to make this match quicker...?
case SnapshotterResponse(r) onSnapshotterResponse(r)
case JournalResponse(r) phase.onJournalResponse(r)
case command: Command @unchecked
// the above type-check does nothing, since Command is tun
// we cast explicitly to fail early in case of type mismatch
val c = command.asInstanceOf[Command]
phase.onCommand(c)
}
}
// ----------
def applySideEffects(effects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
var res: Behavior[Any] = same
val it = effects.iterator
// if at least one effect results in a `stop`, we need to stop
// manual loop implementation to avoid allocations and multiple scans
while (it.hasNext) {
val effect = it.next()
applySideEffect(effect) match {
case _: StoppedBehavior[_] res = Behaviors.stopped
case _ // nothing to do
}
}
res
}
def applySideEffect(effect: ChainableEffect[_, S]): Behavior[Any] = effect match {
case _: Stop.type @unchecked
Behaviors.stopped
case SideEffect(sideEffects)
sideEffects(state)
same
case _
throw new IllegalArgumentException(s"Not supported effect detected [${effect.getClass.getName}]!")
}
def applyEvent(s: S, event: E): S =
eventHandler(s, event)
@tailrec private def applyEffects(msg: Any, effect: EffectImpl[E, S], sideEffects: immutable.Seq[ChainableEffect[_, S]] = Nil): Behavior[Any] = {
if (log.isDebugEnabled)
log.debug(s"Handled command [{}], resulting effect: [{}], side effects: [{}]", msg.getClass.getName, effect, sideEffects.size)
effect match {
case CompositeEffect(e, currentSideEffects)
// unwrap and accumulate effects
applyEffects(msg, e, currentSideEffects ++ sideEffects)
case Persist(event)
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
state = applyEvent(state, event)
val tags = tagger(event)
val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags)
internalPersist(eventToPersist, sideEffects) { _
if (snapshotWhen(state, event, sequenceNr))
internalSaveSnapshot(state)
}
case PersistAll(events)
if (events.nonEmpty) {
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
var count = events.size
var seqNr = sequenceNr
val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
case ((currentState, snapshot), event)
seqNr += 1
val shouldSnapshot = snapshot || snapshotWhen(currentState, event, seqNr)
(applyEvent(currentState, event), shouldSnapshot)
}
state = newState
val eventsToPersist = events.map { event
val tags = tagger(event)
if (tags.isEmpty) event else Tagged(event, tags)
}
internalPersistAll(eventsToPersist, sideEffects) { _
count -= 1
if (count == 0) {
sideEffects.foreach(applySideEffect)
if (shouldSnapshotAfterPersist)
internalSaveSnapshot(state)
}
}
} else {
// run side-effects even when no events are emitted
tryUnstash(context, applySideEffects(sideEffects))
}
case e: PersistNothing.type @unchecked
tryUnstash(context, applySideEffects(sideEffects))
case _: Unhandled.type @unchecked
applySideEffects(sideEffects)
Behavior.unhandled
case c: ChainableEffect[_, S]
applySideEffect(c)
}
}
private def popApplyHandler(payload: Any): Unit =
pendingInvocations.pop().handler(payload)
private def becomePersistingEvents(sideEffects: immutable.Seq[ChainableEffect[_, S]]): Behavior[Any] = {
if (phase.isInstanceOf[PersistingEvents]) throw new IllegalArgumentException(
"Attempted to become PersistingEvents while already in this phase! Logic error?")
phase =
if (sideEffects.isEmpty) PersistingEventsNoSideEffects
else new PersistingEvents(sideEffects)
same
}
private def tryBecomeHandlingCommands(): Behavior[Any] = {
if (phase == HandlingCommands) throw new IllegalArgumentException(
"Attempted to become HandlingCommands while already in this phase! Logic error?")
if (hasNoPendingInvocations) { // CAN THIS EVER NOT HAPPEN?
phase = HandlingCommands
}
same
}
// ---------- journal interactions ---------
// Any since can be `E` or `Tagged`
private def internalPersist(event: Any, sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any Unit): Behavior[Any] = {
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
val senderNotKnownBecauseAkkaTyped = null
val repr = PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped)
val eventBatch = AtomicWrite(repr) :: Nil // batching not used, since no persistAsync
journal.tell(JournalProtocol.WriteMessages(eventBatch, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted)
becomePersistingEvents(sideEffects)
}
private def internalPersistAll(events: immutable.Seq[Any], sideEffects: immutable.Seq[ChainableEffect[_, S]])(handler: Any Unit): Behavior[Any] = {
if (events.nonEmpty) {
val senderNotKnownBecauseAkkaTyped = null
events.foreach { event
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
}
val write = AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId,
sequenceNr = nextSequenceNr(), writerUuid = writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped)))
journal.tell(JournalProtocol.WriteMessages(write :: Nil, selfUntypedAdapted, writerIdentity.instanceId), selfUntypedAdapted)
becomePersistingEvents(sideEffects)
} else same
}
private def internalSaveSnapshot(snapshot: State): Unit = {
snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(persistenceId, snapshotSequenceNr), snapshot), selfUntypedAdapted)
}
override def toString = s"EventsourcedRunning($persistenceId,${phase.name})"
}

View file

@ -0,0 +1,76 @@
package akka.persistence.typed.internal
import java.util.Locale
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.{ DeadLetter, StashOverflowException }
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer }
import akka.annotation.InternalApi
import akka.event.Logging.LogLevel
import akka.event.{ Logging, LoggingAdapter }
import akka.persistence._
import akka.util.ConstantFun
import akka.{ actor a }
/** INTERNAL API: Stash management for persistent behaviors */
@InternalApi
private[akka] trait EventsourcedStashManagement {
import EventsourcedStashManagement._
import akka.actor.typed.scaladsl.adapter._
protected def log: LoggingAdapter
protected def extension: Persistence
protected def context: ActorContext[Any]
protected val internalStash: StashBuffer[Any]
private lazy val logLevel = {
val configuredLevel = extension.system.settings.config
.getString("akka.persistence.typed.log-stashing")
Logging.levelFor(configuredLevel).getOrElse(OffLevel) // this is OffLevel
}
/**
* The returned [[StashOverflowStrategy]] object determines how to handle the message failed to stash
* when the internal Stash capacity exceeded.
*/
protected val internalStashOverflowStrategy: StashOverflowStrategy =
extension.defaultInternalStashOverflowStrategy match {
case ReplyToStrategy(_)
throw new RuntimeException("ReplyToStrategy is not supported in Akka Typed, since there is no sender()!")
case other
other // the other strategies are supported
}
protected def stash(msg: Any): Unit = {
if (logLevel != OffLevel) log.log(logLevel, "Stashing message: {}", msg)
try internalStash.stash(msg) catch {
case e: StashOverflowException
internalStashOverflowStrategy match {
case DiscardToDeadLetterStrategy
val snd: a.ActorRef = a.ActorRef.noSender // FIXME can we improve it somehow?
context.system.deadLetters.tell(DeadLetter(msg, snd, context.self.toUntyped))
case ReplyToStrategy(response)
throw new RuntimeException("ReplyToStrategy does not make sense at all in Akka Typed, since there is no sender()!")
case ThrowOverflowExceptionStrategy
throw e
}
}
}
protected def tryUnstash(ctx: ActorContext[Any], behavior: Behavior[Any]): Behavior[Any] = {
if (internalStash.nonEmpty) {
log.debug("Unstashing message: {}", internalStash.head.getClass)
internalStash.unstash(context, behavior, 1, ConstantFun.scalaIdentityFunction)
} else behavior
}
}
object EventsourcedStashManagement {
private val OffLevel = LogLevel(Int.MinValue)
}

View file

@ -1,157 +0,0 @@
/*
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.actor.ActorLogging
import akka.actor.typed.internal.adapter.ActorContextAdapter
import akka.annotation.InternalApi
import akka.persistence.journal.Tagged
import akka.persistence.typed.scaladsl.{ PersistentBehavior, PersistentBehaviors }
import akka.persistence.{ RecoveryCompleted, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer, PersistentActor UntypedPersistentActor }
import akka.{ actor a }
/**
* INTERNAL API
*/
@InternalApi private[akka] object PersistentActorImpl {
/**
* Stop the actor for passivation. `PoisonPill` does not work well
* with persistent actors.
*/
case object StopForPassivation
def props[C, E, S](
behaviorFactory: () PersistentBehavior[C, E, S]): a.Props =
a.Props(new PersistentActorImpl(behaviorFactory()))
}
/**
* INTERNAL API
* The `PersistentActor` that runs a `PersistentBehavior`.
*/
@InternalApi private[akka] class PersistentActorImpl[C, E, S](
behavior: PersistentBehavior[C, E, S]) extends UntypedPersistentActor with ActorLogging {
import PersistentBehaviors._
override val persistenceId: String = behavior.persistenceIdFromActorName(self.path.name)
private var state: S = behavior.initialState
private val commandHandler: CommandHandler[C, E, S] = behavior.commandHandler
private val eventHandler: (S, E) S = behavior.eventHandler
private val ctxAdapter = new ActorContextAdapter[C](context)
private val ctx = ctxAdapter.asScala
override def receiveRecover: Receive = {
case SnapshotOffer(_, snapshot)
state = snapshot.asInstanceOf[S]
case RecoveryCompleted
behavior.recoveryCompleted(ctx, state)
case event: E @unchecked
state = applyEvent(state, event)
}
def applyEvent(s: S, event: E): S =
eventHandler.apply(s, event)
override def receiveCommand: Receive = {
case PersistentActorImpl.StopForPassivation
context.stop(self)
case SaveSnapshotSuccess(meta)
log.debug("Snapshot saved: {}", meta)
case SaveSnapshotFailure(meta, thr)
log.error(thr, "Snapshot failed: {}", meta)
case msg
try {
val effects = msg match {
case a.ReceiveTimeout
commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg)
// TODO note that PostStop, PreRestart and Terminated signals are not handled, we wouldn't be able to persist there
case cmd: C @unchecked
// FIXME we could make it more safe by using ClassTag for C
commandHandler(ctx, state, cmd)
}
applyEffects(msg, effects)
} catch {
case _: MatchError throw new IllegalStateException(
s"Undefined state [${state.getClass.getName}] or handler for [${msg.getClass.getName} " +
s"in [${behavior.getClass.getName}] with persistenceId [$persistenceId]")
}
}
private def applyEffects(msg: Any, effect: Effect[E, S], sideEffects: Seq[ChainableEffect[_, S]] = Nil): Unit = effect match {
case CompositeEffect(persist, currentSideEffects)
applyEffects(msg, persist, currentSideEffects ++ sideEffects)
case Persist(event)
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
state = applyEvent(state, event)
val tags = behavior.tagger(event)
val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags)
persist(eventToPersist) { _
sideEffects.foreach(applySideEffect)
if (shouldSnapshot(state, event, lastSequenceNr))
saveSnapshot(state)
}
case PersistAll(events)
if (events.nonEmpty) {
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
var count = events.size
var seqNr = lastSequenceNr
val (newState, shouldSnapshotAfterPersist) = events.foldLeft((state, false)) {
case ((currentState, snapshot), event)
seqNr += 1
(applyEvent(currentState, event), snapshot || shouldSnapshot(currentState, event, seqNr))
}
state = newState
val eventsToPersist = events.map { event
val tags = behavior.tagger(event)
if (tags.isEmpty) event else Tagged(event, tags)
}
persistAll(eventsToPersist) { _
count -= 1
if (count == 0) {
sideEffects.foreach(applySideEffect)
if (shouldSnapshotAfterPersist)
saveSnapshot(state)
}
}
} else {
// run side-effects even when no events are emitted
sideEffects.foreach(applySideEffect)
}
case _: PersistNothing.type @unchecked
// FIXME: Why don't we do the side effects here??
sideEffects.foreach(applySideEffect)
case _: Unhandled.type @unchecked
// FIXME: Why don't we do the side effects here?? We do allow users to add them
super.unhandled(msg)
case c: ChainableEffect[_, S]
applySideEffect(c)
}
def applySideEffect(effect: ChainableEffect[_, S]): Unit = effect match {
case _: Stop.type @unchecked
context.stop(self)
case SideEffect(callbacks) callbacks.apply(state)
}
private def shouldSnapshot(state: S, event: E, sequenceNr: Long): Boolean = {
behavior.snapshotOn(state, event, sequenceNr)
}
}

View file

@ -4,8 +4,7 @@
package akka.persistence.typed.javadsl
import akka.annotation.DoNotInherit
import akka.persistence.typed.scaladsl.PersistentBehaviors._
import akka.japi.{ function japi }
import akka.japi.function
import akka.persistence.typed.internal._
import scala.collection.JavaConverters._
@ -53,10 +52,10 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
@DoNotInherit abstract class Effect[+Event, State] {
self: EffectImpl[Event, State]
/** Convenience method to register a side effect with just a callback function */
final def andThen(callback: japi.Procedure[State]): Effect[Event, State] =
final def andThen(callback: function.Procedure[State]): Effect[Event, State] =
CompositeEffect(this, SideEffect[Event, State](s callback.apply(s)))
/** Convenience method to register a side effect that doesn't need access to state */
final def andThen(callback: japi.Effect): Effect[Event, State] =
final def andThen(callback: function.Effect): Effect[Event, State] =
CompositeEffect(this, SideEffect[Event, State]((_: State) callback.apply()))
}

View file

@ -0,0 +1,105 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.javadsl
import java.util.Collections
import akka.actor.typed.Behavior.UntypedPropsBehavior
import akka.actor.typed.internal.adapter.PropsAdapter
import akka.actor.typed.javadsl.ActorContext
import akka.annotation.{ ApiMayChange, InternalApi }
import akka.persistence.typed._
import akka.persistence.typed.internal._
/** Java API */
@ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends UntypedPropsBehavior[Command] {
/**
* Factory of effects.
*
* Return effects from your handlers in order to instruct persistence on how to act on the incoming message (i.e. persist events).
*/
protected final def Effect: EffectFactories[Command, Event, State] = EffectFactory.asInstanceOf[EffectFactories[Command, Event, State]]
/**
* Implement by returning the initial state object.
* This object will be passed into this behaviors handlers, until a new state replaces it.
*
* Also known as "zero state" or "neutral state".
*/
protected def initialState: State
/**
* Implement by handling incoming commands and return an `Effect()` to persist or signal other effects
* of the command handling such as stopping the behavior or others.
*
* This method is only invoked when the actor is running (i.e. not recovering).
* While the actor is persisting events, the incoming messages are stashed and only
* delivered to the handler once persisting them has completed.
*/
protected def commandHandler(): CommandHandler[Command, Event, State]
/**
* Implement by applying the event to the current state in order to return a new state.
*
* This method invoked during recovery as well as running operation of this behavior,
* in order to keep updating the state state.
*
* For that reason it is strongly discouraged to perform side-effects in this handler;
* Side effects should be executed in `andThen` or `recoveryCompleted` blocks.
*/
protected def eventHandler(): EventHandler[Event, State]
/**
* @return A new, mutable, by state command handler builder
*/
protected final def commandHandlerBuilder(): CommandHandlerBuilder[Command, Event, State] =
new CommandHandlerBuilder[Command, Event, State]()
/**
* @return A new, mutable, by state command handler builder
*/
protected final def byStateCommandHandlerBuilder(): ByStateCommandHandlerBuilder[Command, Event, State] =
new ByStateCommandHandlerBuilder[Command, Event, State]()
/**
* @return A new, mutable, event handler builder
*/
protected final def eventHandlerBuilder(): EventHandlerBuilder[Event, State] =
new EventHandlerBuilder[Event, State]()
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {}
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet()
/** INTERNAL API */
@InternalApi private[akka] override def untypedProps(props: akka.actor.typed.Props): akka.actor.Props = {
val behaviorImpl = scaladsl.PersistentBehaviors.immutable[Command, Event, State](
persistenceId,
initialState,
(c, state, cmd) commandHandler()(c.asJava, state, cmd).asInstanceOf[EffectImpl[Event, State]],
eventHandler()(_, _)
)
PropsAdapter(() behaviorImpl, props)
}
}

View file

@ -1,78 +0,0 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.javadsl
import java.util.Collections
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.javadsl.ActorContext
import akka.annotation.ApiMayChange
import akka.persistence.typed._
import akka.persistence.typed.internal._
import scala.collection.JavaConverters._
@ApiMayChange abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends UntypedBehavior[Command] {
def Effect: EffectFactories[Command, Event, State] = EffectFactory.asInstanceOf[EffectFactories[Command, Event, State]]
val initialState: State
def commandHandler(): CommandHandler[Command, Event, State]
def eventHandler(): EventHandler[Event, State]
/**
* @return A new, mutable, by state command handler builder
*/
protected final def commandHandlerBuilder(): CommandHandlerBuilder[Command, Event, State] =
new CommandHandlerBuilder[Command, Event, State]()
/**
* @return A new, mutable, by state command handler builder
*/
protected final def byStateCommandHandlerBuilder(): ByStateCommandHandlerBuilder[Command, Event, State] =
new ByStateCommandHandlerBuilder[Command, Event, State]()
/**
* @return A new, mutable, builder
*/
protected final def eventHandlerBuilder(): EventHandlerBuilder[Event, State] =
new EventHandlerBuilder[Event, State]()
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {}
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet()
/**
* INTERNAL API
*/
override private[akka] def untypedProps = {
new scaladsl.PersistentBehavior[Command, Event, State](
_ persistenceId,
initialState,
(ctx, s, e) commandHandler.apply(ctx.asJava, s, e).asInstanceOf[EffectImpl[Event, State]],
(s: State, e: Event) eventHandler().apply(s, e),
(ctx, s) onRecoveryCompleted(ctx.asJava, s),
e tagsFor(e).asScala.toSet,
shouldSnapshot
).untypedProps
}
}

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.DoNotInherit
import akka.persistence.typed.internal._
import scala.collection.{ immutable im }
/**
* Factories for effects - how a persistent actor reacts on a command
*/
object Effect {
// TODO docs
def persist[Event, State](event: Event): Effect[Event, State] = Persist(event)
// TODO docs
def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] =
persist(evt1 :: evt2 :: events.toList)
// TODO docs
def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] =
eventOpt match {
case Some(evt) persist[Event, State](evt)
case _ none[Event, State]
}
// TODO docs
def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] =
PersistAll(events)
// TODO docs
def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] =
new CompositeEffect[Event, State](PersistAll[Event, State](events), sideEffects)
/**
* Do not persist anything
*/
def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]]
/**
* This command is not handled, but it is not an error that it isn't.
*/
def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]
/**
* Stop this persistent actor
*/
def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]]
}
/**
* Instances are created through the factories in the [[Effect]] companion object.
*
* Not for user extension.
*/
@DoNotInherit
trait Effect[+Event, State] extends akka.persistence.typed.javadsl.Effect[Event, State] { self: EffectImpl[Event, State]
/* All events that will be persisted in this effect */
def events: im.Seq[Event]
def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]]
/** Convenience method to register a side effect with just a callback function */
final def andThen(callback: State Unit): Effect[Event, State] =
CompositeEffect(this, SideEffect[Event, State](callback))
/** Convenience method to register a side effect with just a lazy expression */
final def andThen(callback: Unit): Effect[Event, State] =
CompositeEffect(this, SideEffect[Event, State]((_: State) callback))
/** The side effect is to stop the actor */
def andThenStop: Effect[Event, State] =
CompositeEffect(this, Effect.stop[Event, State])
}

View file

@ -3,102 +3,52 @@
*/
package akka.persistence.typed.scaladsl
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.internal.TimerSchedulerImpl
import akka.actor.typed.scaladsl.{ ActorContext, TimerScheduler }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.persistence.{ Recovery, SnapshotSelectionCriteria }
import akka.persistence.typed.internal._
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
import akka.util.ConstantFun
import scala.collection.{ immutable im }
import scala.language.implicitConversions
object PersistentBehaviors {
// we use this type internally, however it's easier for users to understand the function, so we use it in external api
type CommandHandler[Command, Event, State] = (ActorContext[Command], State, Command) Effect[Event, State]
/**
* Create a `Behavior` for a persistent actor.
*/
def immutable[Command, Event, State](
persistenceId: String,
initialState: State,
commandHandler: CommandHandler[Command, Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] = {
// FIXME remove `persistenceIdFromActorName: String ⇒ String` from PersistentBehavior
new PersistentBehavior(_ persistenceId, initialState, commandHandler, eventHandler,
recoveryCompleted = (_, _) (),
tagger = _ Set.empty,
snapshotOn = (_, _, _) false)
}
commandHandler: (ActorContext[Command], State, Command) Effect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(
persistenceId = persistenceId,
initialState = initialState,
commandHandler = commandHandler,
eventHandler = eventHandler
)
/**
* Factories for effects - how a persistent actor reacts on a command
*/
object Effect {
def persist[Event, State](event: Event): Effect[Event, State] =
Persist(event)
def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] =
persist(evt1 :: evt2 :: events.toList)
def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] =
eventOpt match {
case Some(evt) persist[Event, State](evt)
case _ none[Event, State]
}
def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] =
PersistAll(events)
def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] =
new CompositeEffect[Event, State](PersistAll[Event, State](events), sideEffects)
/**
* Do not persist anything
*/
def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]]
/**
* This command is not handled, but it is not an error that it isn't.
*/
def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]
/**
* Stop this persistent actor
*/
def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]]
}
/**
* Instances are created through the factories in the [[Effect]] companion object.
* Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known
* until the actor is started and typically based on the entityId, which
* is the actor name.
*
* Not for user extension.
* TODO This will not be needed when it can be wrapped in `Actor.deferred`.
*/
@DoNotInherit
trait Effect[+Event, State] {
self: EffectImpl[Event, State]
/* All events that will be persisted in this effect */
def events: im.Seq[Event]
def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]]
/** Convenience method to register a side effect with just a callback function */
final def andThen(callback: State Unit): Effect[Event, State] =
CompositeEffect(this, SideEffect[Event, State](callback))
/** Convenience method to register a side effect with just a lazy expression */
final def andThen(callback: Unit): Effect[Event, State] =
CompositeEffect(this, SideEffect[Event, State]((_: State) callback))
/** The side effect is to stop the actor */
def andThenStop: Effect[Event, State] =
CompositeEffect(this, Effect.stop[Event, State])
}
/**
* Not for user extension
*/
@DoNotInherit
abstract class ChainableEffect[Event, State] extends EffectImpl[Event, State]
type CommandHandler[Command, Event, State] = (ActorContext[Command], State, Command) Effect[Event, State]
@Deprecated // FIXME remove this
def persistentEntity[Command, Event, State](
persistenceIdFromActorName: String String,
initialState: State,
commandHandler: (ActorContext[Command], State, Command) Effect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
???
/**
* The `CommandHandler` defines how to act on commands.
@ -137,19 +87,62 @@ object PersistentBehaviors {
}
}
class PersistentBehavior[Command, Event, State](
@InternalApi private[akka] val persistenceIdFromActorName: String String,
val initialState: State,
val commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
val eventHandler: (State, Event) State,
val recoveryCompleted: (ActorContext[Command], State) Unit,
val tagger: Event Set[String],
val snapshotOn: (State, Event, Long) Boolean) extends UntypedBehavior[Command] {
@DoNotInherit
class PersistentBehavior[Command, Event, State] private (
val persistenceId: String,
val initialState: State,
val commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
val eventHandler: (State, Event) State,
import PersistentBehaviors._
val recoveryCompleted: (ActorContext[Command], State) Unit,
val tagger: Event Set[String],
val journalPluginId: String,
val snapshotPluginId: String,
val snapshotWhen: (State, Event, Long) Boolean,
val recovery: Recovery
) extends DeferredBehavior[Command](ctx
TimerSchedulerImpl.wrapWithTimers[Command] { timers
val callbacks = EventsourcedCallbacks[Command, Event, State](
initialState,
commandHandler,
eventHandler,
snapshotWhen,
recoveryCompleted,
tagger
)
val pluginIds = EventsourcedPluginIds(
journalPluginId,
snapshotPluginId
)
new EventsourcedRequestingRecoveryPermit(
persistenceId,
ctx.asInstanceOf[ActorContext[Any]], // sorry
timers.asInstanceOf[TimerScheduler[Any]], // sorry
recovery,
callbacks,
pluginIds
).narrow[Command]
/** INTERNAL API */
@InternalApi private[akka] override def untypedProps: akka.actor.Props = PersistentActorImpl.props(() this)
}(ctx)) {
def this(
persistenceId: String,
initialState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: (State, Event) State) {
this(
persistenceId,
initialState,
commandHandler,
eventHandler,
recoveryCompleted = ConstantFun.scalaAnyTwoToUnit,
tagger = (_: Event) Set.empty[String],
journalPluginId = "" /* default plugin */ ,
snapshotPluginId = "" /* default plugin */ ,
snapshotWhen = ConstantFun.scalaAnyThreeToFalse,
recovery = Recovery()
)
}
/**
* The `callback` function is called to notify the actor that the recovery process
@ -165,8 +158,8 @@ class PersistentBehavior[Command, Event, State](
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotOn(predicate: (State, Event, Long) Boolean): PersistentBehavior[Command, Event, State] =
copy(snapshotOn = predicate)
def snapshotWhen(predicate: (State, Event, Long) Boolean): PersistentBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
/**
* Snapshot every N events
@ -175,7 +168,35 @@ class PersistentBehavior[Command, Event, State](
*/
def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = {
require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
copy(snapshotOn = (_, _, seqNr) seqNr % numberOfEvents == 0)
copy(snapshotWhen = (_, _, seqNr) seqNr % numberOfEvents == 0)
}
/**
* Change the journal plugin id that this actor should use.
*/
def withPersistencePluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "persistence plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = id)
}
/**
* Change the snapshot store plugin id that this actor should use.
*/
def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = id)
}
/**
* Changes the snapshot selection criteria used by this behavior.
* By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
* from the sequence number up until which the snapshot reached.
*
* You may configure the behavior to skip recovering snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection))
}
/**
@ -185,12 +206,26 @@ class PersistentBehavior[Command, Event, State](
copy(tagger = tagger)
private def copy(
persistenceIdFromActorName: String String = persistenceIdFromActorName,
initialState: State = initialState,
commandHandler: CommandHandler[Command, Event, State] = commandHandler,
eventHandler: (State, Event) State = eventHandler,
recoveryCompleted: (ActorContext[Command], State) Unit = recoveryCompleted,
tagger: Event Set[String] = tagger,
snapshotOn: (State, Event, Long) Boolean = snapshotOn): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted, tagger, snapshotOn)
initialState: State = initialState,
commandHandler: CommandHandler[Command, Event, State] = commandHandler,
eventHandler: (State, Event) State = eventHandler,
recoveryCompleted: (ActorContext[Command], State) Unit = recoveryCompleted,
tagger: Event Set[String] = tagger,
snapshotWhen: (State, Event, Long) Boolean = snapshotWhen,
journalPluginId: String = journalPluginId,
snapshotPluginId: String = snapshotPluginId,
recovery: Recovery = recovery): PersistentBehavior[Command, Event, State] =
new PersistentBehavior[Command, Event, State](
persistenceId = persistenceId,
initialState = initialState,
commandHandler = commandHandler,
eventHandler = eventHandler,
recoveryCompleted = recoveryCompleted,
tagger = tagger,
journalPluginId = journalPluginId,
snapshotPluginId = snapshotPluginId,
snapshotWhen = snapshotWhen,
recovery = recovery)
}

View file

@ -7,8 +7,8 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.japi.function.Function3;
import akka.persistence.typed.scaladsl.PersistentActorSpec;
import akka.persistence.typed.scaladsl.PersistentActorSpec$;
import akka.persistence.typed.scaladsl.PersistentBehaviorSpec;
import akka.persistence.typed.scaladsl.PersistentBehaviorSpec$;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.typed.javadsl.TestKitJunitResource;
import akka.testkit.typed.scaladsl.ActorTestKit;
@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals;
public class PersistentActorTest {
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentActorSpec$.MODULE$.config());
public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentBehaviorSpec$.MODULE$.config());
static final Incremented timeoutEvent = new Incremented(100);
static final State emptyState = new State(0, Collections.emptyList());

View file

@ -30,22 +30,12 @@ public class BasicPersistentBehaviorsTest {
@Override
public CommandHandler<Command, Event, State> commandHandler() {
return new CommandHandler<Command, Event, State>() {
@Override
public Effect<Event, State> apply(ActorContext<Command> ctx, State state, Command command) {
return Effect().none();
}
};
return (ctx, state, command) -> Effect().none();
}
@Override
public EventHandler<Event, State> eventHandler() {
return new EventHandler<Event, State>() {
@Override
public State apply(State state, Event event) {
return state;
}
};
return (state, event) -> state;
}
//#recovery

View file

@ -199,7 +199,7 @@ object PersistentActorCompileOnlyTest {
eventHandler = (state, evt) evt match {
case TaskRegistered(task) State(task :: state.tasksInFlight)
case TaskRemoved(task) State(state.tasksInFlight.filter(_ != task))
}).snapshotOn { (state, e, seqNr) state.tasksInFlight.isEmpty }
}).snapshotWhen { (state, e, seqNr) state.tasksInFlight.isEmpty }
}
object SpawnChild {

View file

@ -3,31 +3,31 @@
*/
package akka.persistence.typed.scaladsl
import akka.actor.ActorSystemImpl
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown }
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown }
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.scaladsl.PersistentBehaviors._
import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
import akka.testkit.typed.TestKitSettings
import akka.testkit.typed.scaladsl._
import com.typesafe.config.ConfigFactory
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.Eventually
import scala.concurrent.Future
import scala.concurrent.duration._
object PersistentActorSpec {
object PersistentBehaviorSpec {
class InMemorySnapshotStore extends SnapshotStore {
private var state = Map.empty[String, (Any, SnapshotMetadata)]
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
log.debug("loadAsync: {} {}", persistenceId, criteria)
Future.successful(state.get(persistenceId).map(r SelectedSnapshot(r._2, r._1)))
}
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
log.debug("saveAsync: {} {}", metadata, snapshot)
state += (metadata.persistenceId -> (snapshot, metadata))
Future.successful(())
}
@ -39,13 +39,18 @@ object PersistentActorSpec {
val config = ConfigFactory.parseString(
s"""
akka.loglevel = INFO
akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentActorSpec$$InMemorySnapshotStore"
# akka.persistence.typed.log-stashing = INFO
akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore"
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem"
""")
sealed trait Command
final case object Increment extends Command
final case object IncrementThenLogThenStop extends Command
final case object IncrementTwiceThenLogThenStop extends Command
final case class IncrementWithPersistAll(nr: Int) extends Command
final case object IncrementLater extends Command
final case object IncrementAfterReceiveTimeout extends Command
@ -86,11 +91,28 @@ object PersistentActorSpec {
commandHandler = (ctx, state, cmd) cmd match {
case Increment
Effect.persist(Incremented(1))
case IncrementThenLogThenStop
Effect.persist(Incremented(1))
.andThen {
loggingActor ! firstLogging
}
.andThenStop
case IncrementTwiceThenLogThenStop
Effect.persist(Incremented(1), Incremented(2))
.andThen {
loggingActor ! firstLogging
}
.andThenStop
case IncrementWithPersistAll(n)
Effect.persist((0 until n).map(_ Incremented(1)))
case GetValue(replyTo)
replyTo ! state
Effect.none
case IncrementLater
// purpose is to test signals
val delay = ctx.spawnAnonymous(Behaviors.withTimers[Tick.type] { timers
@ -101,14 +123,18 @@ object PersistentActorSpec {
})
ctx.watchWith(delay, DelayFinished)
Effect.none
case DelayFinished
Effect.persist(Incremented(10))
case IncrementAfterReceiveTimeout
ctx.setReceiveTimeout(10.millis, Timeout)
Effect.none
case Timeout
ctx.cancelReceiveTimeout()
Effect.persist(Incremented(100))
case IncrementTwiceAndThenLog
Effect
.persist(Incremented(1), Incremented(1))
@ -132,25 +158,29 @@ object PersistentActorSpec {
.andThen {
loggingActor ! firstLogging
}
case LogThenStop
Effect.none.andThen {
loggingActor ! firstLogging
}.andThenStop
Effect.none
.andThen {
loggingActor ! firstLogging
}
.andThenStop
},
eventHandler = (state, evt) evt match {
case Incremented(delta)
probe ! (state, evt)
probe ! ((state, evt))
State(state.value + delta, state.history :+ state.value)
})
}
}
class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpecWithShutdown {
class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually {
import PersistentBehaviorSpec._
override def config = PersistentActorSpec.config
override def config: Config = PersistentBehaviorSpec.config
import PersistentActorSpec._
implicit val testSettings = TestKitSettings(system)
"A typed persistent actor" must {
@ -171,7 +201,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe
c ! Increment
c ! Increment
c ! GetValue(probe.ref)
probe.expectMessage(State(3, Vector(0, 1, 2)))
probe.expectMessage(10.seconds, State(3, Vector(0, 1, 2)))
val c2 = spawn(counter("c2"))
c2 ! GetValue(probe.ref)
@ -224,6 +254,27 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe
loggingProbe.expectMessage(secondLogging)
}
"persist then stop" in {
val loggingProbe = TestProbe[String]
val c = spawn(counter("c5a", loggingProbe.ref))
val watchProbe = watcher(c)
c ! IncrementThenLogThenStop
loggingProbe.expectMessage(firstLogging)
watchProbe.expectMessage("Terminated")
}
"persist(All) then stop" in {
val loggingProbe = TestProbe[String]
val c = spawn(counter("c5b", loggingProbe.ref))
val watchProbe = watcher(c)
c ! IncrementTwiceThenLogThenStop
loggingProbe.expectMessage(firstLogging)
watchProbe.expectMessage("Terminated")
}
/** Proves that side-effects are called when emitting an empty list of events */
"chainable side effects without events" in {
val loggingProbe = TestProbe[String]
@ -249,10 +300,6 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe
}
"work when wrapped in other behavior" in {
// FIXME This is a major problem with current implementation. Since the
// behavior is running as an untyped PersistentActor it's not possible to
// wrap it in Actor.setup or Actor.supervise
pending
val probe = TestProbe[State]
val behavior = Behaviors.supervise[Command](counter("c13"))
.onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1))
@ -262,7 +309,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe
probe.expectMessage(State(1, Vector(0)))
}
"stop after persisting" in {
"stop after logging (no persisting)" in {
val loggingProbe = TestProbe[String]
val c: ActorRef[Command] = spawn(counter("c8", loggingProbe.ref))
val watchProbe = watcher(c)
@ -272,7 +319,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe
}
"snapshot via predicate" in {
val alwaysSnapshot = counter("c9").snapshotOn { (_, _, _) true }
val alwaysSnapshot = counter("c9").snapshotWhen { (_, _, _) true }
val c = spawn(alwaysSnapshot)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
@ -293,9 +340,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe
}
"check all events for snapshot in PersistAll" in {
val snapshotAtTwo = counter("c11").snapshotOn { (s, e, _)
s.value == 2
}
val snapshotAtTwo = counter("c11").snapshotWhen { (s, _, _) s.value == 2 }
val c: ActorRef[Command] = spawn(snapshotAtTwo)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
@ -325,7 +370,7 @@ class PersistentActorSpec extends ActorTestKit with Eventually with TypedAkkaSpe
c ! LogThenStop
watchProbe.expectMessage("Terminated")
// no shapshot should have happened
// no snapshot should have happened
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(counterWithProbe("c10", probeC2.ref).snapshotEvery(2))
probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1)))

View file

@ -6,7 +6,8 @@ package docs.akka.persistence.typed
import akka.Done
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors.{ CommandHandler, Effect }
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
import akka.persistence.typed.scaladsl.Effect
object InDepthPersistentBehaviorSpec {

View file

@ -17,24 +17,24 @@ import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
/** INTERNAL API */
@InternalApi
private[persistence] object Eventsourced {
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip)
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal round-trip)
private val instanceIdCounter = new AtomicInteger(1)
private sealed trait PendingHandlerInvocation {
/** INTERNAL API */
private[akka] sealed trait PendingHandlerInvocation {
def evt: Any
def handler: Any Unit
}
/** forces actor to stash incoming commands until all these invocations are handled */
private final case class StashingHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */
private final case class AsyncHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** INTERNAL API: forces actor to stash incoming commands until all these invocations are handled */
private[akka] final case class StashingHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** INTERNAL API: does not force the actor to stash commands; Originates from either `persistAsync` or `defer` calls */
private[akka] final case class AsyncHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** message used to detect that recovery timed out */
private final case class RecoveryTick(snapshot: Boolean)
/** INTERNAL API: message used to detect that recovery timed out */
private[akka] final case class RecoveryTick(snapshot: Boolean)
}
/**

View file

@ -18,9 +18,11 @@ import akka.actor.Terminated
def props(maxPermits: Int): Props =
Props(new RecoveryPermitter(maxPermits))
case object RequestRecoveryPermit
case object RecoveryPermitGranted
case object ReturnRecoveryPermit
sealed trait Protocol
sealed trait Reply extends Protocol
case object RequestRecoveryPermit extends Protocol
case object RecoveryPermitGranted extends Reply
case object ReturnRecoveryPermit extends Protocol
}
@ -49,18 +51,18 @@ import akka.actor.Terminated
}
case ReturnRecoveryPermit
returnRecoveryPermit(sender())
onReturnRecoveryPermit(sender())
case Terminated(ref)
// pre-mature termination should be rare
if (!pending.remove(ref))
returnRecoveryPermit(ref)
onReturnRecoveryPermit(ref)
}
private def returnRecoveryPermit(ref: ActorRef): Unit = {
private def onReturnRecoveryPermit(ref: ActorRef): Unit = {
usedPermits -= 1
context.unwatch(ref)
if (usedPermits < 0) throw new IllegalStateException("permits must not be negative")
if (usedPermits < 0) throw new IllegalStateException(s"permits must not be negative (returned by: ${ref})")
if (!pending.isEmpty) {
val ref = pending.poll()
recoveryPermitGranted(ref)

View file

@ -29,7 +29,7 @@ trait Snapshotter extends Actor {
* Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
* to the running [[PersistentActor]].
*/
def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) =
def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit =
snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr)
/**

View file

@ -3,7 +3,7 @@
*/
package akka.stream.impl.io
import java.io.{ IOException, InputStream }
import java.io.{ BufferedOutputStream, ByteArrayOutputStream, IOException, InputStream }
import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit }
import akka.annotation.InternalApi