Rename PersistentBehavior (#25721)

Migrated InternalProtocol with least refactor changes, in the end.
This commit is contained in:
Helena Edelson 2018-12-03 21:06:25 -08:00 committed by Patrik Nordwall
parent 7afd847758
commit dbfe6f38c7
63 changed files with 386 additions and 426 deletions

View file

@ -221,7 +221,7 @@ object Entity {
}
/**
* Defines how the [[PersistentEntity]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can
* Defines how the [[EventSourcedEntity]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can
* be used as a sharded entity actor, but the combination of sharding and persistent actors is very common
* and therefore this factory is provided as convenience.
*
@ -234,7 +234,7 @@ object Entity {
*/
def ofPersistentEntity[Command, Event, State >: Null](
typeKey: EntityTypeKey[Command],
createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = {
createPersistentEntity: JFunction[EntityContext[Command], EventSourcedEntity[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = {
of(typeKey, new JFunction[EntityContext[Command], Behavior[Command]] {
override def apply(ctx: EntityContext[Command]): Behavior[Command] = {

View file

@ -8,21 +8,21 @@ import java.util.Optional
import akka.actor.typed.BackoffSupervisorStrategy
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.javadsl.PersistentBehavior
import akka.persistence.typed.javadsl.EventSourcedBehavior
/**
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` class is provided as convenience.
*
* It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId`
* It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
abstract class PersistentEntity[Command, Event, State >: Null] private (
abstract class EventSourcedEntity[Command, Event, State >: Null] private (
val entityTypeKey: EntityTypeKey[Command],
val entityId: String,
persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy])
extends PersistentBehavior[Command, Event, State](persistenceId, supervisorStrategy) {
extends EventSourcedBehavior[Command, Event, State](persistenceId, supervisorStrategy) {
def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = {
this(entityTypeKey, entityId,

View file

@ -214,7 +214,7 @@ object Entity {
* settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
* is very common and therefore [[PersistentEntity]] is provided as a convenience for creating such
* is very common and therefore [[EventSourcedEntity]] is provided as a convenience for creating such
* `PersistentBehavior`.
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster

View file

@ -5,9 +5,9 @@
package akka.cluster.sharding.typed.scaladsl
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
object PersistentEntity {
object EventSourcedEntity {
/**
* Create a `Behavior` for a persistent actor that is used with Cluster Sharding.
@ -15,7 +15,7 @@ object PersistentEntity {
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent
* actors is very common and therefore this `PersistentEntity` is provided as convenience.
*
* It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId`
* It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId`
* automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using
* [[EntityTypeKey.persistenceIdFrom]].
*/
@ -24,6 +24,6 @@ object PersistentEntity {
entityId: String,
emptyState: State,
commandHandler: (State, Command) Effect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
PersistentBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler)
eventHandler: (State, Event) State): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler)
}

View file

@ -23,8 +23,6 @@ import org.scalatest.junit.JUnitSuite;
import java.util.concurrent.CompletionStage;
import static org.junit.Assert.assertEquals;
public class ClusterShardingPersistenceTest extends JUnitSuite {
public static final Config config = ConfigFactory.parseString(
@ -72,7 +70,7 @@ public class ClusterShardingPersistenceTest extends JUnitSuite {
static class TestPersistentEntity extends PersistentEntity<Command, String, String> {
static class TestPersistentEntity extends EventSourcedEntity<Command, String, String> {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "HelloWorld");

View file

@ -20,7 +20,7 @@ import org.scalatest.junit.JUnitSuite;
import static jdocs.akka.cluster.sharding.typed.HelloWorldPersistentEntityExample.*;
import static org.junit.Assert.assertEquals;
public class HelloWorldPersistentEntityExampleTest extends JUnitSuite {
public class HelloWorldEventSourcedEntityExampleTest extends JUnitSuite {
public static final Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" +

View file

@ -15,7 +15,7 @@ import java.util.concurrent.CompletionStage;
//#persistent-entity-import
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.PersistentEntity;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntity;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
@ -61,7 +61,7 @@ public class HelloWorldPersistentEntityExample {
//#persistent-entity
public static class HelloWorld extends PersistentEntity<HelloWorld.Command, HelloWorld.Greeted, HelloWorld.KnownPeople> {
public static class HelloWorld extends EventSourcedEntity<HelloWorld.Command, HelloWorld.Greeted, HelloWorld.KnownPeople> {
// Command
interface Command {

View file

@ -72,7 +72,7 @@ object ClusterShardingPersistenceSpec {
case promise promise.trySuccess(ctx.self.unsafeUpcast)
}
PersistentEntity[Command, String, String](
EventSourcedEntity[Command, String, String](
entityTypeKey = typeKey,
entityId = entityId,
emptyState = "",

View file

@ -12,7 +12,7 @@ import akka.cluster.typed.Join
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object HelloWorldPersistentEntityExampleSpec {
object HelloWorldEventSourcedEntityExampleSpec {
val config = ConfigFactory.parseString(
"""
akka.actor.provider = cluster
@ -25,7 +25,7 @@ object HelloWorldPersistentEntityExampleSpec {
""")
}
class HelloWorldPersistentEntityExampleSpec extends ScalaTestWithActorTestKit(HelloWorldPersistentEntityExampleSpec.config) with WordSpecLike {
class HelloWorldEventSourcedEntityExampleSpec extends ScalaTestWithActorTestKit(HelloWorldEventSourcedEntityExampleSpec.config) with WordSpecLike {
import HelloWorldPersistentEntityExample.HelloWorld
import HelloWorldPersistentEntityExample.HelloWorld._

View file

@ -41,7 +41,7 @@ object HelloWorldPersistentEntityExample {
//#persistent-entity
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.PersistentEntity
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.scaladsl.Effect
object HelloWorld {
@ -80,7 +80,7 @@ object HelloWorldPersistentEntityExample {
val entityTypeKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("HelloWorld")
def persistentEntity(entityId: String): Behavior[Command] = PersistentEntity(
def persistentEntity(entityId: String): Behavior[Command] = EventSourcedEntity(
entityTypeKey = entityTypeKey,
entityId = entityId,
emptyState = KnownPeople(Set.empty),

View file

@ -6,7 +6,6 @@ package akka.cluster.sharding
import akka.actor._
import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.testkit.{ TestDuration, TestProbe }

View file

@ -4,12 +4,10 @@
package akka.cluster.sharding
import scala.concurrent.duration._
import java.io.File
import akka.actor._
import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
import akka.cluster.sharding.ShardRegion.GracefulShutdown
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
import akka.remote.testconductor.RoleName

View file

@ -4,12 +4,10 @@
package akka.cluster.sharding
import scala.concurrent.duration._
import java.io.File
import akka.actor._
import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
import akka.cluster.sharding.ShardRegion.GracefulShutdown
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
import akka.remote.testconductor.RoleName
@ -19,8 +17,6 @@ import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
import scala.concurrent.duration._
import akka.cluster.sharding.ShardRegion.GetClusterShardingStats
import akka.cluster.sharding.ShardRegion.ClusterShardingStats
object ClusterShardingRememberEntitiesSpec {

View file

@ -6,7 +6,7 @@ package akka.cluster.typed
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.{ ActorRef, Behavior, Props }
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
@ -36,7 +36,7 @@ object ClusterSingletonPersistenceSpec {
private final case object StopPlz extends Command
val persistentActor: Behavior[Command] =
PersistentBehavior[Command, String, String](
EventSourcedBehavior[Command, String, String](
persistenceId = PersistenceId("TheSingleton"),
emptyState = "",
commandHandler = (state, cmd) cmd match {

View file

@ -33,7 +33,7 @@ This module is currently marked as @ref:[may change](../common/may-change.md) in
## Example
Let's start with a simple example. The minimum required for a `PersistentBehavior` is:
Let's start with a simple example. The minimum required for a `EventSourcedBehavior` is:
Scala
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #structure }
@ -45,7 +45,7 @@ The first important thing to notice is the `Behavior` of a persistent actor is t
because this is the type of message a persistent actor should receive. In Akka Typed this is now enforced by the type system.
The event and state are only used internally.
The components that make up a PersistentBehavior are:
The components that make up a EventSourcedBehavior are:
* `persistenceId` is the stable unique identifier for the persistent actor.
* `emptyState` defines the `State` when the entity is first created e.g. a Counter would start with 0 as state.
@ -68,7 +68,7 @@ and can be one of:
* `unhandled` the command is unhandled (not supported) in current state
* `stop` stop this actor
In addition to returning the primary `Effect` for the command `PersistentBehavior`s can also
In addition to returning the primary `Effect` for the command `EventSourcedBehavior`s can also
chain side effects (`SideEffect`s) are to be performed after successful persist which is achieved with the `andThen` and `thenRun`
function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. The `thenRun` function
is a convenience around creating a `SideEffect`.
@ -122,7 +122,7 @@ Scala
Java
: @@snip [PersistentActorCompileOnyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-handler }
These are used to create a `PersistentBehavior`:
These are used to create a `EventSourcedBehavior`:
Scala
: @@snip [PersistentActorCompileOnyTest.scala](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #behavior }
@ -137,7 +137,7 @@ where resilience is important so that if a node crashes the persistent actors ar
resume operations @ref:[Cluster Sharding](cluster-sharding.md) is an excellent fit to spread persistent actors over a
cluster and address them by id.
The `PersistentBehavior` can then be run as with any plain typed actor as described in [actors documentation](actors-typed.md),
The `EventSourcedBehavior` can then be run as with any plain typed actor as described in [actors documentation](actors-typed.md),
but since Akka Persistence is based on the single-writer principle the persistent actors are typically used together
with Cluster Sharding. For a particular `persistenceId` only one persistent actor instance should be active at one time.
If multiple instances were to persist events at the same time, the events would be interleaved and might not be
@ -233,7 +233,7 @@ Scala
Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #event-handler }
And finally the behavior is created @scala[from the `PersistentBehavior.apply`]:
And finally the behavior is created @scala[from the `EventSourcedBehavior.apply`]:
Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #behavior }
@ -299,13 +299,13 @@ Java
Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that
it can be used to enforce that replies are not forgotten when implementing the `PersistentBehavior`.
If it's defined with @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`]
it can be used to enforce that replies are not forgotten when implementing the `EventSourcedBehavior`.
If it's defined with @scala[`EventSourcedBehavior.withEnforcedReplies`]@java[`EventSourcedBehaviorWithEnforcedReplies`]
there will be compilation errors if the returned effect isn't a `ReplyEffect`, which can be
created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noReply`]@java[`Effects().noReply`],
@scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`].
These effects will send the reply message even when @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`]
These effects will send the reply message even when @scala[`EventSourcedBehavior.withEnforcedReplies`]@java[`EventSourcedBehaviorWithEnforcedReplies`]
is not used, but then there will be no compilation errors if the reply decision is left out.
Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific
@ -363,13 +363,13 @@ Java
## Event adapters
Event adapters can be programmatically added to your `PersistentBehavior`s that can convert from your `Event` type
Event adapters can be programmatically added to your `EventSourcedBehavior`s that can convert from your `Event` type
to another type that is then passed to the journal.
Defining an event adapter is done by extending an EventAdapter:
Scala
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala) { #event-wrapper }
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #event-wrapper }
Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper }
@ -377,14 +377,14 @@ Java
Then install it on a persistent behavior:
Scala
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala) { #install-event-adapter }
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #install-event-adapter }
Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter }
## Wrapping Persistent Behaviors
When creating a `PersistentBehavior`, it is possible to wrap `PersistentBehavior` in
When creating a `EventSourcedBehavior`, it is possible to wrap `EventSourcedBehavior` in
other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance
to access the actor logging upon taking snapshots for debug purpose.
@ -397,7 +397,7 @@ Java
## Journal failures
By default a `PersistentBehavior` will stop if an exception is thrown from the journal. It is possible to override this with
By default a `EventSourcedBehavior` will stop if an exception is thrown from the journal. It is possible to override this with
any `BackoffSupervisorStrategy`. It is not possible to use the normal supervision wrapping for this as it isn't valid to
`resume` a behavior on a journal failure as it is not known if the event was persisted.
@ -412,5 +412,5 @@ Java
Journals can reject events. The difference from a failure is that the journal must decide to reject an event before
trying to persist it e.g. because of a serialization exception. If an event is rejected it definitely won't be in the journal.
This is signalled to a `PersistentBehavior` via a `EventRejectedException` and can be handled with a @ref[supervisor](fault-tolerance.md).
This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` and can be handled with a @ref[supervisor](fault-tolerance.md).

View file

@ -5,36 +5,32 @@
package akka.persistence.typed.internal
import scala.concurrent.ExecutionContext
import scala.util.Try
import akka.Done
import akka.actor.typed.Logger
import akka.actor.Cancellable
import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.actor.typed.Logger
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.internal.EventsourcedBehavior.MDC
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId
import akka.util.Collections.EmptyImmutableSeq
import akka.util.OptionVal
import scala.util.Try
import akka.actor.Cancellable
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryTickEvent
/**
* INTERNAL API: Carry state for the Persistent behavior implementation behaviors
* INTERNAL API: Carry state for the Persistent behavior implementation behaviors.
*/
@InternalApi
private[persistence] final class EventsourcedSetup[C, E, S](
private[persistence] final class BehaviorSetup[C, E, S](
val context: ActorContext[InternalProtocol],
val persistenceId: PersistenceId,
val emptyState: S,
val commandHandler: PersistentBehavior.CommandHandler[C, E, S],
val eventHandler: PersistentBehavior.EventHandler[S, E],
val writerIdentity: WriterIdentity,
val commandHandler: EventSourcedBehavior.CommandHandler[C, E, S],
val eventHandler: EventSourcedBehavior.EventHandler[S, E],
val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity,
val recoveryCompleted: S Unit,
val onRecoveryFailure: Throwable Unit,
val onSnapshot: (SnapshotMetadata, Try[Done]) Unit,
@ -43,10 +39,11 @@ private[persistence] final class EventsourcedSetup[C, E, S](
val snapshotWhen: (S, E, Long) Boolean,
val recovery: Recovery,
var holdingRecoveryPermit: Boolean,
val settings: EventsourcedSettings,
val settings: EventSourcedSettings,
val internalStash: StashBuffer[InternalProtocol]
) {
import akka.actor.typed.scaladsl.adapter._
import InternalProtocol.RecoveryTickEvent
val persistence: Persistence = Persistence(context.system.toUntyped)
@ -74,14 +71,14 @@ private[persistence] final class EventsourcedSetup[C, E, S](
}
}
def setMdc(newMdc: Map[String, Any]): EventsourcedSetup[C, E, S] = {
def setMdc(newMdc: Map[String, Any]): BehaviorSetup[C, E, S] = {
mdc = newMdc
// mdc is changed often, for each persisted event, but logging is rare, so lazy init of Logger
_log = OptionVal.None
this
}
def setMdc(phaseName: String): EventsourcedSetup[C, E, S] = {
def setMdc(phaseName: String): BehaviorSetup[C, E, S] = {
setMdc(MDC.create(persistenceId, phaseName))
this
}
@ -111,3 +108,19 @@ private[persistence] final class EventsourcedSetup[C, E, S](
}
object MDC {
// format: OFF
val AwaitingPermit = "get-permit"
val ReplayingSnapshot = "replay-snap"
val ReplayingEvents = "replay-evts"
val RunningCmds = "running-cmnds"
val PersistingEvents = "persist-evts"
// format: ON
def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = {
Map(
"persistenceId" persistenceId.id,
"phase" phaseName
)
}
}

View file

@ -4,6 +4,9 @@
package akka.persistence.typed.internal
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.typed
import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior, BehaviorInterceptor, PostStop, Signal, SupervisorStrategy }
@ -11,15 +14,14 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.{ EventAdapter, NoOpEventAdapter }
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl._
import akka.persistence.typed.PersistenceId
import akka.util.ConstantFun
import scala.util.{ Failure, Success, Try }
import akka.persistence.typed.PersistenceId
@InternalApi
private[akka] object PersistentBehaviorImpl {
private[akka] object EventSourcedBehaviorImpl {
def defaultOnSnapshot[A](ctx: ActorContext[A], meta: SnapshotMetadata, result: Try[Done]): Unit = {
result match {
@ -29,40 +31,56 @@ private[akka] object PersistentBehaviorImpl {
ctx.log.error(t, "Save snapshot failed, snapshot metadata: [{}]", meta)
}
}
object WriterIdentity {
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip)
private[akka] val instanceIdCounter = new AtomicInteger(1)
def newIdentity(): WriterIdentity = {
val instanceId: Int = WriterIdentity.instanceIdCounter.getAndIncrement()
val writerUuid: String = UUID.randomUUID.toString
WriterIdentity(instanceId, writerUuid)
}
}
final case class WriterIdentity(instanceId: Int, writerUuid: String)
}
@InternalApi
private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
persistenceId: PersistenceId,
emptyState: State,
commandHandler: PersistentBehavior.CommandHandler[Command, Event, State],
eventHandler: PersistentBehavior.EventHandler[State, Event],
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
recoveryCompleted: State Unit = ConstantFun.scalaAnyToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery(),
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
onSnapshot: (SnapshotMetadata, Try[Done]) Unit = ConstantFun.scalaAnyTwoToUnit,
onRecoveryFailure: Throwable Unit = ConstantFun.scalaAnyToUnit
) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement {
commandHandler: EventSourcedBehavior.CommandHandler[Command, Event, State],
eventHandler: EventSourcedBehavior.EventHandler[State, Event],
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
recoveryCompleted: State Unit = ConstantFun.scalaAnyToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery(),
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
onSnapshot: (SnapshotMetadata, Try[Done]) Unit = ConstantFun.scalaAnyTwoToUnit,
onRecoveryFailure: Throwable Unit = ConstantFun.scalaAnyToUnit
) extends EventSourcedBehavior[Command, Event, State] with StashReferenceManagement {
import EventSourcedBehaviorImpl.WriterIdentity
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
Behaviors.supervise {
Behaviors.setup[Command] { ctx
val settings = EventsourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""))
val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""))
val internalStash = stashBuffer(settings)
// the default impl needs context which isn't available until here, so we
// use the anyTwoToUnit as a marker to use the default
val actualOnSnapshot: (SnapshotMetadata, Try[Done]) Unit =
if (onSnapshot == ConstantFun.scalaAnyTwoToUnit) PersistentBehaviorImpl.defaultOnSnapshot[Command](ctx, _, _)
if (onSnapshot == ConstantFun.scalaAnyTwoToUnit) EventSourcedBehaviorImpl.defaultOnSnapshot[Command](ctx, _, _)
else onSnapshot
val eventsourcedSetup = new EventsourcedSetup(
val eventsourcedSetup = new BehaviorSetup(
ctx.asInstanceOf[ActorContext[InternalProtocol]],
persistenceId,
emptyState,
@ -97,7 +115,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
target(ctx, signal)
}
}
val widened = EventsourcedRequestingRecoveryPermit(eventsourcedSetup).widen[Any] {
val widened = RequestingRecoveryPermit(eventsourcedSetup).widen[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)
case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted
@ -113,7 +131,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: State Unit): PersistentBehavior[Command, Event, State] =
def onRecoveryCompleted(callback: State Unit): EventSourcedBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
/**
@ -123,7 +141,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotWhen(predicate: (State, Event, Long) Boolean): PersistentBehavior[Command, Event, State] =
def snapshotWhen(predicate: (State, Event, Long) Boolean): EventSourcedBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
/**
@ -131,7 +149,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
*
* `numberOfEvents` should be greater than 0
*/
def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = {
def snapshotEvery(numberOfEvents: Long): EventSourcedBehavior[Command, Event, State] = {
require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
copy(snapshotWhen = (_, _, seqNr) seqNr % numberOfEvents == 0)
}
@ -139,7 +157,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
/**
* Change the journal plugin id that this actor should use.
*/
def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = {
def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = if (id != "") Some(id) else None)
}
@ -147,7 +165,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
/**
* Change the snapshot store plugin id that this actor should use.
*/
def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = {
def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = if (id != "") Some(id) else None)
}
@ -160,27 +178,27 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
* You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = {
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection))
}
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State] =
def withTagger(tagger: Event Set[String]): EventSourcedBehavior[Command, Event, State] =
copy(tagger = tagger)
/**
* Adapt the event before sending to the journal e.g. wrapping the event in a type
* the journal understands
*/
def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] =
def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] =
copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]])
/**
* The `callback` function is called to notify the actor that a snapshot has finished
*/
def onSnapshot(callback: (SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State] =
def onSnapshot(callback: (SnapshotMetadata, Try[Done]) Unit): EventSourcedBehavior[Command, Event, State] =
copy(onSnapshot = callback)
/**
@ -191,13 +209,23 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
*
* If not specified the actor will be stopped on failure.
*/
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State] =
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] =
copy(supervisionStrategy = backoffStrategy)
/**
* The `callback` function is called to notify that recovery has failed. For setting a supervision
* strategy `onPersistFailure`
*/
def onRecoveryFailure(callback: Throwable Unit): PersistentBehavior[Command, Event, State] =
def onRecoveryFailure(callback: Throwable Unit): EventSourcedBehavior[Command, Event, State] =
copy(onRecoveryFailure = callback)
}
/** Protocol used internally by the eventsourced behaviors. */
private[akka] sealed trait InternalProtocol
private[akka] object InternalProtocol {
case object RecoveryPermitGranted extends InternalProtocol
final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol
final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol
final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol
final case class IncomingCommand[C](c: C) extends InternalProtocol
}

View file

@ -16,12 +16,12 @@ import scala.concurrent.duration._
/**
* INTERNAL API
*/
@InternalApi private[akka] object EventsourcedSettings {
@InternalApi private[akka] object EventSourcedSettings {
def apply(system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String): EventsourcedSettings =
def apply(system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String): EventSourcedSettings =
apply(system.settings.config, journalPluginId, snapshotPluginId)
def apply(config: Config, journalPluginId: String, snapshotPluginId: String): EventsourcedSettings = {
def apply(config: Config, journalPluginId: String, snapshotPluginId: String): EventSourcedSettings = {
val typedConfig = config.getConfig("akka.persistence.typed")
// StashOverflowStrategy
@ -36,7 +36,7 @@ import scala.concurrent.duration._
val recoveryEventTimeout: FiniteDuration =
journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis
EventsourcedSettings(
EventSourcedSettings(
stashCapacity = stashCapacity,
stashOverflowStrategyConfigurator,
logOnStashing = logOnStashing,
@ -59,7 +59,7 @@ import scala.concurrent.duration._
}
@InternalApi
private[akka] final case class EventsourcedSettings(
private[akka] final case class EventSourcedSettings(
stashCapacity: Int,
stashOverflowStrategyConfigurator: String,
logOnStashing: Boolean,

View file

@ -1,55 +0,0 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.annotation.InternalApi
import akka.persistence.typed.PersistenceId
/** INTERNAL API */
@InternalApi
private[akka] object EventsourcedBehavior {
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip)
private[akka] val instanceIdCounter = new AtomicInteger(1)
object WriterIdentity {
def newIdentity(): WriterIdentity = {
val instanceId: Int = EventsourcedBehavior.instanceIdCounter.getAndIncrement()
val writerUuid: String = UUID.randomUUID.toString
WriterIdentity(instanceId, writerUuid)
}
}
final case class WriterIdentity(instanceId: Int, writerUuid: String)
object MDC {
// format: OFF
val AwaitingPermit = "get-permit"
val ReplayingSnapshot = "replay-snap"
val ReplayingEvents = "replay-evts"
val RunningCmds = "running-cmnds"
val PersistingEvents = "persist-evts"
// format: ON
def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = {
Map(
"persistenceId" persistenceId.id,
"phase" phaseName
)
}
}
/** Protocol used internally by the eventsourced behaviors, never exposed to user-land */
sealed trait InternalProtocol
object InternalProtocol {
case object RecoveryPermitGranted extends InternalProtocol
final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol
final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol
final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol
final case class IncomingCommand[C](c: C) extends InternalProtocol
}
}

View file

@ -1,31 +0,0 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.actor.typed.scaladsl.StashBuffer
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.util.OptionVal
/**
* INTERNAL API
* Main reason for introduction of this trait is stash buffer reference management
* in order to survive restart of internal behavior
*/
@InternalApi private[akka] trait EventsourcedStashReferenceManagement {
private var stashBuffer: OptionVal[StashBuffer[InternalProtocol]] = OptionVal.None
def stashBuffer(settings: EventsourcedSettings): StashBuffer[InternalProtocol] = {
val buffer: StashBuffer[InternalProtocol] = stashBuffer match {
case OptionVal.Some(value) value
case _ StashBuffer(settings.stashCapacity)
}
this.stashBuffer = OptionVal.Some(buffer)
stashBuffer.get
}
def clearStashBuffer(): Unit = stashBuffer = OptionVal.None
}

View file

@ -11,23 +11,22 @@ import akka.annotation.InternalApi
import akka.persistence.JournalProtocol.ReplayMessages
import akka.persistence.SnapshotProtocol.LoadSnapshot
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import scala.collection.immutable
/** INTERNAL API */
@InternalApi
private[akka] trait EventsourcedJournalInteractions[C, E, S] {
private[akka] trait JournalInteractions[C, E, S] {
def setup: EventsourcedSetup[C, E, S]
def setup: BehaviorSetup[C, E, S]
type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
// ---------- journal interactions ---------
protected def internalPersist(
state: EventsourcedRunning.EventsourcedState[S],
event: EventOrTagged): EventsourcedRunning.EventsourcedState[S] = {
state: Running.EventsourcedState[S],
event: EventOrTagged): Running.EventsourcedState[S] = {
val newState = state.nextSequenceNr()
@ -48,7 +47,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
protected def internalPersistAll(
events: immutable.Seq[EventOrTagged],
state: EventsourcedRunning.EventsourcedState[S]): EventsourcedRunning.EventsourcedState[S] = {
state: Running.EventsourcedState[S]): Running.EventsourcedState[S] = {
if (events.nonEmpty) {
var newState = state
@ -107,7 +106,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId.id, criteria, toSequenceNr), setup.selfUntyped)
}
protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = {
protected def internalSaveSnapshot(state: Running.EventsourcedState[S]): Unit = {
// don't store null state
if (state.state != null)
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(

View file

@ -10,12 +10,10 @@ import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.persistence.typed.internal.EventsourcedBehavior._
import scala.util.control.NonFatal
import akka.actor.typed.internal.PoisonPill
import scala.util.control.NonFatal
/***
* INTERNAL API
*
@ -24,14 +22,14 @@ import akka.actor.typed.internal.PoisonPill
* In this behavior we finally start replaying events, beginning from the last applied sequence number
* (i.e. the one up-until-which the snapshot recovery has brought us).
*
* Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed
* Once recovery is completed, the actor becomes [[Running]], stashed messages are flushed
* and control is given to the user's handlers to drive the actors behavior from there.
*
* See next behavior [[EventsourcedRunning]].
* See previous behavior [[EventsourcedReplayingSnapshot]].
* See next behavior [[Running]].
* See previous behavior [[ReplayingSnapshot]].
*/
@InternalApi
private[persistence] object EventsourcedReplayingEvents {
private[persistence] object ReplayingEvents {
@InternalApi
private[persistence] final case class ReplayingState[State](
@ -43,17 +41,18 @@ private[persistence] object EventsourcedReplayingEvents {
)
def apply[C, E, S](
setup: EventsourcedSetup[C, E, S],
setup: BehaviorSetup[C, E, S],
state: ReplayingState[S]
): Behavior[InternalProtocol] =
new EventsourcedReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state)
new ReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state)
}
@InternalApi
private[persistence] class EventsourcedReplayingEvents[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
import EventsourcedReplayingEvents.ReplayingState
private[persistence] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C, E, S])
extends JournalInteractions[C, E, S] with StashManagement[C, E, S] {
import ReplayingEvents.ReplayingState
import InternalProtocol._
def createBehavior(state: ReplayingState[S]): Behavior[InternalProtocol] = {
Behaviors.setup { _
@ -175,9 +174,9 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
if (state.receivedPoisonPill && isStashEmpty)
Behaviors.stopped
else {
val running = EventsourcedRunning[C, E, S](
val running = Running[C, E, S](
setup,
EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state, state.receivedPoisonPill)
Running.EventsourcedState[S](state.seqNr, state.state, state.receivedPoisonPill)
)
tryUnstash(running)

View file

@ -4,15 +4,12 @@
package akka.persistence.typed.internal
import akka.actor.typed.scaladsl.Behaviors.same
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
import akka.annotation.InternalApi
import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult }
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.persistence.typed.internal.EventsourcedBehavior._
/**
* INTERNAL API
@ -24,22 +21,24 @@ import akka.persistence.typed.internal.EventsourcedBehavior._
* and if it exists, we use it instead of the initial `emptyState`.
*
* Once snapshot recovery is done (or no snapshot was selected),
* recovery of events continues in [[EventsourcedReplayingEvents]].
* recovery of events continues in [[ReplayingEvents]].
*
* See next behavior [[EventsourcedReplayingEvents]].
* See previous behavior [[EventsourcedRequestingRecoveryPermit]].
* See next behavior [[ReplayingEvents]].
* See previous behavior [[RequestingRecoveryPermit]].
*/
@InternalApi
private[akka] object EventsourcedReplayingSnapshot {
private[akka] object ReplayingSnapshot {
def apply[C, E, S](setup: EventsourcedSetup[C, E, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] =
new EventsourcedReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior(receivedPoisonPill)
def apply[C, E, S](setup: BehaviorSetup[C, E, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] =
new ReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior(receivedPoisonPill)
}
@InternalApi
private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup[C, E, S])
extends JournalInteractions[C, E, S] with StashManagement[C, E, S] {
import InternalProtocol._
def createBehavior(receivedPoisonPillInPreviousPhase: Boolean): Behavior[InternalProtocol] = {
// protect against snapshot stalling forever because of journal overloaded and such
@ -89,7 +88,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
// we know we're in snapshotting mode; snapshot recovery timeout arrived
val ex = new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}")
onRecoveryFailure(ex, None)
} else same // ignore, since we received the snapshot already
} else Behaviors.same // ignore, since we received the snapshot already
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
@ -127,9 +126,9 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long, receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
setup.cancelRecoveryTimer()
EventsourcedReplayingEvents[C, E, S](
ReplayingEvents[C, E, S](
setup,
EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr, receivedPoisonPill)
ReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr, receivedPoisonPill)
)
}

View file

@ -8,8 +8,6 @@ import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.MDC
/**
* INTERNAL API
@ -19,19 +17,19 @@ import akka.persistence.typed.internal.EventsourcedBehavior.MDC
* Requests a permit to start replaying this actor; this is tone to avoid
* hammering the journal with too many concurrently replaying actors.
*
* See next behavior [[EventsourcedReplayingSnapshot]].
* See next behavior [[ReplayingSnapshot]].
*/
@InternalApi
private[akka] object EventsourcedRequestingRecoveryPermit {
private[akka] object RequestingRecoveryPermit {
def apply[C, E, S](setup: EventsourcedSetup[C, E, S]): Behavior[InternalProtocol] =
new EventsourcedRequestingRecoveryPermit(setup.setMdc(MDC.AwaitingPermit)).createBehavior()
def apply[C, E, S](setup: BehaviorSetup[C, E, S]): Behavior[InternalProtocol] =
new RequestingRecoveryPermit(setup.setMdc(MDC.AwaitingPermit)).createBehavior()
}
@InternalApi
private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedStashManagement[C, E, S] with EventsourcedJournalInteractions[C, E, S] {
private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: BehaviorSetup[C, E, S])
extends StashManagement[C, E, S] with JournalInteractions[C, E, S] {
def createBehavior(): Behavior[InternalProtocol] = {
// request a permit, as only once we obtain one we can start replaying
@ -66,7 +64,7 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val s
setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery)
setup.holdingRecoveryPermit = true
EventsourcedReplayingSnapshot(setup, receivedPoisonPill)
ReplayingSnapshot(setup, receivedPoisonPill)
}
}

View file

@ -4,29 +4,27 @@
package akka.persistence.typed.internal
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.{ Failure, Success }
import akka.Done
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.Signal
import akka.actor.typed.internal.PoisonPill
import akka.annotation.InternalApi
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.journal.Tagged
import akka.persistence.typed.{ Callback, EventRejectedException, SideEffect, Stop }
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.persistence.typed.scaladsl.Effect
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.{ Failure, Success }
import akka.actor.typed.Signal
import akka.actor.typed.internal.PoisonPill
/**
* INTERNAL API
*
* Conceptually fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior.
* Conceptually fourth (of four) -- also known as 'final' or 'ultimate' -- form of EventSourcedBehavior.
*
* In this phase recovery has completed successfully and we continue handling incoming commands,
* as well as persisting new events as dictated by the user handlers.
@ -35,13 +33,14 @@ import akka.actor.typed.internal.PoisonPill
* - HandlingCommands - where the command handler is invoked for incoming commands
* - PersistingEvents - where incoming commands are stashed until persistence completes
*
* This is implemented as such to avoid creating many EventsourcedRunning instances,
* This is implemented as such to avoid creating many EventSourced Running instances,
* which perform the Persistence extension lookup on creation and similar things (config lookup)
*
* See previous [[EventsourcedReplayingEvents]].
* See previous [[ReplayingEvents]].
* TODO rename
*/
@InternalApi
private[akka] object EventsourcedRunning {
private[akka] object Running {
final case class EventsourcedState[State](
seqNr: Long,
@ -55,23 +54,24 @@ private[akka] object EventsourcedRunning {
def updateLastSequenceNr(persistent: PersistentRepr): EventsourcedState[State] =
if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this
def applyEvent[C, E](setup: EventsourcedSetup[C, E, State], event: E): EventsourcedState[State] = {
def applyEvent[C, E](setup: BehaviorSetup[C, E, State], event: E): EventsourcedState[State] = {
val updated = setup.eventHandler(state, event)
copy(state = updated)
}
}
def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] =
new EventsourcedRunning(setup.setMdc(MDC.RunningCmds)).handlingCommands(state)
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] =
new Running(setup.setMdc(MDC.RunningCmds)).handlingCommands(state)
}
// ===============================================
/** INTERNAL API */
@InternalApi private[akka] class EventsourcedRunning[C, E, S](
override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
import EventsourcedRunning.EventsourcedState
@InternalApi private[akka] class Running[C, E, S](
override val setup: BehaviorSetup[C, E, S])
extends JournalInteractions[C, E, S] with StashManagement[C, E, S] {
import Running.EventsourcedState
import InternalProtocol._
private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds)
private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents)
@ -157,7 +157,7 @@ private[akka] object EventsourcedRunning {
setup.setMdc(runningCmdsMdc)
Behaviors.receiveMessage[EventsourcedBehavior.InternalProtocol] {
Behaviors.receiveMessage[InternalProtocol] {
case IncomingCommand(c: C @unchecked) onCommand(state, c)
case SnapshotterResponse(r) onSnapshotterResponse(r, Behaviors.same)
case _ Behaviors.unhandled
@ -186,11 +186,11 @@ private[akka] object EventsourcedRunning {
numberOfEvents: Int,
shouldSnapshotAfterPersist: Boolean,
var sideEffects: immutable.Seq[SideEffect[S]])
extends AbstractBehavior[EventsourcedBehavior.InternalProtocol] {
extends AbstractBehavior[InternalProtocol] {
private var eventCounter = 0
override def onMessage(msg: EventsourcedBehavior.InternalProtocol): Behavior[EventsourcedBehavior.InternalProtocol] = {
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
msg match {
case SnapshotterResponse(r) onSnapshotterResponse(r, this)
case JournalResponse(r) onJournalResponse(r)

View file

@ -4,21 +4,21 @@
package akka.persistence.typed.internal
import akka.{ actor a }
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer }
import akka.actor.{ DeadLetter, StashOverflowException }
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence._
import akka.util.ConstantFun
import akka.{ actor a }
import akka.util.OptionVal
/** INTERNAL API: Stash management for persistent behaviors */
@InternalApi
private[akka] trait EventsourcedStashManagement[C, E, S] {
private[akka] trait StashManagement[C, E, S] {
import akka.actor.typed.scaladsl.adapter._
def setup: EventsourcedSetup[C, E, S]
def setup: BehaviorSetup[C, E, S]
private def context: ActorContext[InternalProtocol] = setup.context
@ -55,3 +55,24 @@ private[akka] trait EventsourcedStashManagement[C, E, S] {
}
}
/**
* INTERNAL API
* Main reason for introduction of this trait is stash buffer reference management
* in order to survive restart of internal behavior
*/
@InternalApi private[akka] trait StashReferenceManagement {
private var stashBuffer: OptionVal[StashBuffer[InternalProtocol]] = OptionVal.None
def stashBuffer(settings: EventSourcedSettings): StashBuffer[InternalProtocol] = {
val buffer: StashBuffer[InternalProtocol] = stashBuffer match {
case OptionVal.Some(value) value
case _ StashBuffer(settings.stashCapacity)
}
this.stashBuffer = OptionVal.Some(buffer)
stashBuffer.get
}
def clearStashBuffer(): Unit = stashBuffer = OptionVal.None
}

View file

@ -15,7 +15,7 @@ import akka.util.OptionVal
/**
* FunctionalInterface for reacting on commands
*
* Used with [[CommandHandlerBuilder]] to setup the behavior of a [[PersistentBehavior]]
* Used with [[CommandHandlerBuilder]] to setup the behavior of a [[EventSourcedBehavior]]
*/
@FunctionalInterface
trait CommandHandler[Command, Event, State] {

View file

@ -4,15 +4,14 @@
package akka.persistence.typed.javadsl
import akka.annotation.DoNotInherit
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.japi.function
import akka.persistence.typed.internal._
import akka.persistence.typed.SideEffect
import scala.collection.JavaConverters._
import akka.annotation.InternalApi
import akka.persistence.typed.ExpectingReply
import scala.collection.JavaConverters._
/**
* INTERNAL API: see `class EffectFactories`
*/
@ -20,7 +19,7 @@ import akka.persistence.typed.ExpectingReply
/**
* Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command.
* Created via [[PersistentBehavior.Effect]].
* Created via [[EventSourcedBehavior.Effect]].
*
* Not for user extension
*/
@ -59,7 +58,7 @@ import akka.persistence.typed.ExpectingReply
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When
* when the `PersistentBehavior` is created with [[EventSourcedBehaviorWithEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
@ -70,7 +69,7 @@ import akka.persistence.typed.ExpectingReply
})
/**
* When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* When [[EventSourcedBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
@ -83,7 +82,7 @@ import akka.persistence.typed.ExpectingReply
*
* Additional side effects can be performed in the callback `andThen`
*
* Instances of `Effect` are available through factories [[PersistentBehavior.Effect]].
* Instances of `Effect` are available through factories [[EventSourcedBehavior.Effect]].
*
* Not intended for user extension.
*/
@ -116,7 +115,7 @@ import akka.persistence.typed.ExpectingReply
* This has the same semantics as `cmd.replyTo().tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When
* when the `PersistentBehavior` is created with [[EventSourcedBehaviorWithEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
@ -125,7 +124,7 @@ import akka.persistence.typed.ExpectingReply
CompositeEffect(this, SideEffect[State](newState cmd.replyTo ! replyWithMessage(newState)))
/**
* When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* When [[EventSourcedBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
@ -134,7 +133,7 @@ import akka.persistence.typed.ExpectingReply
}
/**
* [[PersistentBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten.
* [[EventSourcedBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/

View file

@ -13,7 +13,7 @@ import akka.util.OptionVal
/**
* FunctionalInterface for reacting on events having been persisted
*
* Used with [[EventHandlerBuilder]] to setup the behavior of a [[PersistentBehavior]]
* Used with [[EventHandlerBuilder]] to setup the behavior of a [[EventSourcedBehavior]]
*/
@FunctionalInterface
trait EventHandler[State, Event] {

View file

@ -14,10 +14,11 @@ import akka.annotation.{ ApiMayChange, InternalApi }
import akka.persistence.SnapshotMetadata
import akka.persistence.typed.{ EventAdapter, _ }
import akka.persistence.typed.internal._
import scala.util.{ Failure, Success }
@ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (val persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] (val persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
def this(persistenceId: PersistenceId) = {
this(persistenceId, Optional.empty[BackoffSupervisorStrategy])
@ -110,7 +111,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (
* If this is overridden `shouldSnapshot` is not used.
*
* @return number of events between snapshots, should be greater than 0
* @see [[PersistentBehavior#shouldSnapshot]]
* @see [[EventSourcedBehavior#shouldSnapshot]]
*/
def snapshotEvery(): Long = 0L
@ -122,7 +123,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (
* receives the State, Event and the sequenceNr used for the Event
*
* @return `true` if snapshot should be saved for the given event
* @see [[PersistentBehavior#snapshotEvery]]
* @see [[EventSourcedBehavior#snapshotEvery]]
*/
def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false
@ -153,7 +154,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (
else tags.asScala.toSet
}
val behavior = scaladsl.PersistentBehavior[Command, Event, State](
val behavior = scaladsl.EventSourcedBehavior[Command, Event, State](
persistenceId,
emptyState,
(state, cmd) commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]],
@ -188,13 +189,13 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (
/**
* FIXME This is not completed for javadsl yet. The compiler is not enforcing the replies yet.
*
* A [[PersistentBehavior]] that is enforcing that replies to commands are not forgotten.
* A [[EventSourcedBehavior]] that is enforcing that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
@ApiMayChange
abstract class PersistentBehaviorWithEnforcedReplies[Command, Event, State >: Null](persistenceId: PersistenceId, backoffSupervisorStrategy: Optional[BackoffSupervisorStrategy])
extends PersistentBehavior[Command, Event, State](persistenceId, backoffSupervisorStrategy) {
abstract class EventSourcedBehaviorWithEnforcedReplies[Command, Event, State >: Null](persistenceId: PersistenceId, backoffSupervisorStrategy: Optional[BackoffSupervisorStrategy])
extends EventSourcedBehavior[Command, Event, State](persistenceId, backoffSupervisorStrategy) {
def this(persistenceId: PersistenceId) = {
this(persistenceId, Optional.empty[BackoffSupervisorStrategy])

View file

@ -7,11 +7,11 @@ package akka.persistence.typed.scaladsl
import akka.annotation.DoNotInherit
import akka.persistence.typed.SideEffect
import akka.persistence.typed.internal._
import scala.collection.{ immutable im }
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.ReplyEffectImpl
import scala.collection.{ immutable im }
/**
* Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command.
*/
@ -67,7 +67,7 @@ object Effect {
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When
* when the `PersistentBehavior` is created with [[EventSourcedBehavior.withEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
@ -76,7 +76,7 @@ object Effect {
none[Event, State].thenReply[ReplyMessage](cmd)(_ replyWithMessage)
/**
* When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
* When [[EventSourcedBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
@ -122,7 +122,7 @@ trait Effect[+Event, State] {
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When
* when the `PersistentBehavior` is created with [[EventSourcedBehavior.withEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
@ -131,7 +131,7 @@ trait Effect[+Event, State] {
CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage))
/**
* When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
* When [[EventSourcedBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
@ -140,7 +140,7 @@ trait Effect[+Event, State] {
}
/**
* [[PersistentBehavior.withEnforcedReplies]] can be used to enforce that replies are not forgotten.
* [[EventSourcedBehavior.withEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
*

View file

@ -7,16 +7,14 @@ package akka.persistence.typed.scaladsl
import akka.Done
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior.DeferredBehavior
import akka.annotation.DoNotInherit
import akka.persistence._
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.{ EventAdapter, ExpectingReply, PersistenceId }
import akka.persistence.typed.internal._
import scala.util.Try
import akka.annotation.DoNotInherit
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
object PersistentBehavior {
object EventSourcedBehavior {
/**
* Type alias for the command handler function that defines how to act on commands.
@ -43,8 +41,8 @@ object PersistentBehavior {
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) Effect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler)
eventHandler: (State, Event) State): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler)
/**
* Create a `Behavior` for a persistent actor that is enforcing that replies to commands are not forgotten.
@ -55,8 +53,8 @@ object PersistentBehavior {
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) ReplyEffect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler)
eventHandler: (State, Event) State): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler)
/**
* The `CommandHandler` defines how to act on commands. A `CommandHandler` is
@ -88,24 +86,24 @@ object PersistentBehavior {
*
* Not for user extension
*/
@DoNotInherit trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] {
@DoNotInherit trait EventSourcedBehavior[Command, Event, State] extends DeferredBehavior[Command] {
def persistenceId: PersistenceId
/**
* The `callback` function is called to notify that the recovery process has finished.
*/
def onRecoveryCompleted(callback: State Unit): PersistentBehavior[Command, Event, State]
def onRecoveryCompleted(callback: State Unit): EventSourcedBehavior[Command, Event, State]
/**
* The `callback` function is called to notify that recovery has failed. For setting a supervision
* strategy `onPersistFailure`
*/
def onRecoveryFailure(callback: Throwable Unit): PersistentBehavior[Command, Event, State]
def onRecoveryFailure(callback: Throwable Unit): EventSourcedBehavior[Command, Event, State]
/**
* The `callback` function is called to notify when a snapshot is complete.
*/
def onSnapshot(callback: (SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State]
def onSnapshot(callback: (SnapshotMetadata, Try[Done]) Unit): EventSourcedBehavior[Command, Event, State]
/**
* Initiates a snapshot if the given function returns true.
@ -114,23 +112,23 @@ object PersistentBehavior {
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotWhen(predicate: (State, Event, Long) Boolean): PersistentBehavior[Command, Event, State]
def snapshotWhen(predicate: (State, Event, Long) Boolean): EventSourcedBehavior[Command, Event, State]
/**
* Snapshot every N events
*
* `numberOfEvents` should be greater than 0
*/
def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State]
def snapshotEvery(numberOfEvents: Long): EventSourcedBehavior[Command, Event, State]
/**
* Change the journal plugin id that this actor should use.
*/
def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State]
def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State]
/**
* Change the snapshot store plugin id that this actor should use.
*/
def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State]
def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State]
/**
* Changes the snapshot selection criteria used by this behavior.
@ -140,18 +138,18 @@ object PersistentBehavior {
* You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State]
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State]
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State]
def withTagger(tagger: Event Set[String]): EventSourcedBehavior[Command, Event, State]
/**
* Transform the event in another type before giving to the journal. Can be used to wrap events
* in types Journals understand but is of a different type than `Event`.
*/
def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State]
def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State]
/**
* Back off strategy for persist failures.
@ -161,6 +159,6 @@ object PersistentBehavior {
*
* If not specified the actor will be stopped on failure.
*/
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State]
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State]
}

View file

@ -19,14 +19,15 @@ import org.scalatest.junit.JUnitSuite;
import java.time.Duration;
import static akka.persistence.typed.scaladsl.PersistentBehaviorFailureSpec.conf;
import static akka.persistence.typed.scaladsl.EventSourcedBehaviorFailureSpec.conf;
class FailingPersistentActor extends PersistentBehavior<String, String, String> {
class FailingEventSourcedActor extends EventSourcedBehavior<String, String, String> {
private final ActorRef<String> probe;
private final ActorRef<Throwable> recoveryFailureProbe;
FailingPersistentActor(PersistenceId persistenceId, ActorRef<String> probe, ActorRef<Throwable> recoveryFailureProbe) {
FailingEventSourcedActor(PersistenceId persistenceId, ActorRef<String> probe, ActorRef<Throwable> recoveryFailureProbe) {
super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1));
this.probe = probe;
this.recoveryFailureProbe = recoveryFailureProbe;
@ -64,7 +65,7 @@ class FailingPersistentActor extends PersistentBehavior<String, String, String>
}
}
public class PersistentActorFailureTest extends JUnitSuite {
public class EventSourcedActorFailureTest extends JUnitSuite {
public static final Config config = conf().withFallback(ConfigFactory.load());
@ -72,7 +73,7 @@ public class PersistentActorFailureTest extends JUnitSuite {
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
public static Behavior<String> fail(PersistenceId pid, ActorRef<String> probe, ActorRef<Throwable> recoveryFailureProbe) {
return new FailingPersistentActor(pid, probe, recoveryFailureProbe);
return new FailingEventSourcedActor(pid, probe, recoveryFailureProbe);
}
public static Behavior<String> fail(PersistenceId pid, ActorRef<String> probe) {
return fail(pid, probe, testKit.<Throwable>createTestProbe().ref());

View file

@ -26,7 +26,7 @@ public class NullEmptyStateTest extends JUnitSuite {
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
static class NullEmptyState extends PersistentBehavior<String, String, String> {
static class NullEmptyState extends EventSourcedBehavior<String, String, String> {
private final ActorRef<String> probe;

View file

@ -91,8 +91,8 @@ public class PersistentActorCompileOnlyTest {
//#behavior
public static PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState> pb =
new PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState>(new PersistenceId("p1")) {
public static EventSourcedBehavior<SimpleCommand, SimpleEvent, SimpleState> pb =
new EventSourcedBehavior<SimpleCommand, SimpleEvent, SimpleState>(new PersistenceId("p1")) {
@Override
public SimpleState emptyState() {
@ -161,8 +161,8 @@ public class PersistentActorCompileOnlyTest {
//#commonChainedEffects
private PersistentBehavior<MyCommand, MyEvent, ExampleState> pa =
new PersistentBehavior<MyCommand, MyEvent, ExampleState>(new PersistenceId("pa")) {
private EventSourcedBehavior<MyCommand, MyEvent, ExampleState> pa =
new EventSourcedBehavior<MyCommand, MyEvent, ExampleState>(new PersistenceId("pa")) {
@Override
public ExampleState emptyState() {
@ -281,7 +281,7 @@ public class PersistentActorCompileOnlyTest {
// #actor-context
// #actor-context
class MyPersistentBehavior extends PersistentBehavior<Command, Event, RecoveryComplete.EventsInFlight> {
class MyPersistentBehavior extends EventSourcedBehavior<Command, Event, RecoveryComplete.EventsInFlight> {
// this makes the context available to the command handler etc.
private final ActorContext<Command> ctx;

View file

@ -37,7 +37,7 @@ import java.io.Serializable;
import java.time.Duration;
import java.util.*;
import static akka.persistence.typed.scaladsl.PersistentBehaviorSpec.*;
import static akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
@ -263,7 +263,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
EventAdapter<Incremented, A> transformer) {
return Behaviors.setup(ctx -> {
return new PersistentBehavior<Command, Incremented, State>(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)) {
return new EventSourcedBehavior<Command, Incremented, State>(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)) {
@Override
public CommandHandler<Command, Incremented, State> commandHandler() {
return commandHandlerBuilder(State.class)

View file

@ -24,7 +24,7 @@ public class PrimitiveStateTest extends JUnitSuite {
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
static class PrimitiveState extends PersistentBehavior<Integer, Integer, Integer> {
static class PrimitiveState extends EventSourcedBehavior<Integer, Integer, Integer> {
private final ActorRef<String> probe;

View file

@ -11,9 +11,9 @@ import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
public class AccountExample extends PersistentBehavior<AccountExample.AccountCommand, AccountExample.AccountEvent, AccountExample.Account> {
public class AccountExample extends EventSourcedBehavior<AccountExample.AccountCommand, AccountExample.AccountEvent, AccountExample.Account> {
interface AccountCommand {}
public static class CreateAccount implements AccountCommand {}

View file

@ -6,15 +6,13 @@ package jdocs.akka.persistence.typed;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
public class BasicPersistentBehaviorTest {
@ -25,7 +23,7 @@ public class BasicPersistentBehaviorTest {
public static class State {}
//#supervision
public static class MyPersistentBehavior extends PersistentBehavior<Command, Event, State> {
public static class MyPersistentBehavior extends EventSourcedBehavior<Command, Event, State> {
public MyPersistentBehavior(PersistenceId persistenceId) {
super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
}
@ -65,7 +63,7 @@ public class BasicPersistentBehaviorTest {
//#tagging
}
static PersistentBehavior<Command, Event, State> persistentBehavior =
static EventSourcedBehavior<Command, Event, State> eventSourcedBehavior =
new MyPersistentBehavior(new PersistenceId("pid"));
//#structure

View file

@ -13,7 +13,7 @@ import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
public class BlogPostExample {
@ -147,7 +147,7 @@ public class BlogPostExample {
//#commands
//#behavior
public static class BlogBehavior extends PersistentBehavior<BlogCommand, BlogEvent, BlogState> {
public static class BlogBehavior extends EventSourcedBehavior<BlogCommand, BlogEvent, BlogState> {
//#behavior
private final ActorContext<BlogCommand> ctx;

View file

@ -10,13 +10,13 @@ import akka.actor.typed.Behavior;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class MovieWatchList extends PersistentBehavior<MovieWatchList.Command, MovieWatchList.Event, MovieWatchList.MovieList> {
public class MovieWatchList extends EventSourcedBehavior<MovieWatchList.Command, MovieWatchList.Event, MovieWatchList.MovieList> {
interface Command {
}

View file

@ -10,7 +10,7 @@ import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import java.util.Objects;
@ -117,7 +117,7 @@ public class NullBlogState {
}
}
public static class BlogBehavior extends PersistentBehavior<BlogCommand, BlogEvent, BlogState> {
public static class BlogBehavior extends EventSourcedBehavior<BlogCommand, BlogEvent, BlogState> {
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState, BlogState> initialCommandHandler() {
return commandHandlerBuilder(Objects::isNull)

View file

@ -10,7 +10,7 @@ import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import java.util.Optional;
@ -117,7 +117,7 @@ public class OptionalBlogState {
}
}
public static class BlogBehavior extends PersistentBehavior<BlogCommand, BlogEvent, Optional<BlogState>> {
public static class BlogBehavior extends EventSourcedBehavior<BlogCommand, BlogEvent, Optional<BlogState>> {
private CommandHandlerBuilder<BlogCommand, BlogEvent, Optional<BlogState>, Optional<BlogState>> initialCommandHandler() {
return commandHandlerBuilder(state -> !state.isPresent())

View file

@ -11,7 +11,7 @@ import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import akka.persistence.typed.javadsl.EventSourcedBehavior;
import static jdocs.akka.persistence.typed.auction.AuctionCommand.*;
import static jdocs.akka.persistence.typed.auction.AuctionEvent.*;
@ -24,7 +24,7 @@ import java.util.UUID;
/**
* Based on https://github.com/lagom/online-auction-java/blob/master/bidding-impl/src/main/java/com/example/auction/bidding/impl/AuctionEntity.java
*/
public class AuctionEntity extends PersistentBehavior<AuctionCommand, AuctionEvent, AuctionState> {
public class AuctionEntity extends EventSourcedBehavior<AuctionCommand, AuctionEvent, AuctionState> {
private final UUID entityUUID;

View file

@ -6,8 +6,8 @@ package akka.persistence.typed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.TypedActorSystemOps
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import akka.testkit.TestLatch
import akka.actor.testkit.typed.scaladsl.TestProbe
@ -25,8 +25,8 @@ object ManyRecoveriesSpec {
def persistentBehavior(
name: String,
probe: TestProbe[String],
latch: Option[TestLatch]): PersistentBehavior[Cmd, Evt, String] =
PersistentBehavior[Cmd, Evt, String](
latch: Option[TestLatch]): EventSourcedBehavior[Cmd, Evt, String] =
EventSourcedBehavior[Cmd, Evt, String](
persistenceId = PersistenceId(name),
emptyState = "",
commandHandler = CommandHandler.command {

View file

@ -11,8 +11,8 @@ import akka.actor.typed.scaladsl.adapter.{ TypedActorRefOps, TypedActorSystemOps
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.Persistence
import akka.persistence.RecoveryPermitter.{ RecoveryPermitGranted, RequestRecoveryPermit, ReturnRecoveryPermit }
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import akka.testkit.EventFilter
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
@ -44,7 +44,7 @@ object RecoveryPermitterSpec {
commandProbe: TestProbe[Any],
eventProbe: TestProbe[Any],
throwOnRecovery: Boolean = false): Behavior[Command] =
PersistentBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId(name),
emptyState = EmptyState,
commandHandler = CommandHandler.command {

View file

@ -6,17 +6,16 @@ package akka.persistence.typed.internal
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ Behavior, Signal }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, RecoveryPermitGranted }
import akka.actor.testkit.typed.scaladsl.TestProbe
import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike
class EventsourcedStashReferenceManagementTest extends ScalaTestWithActorTestKit with WordSpecLike {
class StashReferenceManagementTest extends ScalaTestWithActorTestKit with WordSpecLike {
import InternalProtocol._
case class Impl() extends EventsourcedStashReferenceManagement
case class Impl() extends StashReferenceManagement
"EventsourcedStashReferenceManagement instance" should {
"initialize stash only once" in {
@ -44,7 +43,7 @@ class EventsourcedStashReferenceManagementTest extends ScalaTestWithActorTestKit
}
}
object TestBehavior extends EventsourcedStashReferenceManagement {
object TestBehavior extends StashReferenceManagement {
def apply(probe: TestProbe[Int]): Behavior[InternalProtocol] = {
val settings = dummySettings()
@ -65,7 +64,7 @@ class EventsourcedStashReferenceManagementTest extends ScalaTestWithActorTestKit
}
private def dummySettings(capacity: Int = 42) =
EventsourcedSettings(
EventSourcedSettings(
stashCapacity = capacity,
stashOverflowStrategyConfigurator = "akka.persistence.ThrowExceptionConfigurator",
logOnStashing = false,

View file

@ -53,7 +53,7 @@ class ChaosJournal extends InmemJournal {
}
}
object PersistentBehaviorFailureSpec {
object EventSourcedBehaviorFailureSpec {
val conf = ConfigFactory.parseString(
s"""
@ -66,23 +66,25 @@ object PersistentBehaviorFailureSpec {
""").withFallback(ConfigFactory.load("reference.conf")).resolve()
}
class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(PersistentBehaviorFailureSpec.conf) with WordSpecLike {
class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorFailureSpec.conf) with WordSpecLike {
implicit val testSettings = TestKitSettings(system)
def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String] = TestProbe[String].ref): PersistentBehavior[String, String, String] = PersistentBehavior[String, String, String](
pid, "",
(_, cmd) {
probe.tell("persisting")
Effect.persist(cmd)
},
(state, event) {
probe.tell(event)
state + event
}
).onRecoveryCompleted { _
probe.tell("starting")
}.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1))
def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String] = TestProbe[String].ref): EventSourcedBehavior[String, String, String] =
EventSourcedBehavior[String, String, String](
pid, "",
(_, cmd) {
probe.tell("persisting")
Effect.persist(cmd)
},
(state, event) {
probe.tell(event)
state + event
}
).onRecoveryCompleted { _
probe.tell("starting")
}.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1))
"A typed persistent actor (failures)" must {

View file

@ -20,7 +20,7 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object PersistentBehaviorReplySpec {
object EventSourcedBehaviorReplySpec {
def conf: Config = ConfigFactory.parseString(
s"""
akka.loglevel = INFO
@ -47,8 +47,8 @@ object PersistentBehaviorReplySpec {
def counter(
ctx: ActorContext[Command[_]],
persistenceId: PersistenceId): PersistentBehavior[Command[_], Event, State] = {
PersistentBehavior.withEnforcedReplies[Command[_], Event, State](
persistenceId: PersistenceId): EventSourcedBehavior[Command[_], Event, State] = {
EventSourcedBehavior.withEnforcedReplies[Command[_], Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
commandHandler = (state, cmd) cmd match {
@ -76,9 +76,9 @@ object PersistentBehaviorReplySpec {
}
}
class PersistentBehaviorReplySpec extends ScalaTestWithActorTestKit(PersistentBehaviorSpec.conf) with WordSpecLike {
class EventSourcedBehaviorReplySpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf) with WordSpecLike {
import PersistentBehaviorReplySpec._
import EventSourcedBehaviorReplySpec._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")

View file

@ -30,7 +30,7 @@ import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
object PersistentBehaviorSpec {
object EventSourcedBehaviorSpec {
//#event-wrapper
case class Wrapper[T](t: T)
@ -106,19 +106,19 @@ object PersistentBehaviorSpec {
def counter(persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): Behavior[Command] =
Behaviors.setup(ctx counter(ctx, persistenceId, logging))
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref)
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref)
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe)
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref)
def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe)
def counter(
@ -126,8 +126,8 @@ object PersistentBehaviorSpec {
persistenceId: PersistenceId,
loggingActor: ActorRef[String],
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[Done]]): PersistentBehavior[Command, Event, State] = {
PersistentBehavior[Command, Event, State](
snapshotProbe: ActorRef[Try[Done]]): EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior[Command, Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
commandHandler = (state, cmd) cmd match {
@ -225,9 +225,9 @@ object PersistentBehaviorSpec {
}
class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehaviorSpec.conf) with WordSpecLike {
class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf) with WordSpecLike {
import PersistentBehaviorSpec._
import EventSourcedBehaviorSpec._
implicit val testSettings = TestKitSettings(system)

View file

@ -25,7 +25,7 @@ class NullEmptyStateSpec extends ScalaTestWithActorTestKit(NullEmptyStateSpec.co
implicit val testSettings = TestKitSettings(system)
def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] =
PersistentBehavior[String, String, String](
EventSourcedBehavior[String, String, String](
persistenceId,
emptyState = null,
commandHandler = (_, command) {

View file

@ -9,7 +9,7 @@ import java.util.UUID
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.scaladsl.adapter.{ TypedActorRefOps, TypedActorSystemOps }
import akka.event.Logging
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
@ -27,7 +27,7 @@ object OptionalSnapshotStoreSpec {
def persistentBehavior(
probe: TestProbe[State],
name: String = UUID.randomUUID().toString) =
PersistentBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId(name),
emptyState = State(),
commandHandler = CommandHandler.command {

View file

@ -8,7 +8,7 @@ import java.util.UUID
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, SupervisorStrategy }
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.actor.testkit.typed.TE
import akka.actor.testkit.typed.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
@ -62,7 +62,7 @@ object PerformanceSpec {
def behavior(name: String, probe: TestProbe[Command])(other: (Command, Parameters) Effect[String, String]) = {
Behaviors.supervise({
val parameters = Parameters()
PersistentBehavior[Command, String, String](
EventSourcedBehavior[Command, String, String](
persistenceId = PersistenceId(name),
"",
commandHandler = CommandHandler.command {

View file

@ -15,7 +15,7 @@ import akka.persistence.typed.SideEffect
object PersistentActorCompileOnlyTest {
import akka.persistence.typed.scaladsl.PersistentBehavior._
import akka.persistence.typed.scaladsl.EventSourcedBehavior._
object Simple {
//#command
@ -44,8 +44,8 @@ object PersistentActorCompileOnlyTest {
//#event-handler
//#behavior
val simpleBehavior: PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState] =
PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState](
val simpleBehavior: EventSourcedBehavior[SimpleCommand, SimpleEvent, ExampleState] =
EventSourcedBehavior[SimpleCommand, SimpleEvent, ExampleState](
persistenceId = PersistenceId("sample-id-1"),
emptyState = ExampleState(Nil),
commandHandler = commandHandler,
@ -65,7 +65,7 @@ object PersistentActorCompileOnlyTest {
case class ExampleState(events: List[String] = Nil)
PersistentBehavior[MyCommand, MyEvent, ExampleState](
EventSourcedBehavior[MyCommand, MyEvent, ExampleState](
persistenceId = PersistenceId("sample-id-1"),
emptyState = ExampleState(Nil),
@ -110,7 +110,7 @@ object PersistentActorCompileOnlyTest {
}
val behavior: Behavior[Command] = Behaviors.setup(ctx
PersistentBehavior[Command, Event, EventsInFlight](
EventSourcedBehavior[Command, Event, EventsInFlight](
persistenceId = PersistenceId("recovery-complete-id"),
emptyState = EventsInFlight(0, Map.empty),
@ -152,7 +152,7 @@ object PersistentActorCompileOnlyTest {
sealed trait Event
case class MoodChanged(to: Mood) extends Event
val b: Behavior[Command] = PersistentBehavior[Command, Event, Mood](
val b: Behavior[Command] = EventSourcedBehavior[Command, Event, Mood](
persistenceId = PersistenceId("myPersistenceId"),
emptyState = Happy,
commandHandler = { (state, command)
@ -194,7 +194,7 @@ object PersistentActorCompileOnlyTest {
case class State(tasksInFlight: List[Task])
PersistentBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("asdf"),
emptyState = State(Nil),
commandHandler = CommandHandler.command {
@ -222,7 +222,7 @@ object PersistentActorCompileOnlyTest {
def worker(task: Task): Behavior[Nothing] = ???
val behavior: Behavior[Command] = Behaviors.setup(ctx
PersistentBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("asdf"),
emptyState = State(Nil),
commandHandler = (_, cmd) cmd match {
@ -285,7 +285,7 @@ object PersistentActorCompileOnlyTest {
.persist[Event, List[Id]](ItemAdded(id))
.thenRun(_ metadataRegistry ! GetMetaData(id, adapt))
PersistentBehavior[Command, Event, List[Id]](
EventSourcedBehavior[Command, Event, List[Id]](
persistenceId = PersistenceId("basket-1"),
emptyState = Nil,
commandHandler = { (state, cmd)
@ -377,7 +377,7 @@ object PersistentActorCompileOnlyTest {
case (state, Remembered(_)) state
}
PersistentBehavior[Command, Event, Mood](
EventSourcedBehavior[Command, Event, Mood](
persistenceId = PersistenceId("myPersistenceId"),
emptyState = Sad,
commandHandler,
@ -405,7 +405,7 @@ object PersistentActorCompileOnlyTest {
case (state, Done) state
}
PersistentBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("myPersistenceId"),
emptyState = new State,
commandHandler,
@ -417,7 +417,7 @@ object PersistentActorCompileOnlyTest {
class First extends State
class Second extends State
PersistentBehavior[String, String, State](
EventSourcedBehavior[String, String, State](
persistenceId = PersistenceId("myPersistenceId"),
emptyState = new First,
commandHandler = CommandHandler.command {
@ -442,7 +442,7 @@ object PersistentActorCompileOnlyTest {
// #actor-context
val behavior: Behavior[String] =
Behaviors.setup { ctx
PersistentBehavior[String, String, State](
EventSourcedBehavior[String, String, State](
persistenceId = PersistenceId("myPersistenceId"),
emptyState = new State,
commandHandler = CommandHandler.command {

View file

@ -25,7 +25,7 @@ class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.co
implicit val testSettings = TestKitSettings(system)
def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[Int] =
PersistentBehavior[Int, Int, Int](
EventSourcedBehavior[Int, Int, Int](
persistenceId,
emptyState = 0,
commandHandler = (_, command) {

View file

@ -9,7 +9,7 @@ import akka.actor.typed.Behavior
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
/**
* Bank account example illustrating:
@ -140,7 +140,7 @@ object AccountExampleWithCommandHandlersInState {
}
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
PersistentBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account](
EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account](
PersistenceId(s"Account|$accountNumber"),
EmptyAccount,
(state, cmd) state.applyCommand(cmd),

View file

@ -9,7 +9,7 @@ import akka.actor.typed.Behavior
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplyEffect
/**
@ -96,7 +96,7 @@ object AccountExampleWithEventHandlersInState {
//#withEnforcedReplies
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
PersistentBehavior.withEnforcedReplies(
EventSourcedBehavior.withEnforcedReplies(
PersistenceId(s"Account|$accountNumber"),
EmptyAccount,
commandHandler,

View file

@ -9,7 +9,7 @@ import akka.actor.typed.Behavior
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
/**
* Bank account example illustrating:
@ -123,7 +123,7 @@ object AccountExampleWithOptionState {
}
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
PersistentBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]](
EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]](
PersistenceId(s"Account|$accountNumber"),
None,
(state, cmd) state match {

View file

@ -7,7 +7,7 @@ package docs.akka.persistence.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import scala.concurrent.duration._
import akka.persistence.typed.PersistenceId
@ -20,7 +20,7 @@ object BasicPersistentBehaviorCompileOnly {
case class State()
val behavior: Behavior[Command] =
PersistentBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler =
@ -37,7 +37,7 @@ object BasicPersistentBehaviorCompileOnly {
//#recovery
val recoveryBehavior: Behavior[Command] =
PersistentBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler =
@ -53,7 +53,7 @@ object BasicPersistentBehaviorCompileOnly {
//#tagging
val taggingBehavior: Behavior[Command] =
PersistentBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler =
@ -66,7 +66,7 @@ object BasicPersistentBehaviorCompileOnly {
//#tagging
//#wrapPersistentBehavior
val samplePersistentBehavior = PersistentBehavior[Command, Event, State](
val samplePersistentBehavior = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler =

View file

@ -9,7 +9,7 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
object BlogPostExample {
@ -56,7 +56,7 @@ object BlogPostExample {
//#behavior
def behavior(entityId: String): Behavior[BlogCommand] =
PersistentBehavior[BlogCommand, BlogEvent, BlogState](
EventSourcedBehavior[BlogCommand, BlogEvent, BlogState](
persistenceId = PersistenceId(s"Blog-$entityId"),
emptyState = BlankState,
commandHandler,

View file

@ -8,8 +8,8 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
object MovieWatchList {
sealed trait Command
@ -44,7 +44,7 @@ object MovieWatchList {
}
def behavior(userId: String): Behavior[Command] = {
PersistentBehavior[Command, Event, MovieList](
EventSourcedBehavior[Command, Event, MovieList](
persistenceId = PersistenceId(s"movies-$userId"),
emptyState = MovieList(Set.empty),
commandHandler,

View file

@ -14,7 +14,7 @@ import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
object PersistentActorDeleteFailureSpec {
object EventSourcedActorDeleteFailureSpec {
case class DeleteTo(n: Long)
class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace
@ -44,11 +44,11 @@ object PersistentActorDeleteFailureSpec {
}
class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
class EventSourcedActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
"""
akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorDeleteFailureSpec$DeleteFailingInmemJournal"
akka.persistence.journal.inmem.class = "akka.persistence.EventSourcedActorDeleteFailureSpec$DeleteFailingInmemJournal"
"""))) with ImplicitSender {
import PersistentActorDeleteFailureSpec._
import EventSourcedActorDeleteFailureSpec._
system.eventStream.publish(TestEvent.Mute(EventFilter[akka.pattern.AskTimeoutException]()))

View file

@ -6,16 +6,16 @@ package akka.persistence
import akka.actor.{ OneForOneStrategy, _ }
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.journal.inmem.InmemJournal
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent, TestProbe }
import scala.collection.immutable
import scala.util.control.NoStackTrace
import scala.util.{ Failure, Try }
import akka.persistence.journal.inmem.InmemJournal
import scala.concurrent.Future
object PersistentActorFailureSpec {
object EventSourcedActorFailureSpec {
import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor }
class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace
@ -140,12 +140,12 @@ object PersistentActorFailureSpec {
}
}
class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
class EventSourcedActorFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
"""
akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal"
akka.persistence.journal.inmem.class = "akka.persistence.EventSourcedActorFailureSpec$FailingInmemJournal"
"""))) with ImplicitSender {
import PersistentActorFailureSpec._
import EventSourcedActorFailureSpec._
import PersistentActorSpec._
system.eventStream.publish(TestEvent.Mute(