diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 2e35360a83..69591d9972 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -221,7 +221,7 @@ object Entity { } /** - * Defines how the [[PersistentEntity]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can + * Defines how the [[EventSourcedEntity]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can * be used as a sharded entity actor, but the combination of sharding and persistent actors is very common * and therefore this factory is provided as convenience. * @@ -234,7 +234,7 @@ object Entity { */ def ofPersistentEntity[Command, Event, State >: Null]( typeKey: EntityTypeKey[Command], - createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = { + createPersistentEntity: JFunction[EntityContext[Command], EventSourcedEntity[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = { of(typeKey, new JFunction[EntityContext[Command], Behavior[Command]] { override def apply(ctx: EntityContext[Command]): Behavior[Command] = { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/EventSourcedEntity.scala similarity index 79% rename from akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala rename to akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/EventSourcedEntity.scala index 429b646b25..d030fd195e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/EventSourcedEntity.scala @@ -8,21 +8,21 @@ import java.util.Optional import akka.actor.typed.BackoffSupervisorStrategy import akka.persistence.typed.PersistenceId -import akka.persistence.typed.javadsl.PersistentBehavior +import akka.persistence.typed.javadsl.EventSourcedBehavior /** * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent * actors is very common and therefore this `PersistentEntity` class is provided as convenience. * - * It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId` + * It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId` * automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using * [[EntityTypeKey.persistenceIdFrom]]. */ -abstract class PersistentEntity[Command, Event, State >: Null] private ( +abstract class EventSourcedEntity[Command, Event, State >: Null] private ( val entityTypeKey: EntityTypeKey[Command], val entityId: String, persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) - extends PersistentBehavior[Command, Event, State](persistenceId, supervisorStrategy) { + extends EventSourcedBehavior[Command, Event, State](persistenceId, supervisorStrategy) { def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = { this(entityTypeKey, entityId, diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index a25301f21d..cc13ed37e7 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -214,7 +214,7 @@ object Entity { * settings can be defined using the `with` methods of the returned [[Entity]]. * * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors - * is very common and therefore [[PersistentEntity]] is provided as a convenience for creating such + * is very common and therefore [[EventSourcedEntity]] is provided as a convenience for creating such * `PersistentBehavior`. * * @param typeKey A key that uniquely identifies the type of entity in this cluster diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/PersistentEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala similarity index 67% rename from akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/PersistentEntity.scala rename to akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala index 948a2dd3f6..4a70819d2e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/PersistentEntity.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala @@ -5,9 +5,9 @@ package akka.cluster.sharding.typed.scaladsl import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.PersistentBehavior +import akka.persistence.typed.scaladsl.EventSourcedBehavior -object PersistentEntity { +object EventSourcedEntity { /** * Create a `Behavior` for a persistent actor that is used with Cluster Sharding. @@ -15,7 +15,7 @@ object PersistentEntity { * Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent * actors is very common and therefore this `PersistentEntity` is provided as convenience. * - * It is a [[PersistentBehavior]] and is implemented in the same way. It selects the `persistenceId` + * It is a [[EventSourcedBehavior]] and is implemented in the same way. It selects the `persistenceId` * automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using * [[EntityTypeKey.persistenceIdFrom]]. */ @@ -24,6 +24,6 @@ object PersistentEntity { entityId: String, emptyState: State, commandHandler: (State, Command) ⇒ Effect[Event, State], - eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = - PersistentBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler) + eventHandler: (State, Event) ⇒ State): EventSourcedBehavior[Command, Event, State] = + EventSourcedBehavior(entityTypeKey.persistenceIdFrom(entityId), emptyState, commandHandler, eventHandler) } diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java index d8a958ac39..189b762d3d 100644 --- a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java @@ -23,8 +23,6 @@ import org.scalatest.junit.JUnitSuite; import java.util.concurrent.CompletionStage; -import static org.junit.Assert.assertEquals; - public class ClusterShardingPersistenceTest extends JUnitSuite { public static final Config config = ConfigFactory.parseString( @@ -72,7 +70,7 @@ public class ClusterShardingPersistenceTest extends JUnitSuite { - static class TestPersistentEntity extends PersistentEntity { + static class TestPersistentEntity extends EventSourcedEntity { public static final EntityTypeKey ENTITY_TYPE_KEY = EntityTypeKey.create(Command.class, "HelloWorld"); diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleTest.java similarity index 97% rename from akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java rename to akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleTest.java index 22e1b9d567..7199bfa431 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleTest.java @@ -20,7 +20,7 @@ import org.scalatest.junit.JUnitSuite; import static jdocs.akka.cluster.sharding.typed.HelloWorldPersistentEntityExample.*; import static org.junit.Assert.assertEquals; -public class HelloWorldPersistentEntityExampleTest extends JUnitSuite { +public class HelloWorldEventSourcedEntityExampleTest extends JUnitSuite { public static final Config config = ConfigFactory.parseString( "akka.actor.provider = cluster \n" + diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java index cc85e27037..d30a4677af 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java @@ -15,7 +15,7 @@ import java.util.concurrent.CompletionStage; //#persistent-entity-import import akka.cluster.sharding.typed.javadsl.EntityTypeKey; -import akka.cluster.sharding.typed.javadsl.PersistentEntity; +import akka.cluster.sharding.typed.javadsl.EventSourcedEntity; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.Effect; import akka.persistence.typed.javadsl.EventHandler; @@ -61,7 +61,7 @@ public class HelloWorldPersistentEntityExample { //#persistent-entity - public static class HelloWorld extends PersistentEntity { + public static class HelloWorld extends EventSourcedEntity { // Command interface Command { diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index a3d59c7a77..75c0ef9aa6 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -72,7 +72,7 @@ object ClusterShardingPersistenceSpec { case promise ⇒ promise.trySuccess(ctx.self.unsafeUpcast) } - PersistentEntity[Command, String, String]( + EventSourcedEntity[Command, String, String]( entityTypeKey = typeKey, entityId = entityId, emptyState = "", diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleSpec.scala similarity index 88% rename from akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala rename to akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleSpec.scala index ce3ff48403..c8e7a64cd6 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldEventSourcedEntityExampleSpec.scala @@ -12,7 +12,7 @@ import akka.cluster.typed.Join import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike -object HelloWorldPersistentEntityExampleSpec { +object HelloWorldEventSourcedEntityExampleSpec { val config = ConfigFactory.parseString( """ akka.actor.provider = cluster @@ -25,7 +25,7 @@ object HelloWorldPersistentEntityExampleSpec { """) } -class HelloWorldPersistentEntityExampleSpec extends ScalaTestWithActorTestKit(HelloWorldPersistentEntityExampleSpec.config) with WordSpecLike { +class HelloWorldEventSourcedEntityExampleSpec extends ScalaTestWithActorTestKit(HelloWorldEventSourcedEntityExampleSpec.config) with WordSpecLike { import HelloWorldPersistentEntityExample.HelloWorld import HelloWorldPersistentEntityExample.HelloWorld._ diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala index 0fc612fde3..88612cbbb9 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala @@ -41,7 +41,7 @@ object HelloWorldPersistentEntityExample { //#persistent-entity import akka.actor.typed.Behavior import akka.cluster.sharding.typed.scaladsl.EntityTypeKey - import akka.cluster.sharding.typed.scaladsl.PersistentEntity + import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity import akka.persistence.typed.scaladsl.Effect object HelloWorld { @@ -80,7 +80,7 @@ object HelloWorldPersistentEntityExample { val entityTypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("HelloWorld") - def persistentEntity(entityId: String): Behavior[Command] = PersistentEntity( + def persistentEntity(entityId: String): Behavior[Command] = EventSourcedEntity( entityTypeKey = entityTypeKey, entityId = entityId, emptyState = KnownPeople(Set.empty), diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala index 95239f4ead..67a3469953 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala @@ -6,7 +6,6 @@ package akka.cluster.sharding import akka.actor._ import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.cluster.ClusterEvent.CurrentClusterState import akka.remote.testconductor.RoleName import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.testkit.{ TestDuration, TestProbe } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala index 75331a3f9b..b82f32e307 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala @@ -4,12 +4,10 @@ package akka.cluster.sharding -import scala.concurrent.duration._ import java.io.File import akka.actor._ import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.cluster.sharding.ShardRegion.GracefulShutdown import akka.persistence.Persistence import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } import akka.remote.testconductor.RoleName diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 2f78adeb03..47a9347385 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -4,12 +4,10 @@ package akka.cluster.sharding -import scala.concurrent.duration._ import java.io.File import akka.actor._ import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec } -import akka.cluster.sharding.ShardRegion.GracefulShutdown import akka.persistence.Persistence import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } import akka.remote.testconductor.RoleName @@ -19,8 +17,6 @@ import com.typesafe.config.ConfigFactory import org.apache.commons.io.FileUtils import scala.concurrent.duration._ -import akka.cluster.sharding.ShardRegion.GetClusterShardingStats -import akka.cluster.sharding.ShardRegion.ClusterShardingStats object ClusterShardingRememberEntitiesSpec { diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala index 0e7cecb1ae..31b9f6d5cf 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala @@ -6,7 +6,7 @@ package akka.cluster.typed import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.{ ActorRef, Behavior, Props } -import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior } +import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior } import akka.actor.testkit.typed.scaladsl.TestProbe import akka.persistence.typed.PersistenceId import com.typesafe.config.ConfigFactory @@ -36,7 +36,7 @@ object ClusterSingletonPersistenceSpec { private final case object StopPlz extends Command val persistentActor: Behavior[Command] = - PersistentBehavior[Command, String, String]( + EventSourcedBehavior[Command, String, String]( persistenceId = PersistenceId("TheSingleton"), emptyState = "", commandHandler = (state, cmd) ⇒ cmd match { diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 419290e969..883148e1c7 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -33,7 +33,7 @@ This module is currently marked as @ref:[may change](../common/may-change.md) in ## Example -Let's start with a simple example. The minimum required for a `PersistentBehavior` is: +Let's start with a simple example. The minimum required for a `EventSourcedBehavior` is: Scala : @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #structure } @@ -45,7 +45,7 @@ The first important thing to notice is the `Behavior` of a persistent actor is t because this is the type of message a persistent actor should receive. In Akka Typed this is now enforced by the type system. The event and state are only used internally. -The components that make up a PersistentBehavior are: +The components that make up a EventSourcedBehavior are: * `persistenceId` is the stable unique identifier for the persistent actor. * `emptyState` defines the `State` when the entity is first created e.g. a Counter would start with 0 as state. @@ -68,7 +68,7 @@ and can be one of: * `unhandled` the command is unhandled (not supported) in current state * `stop` stop this actor -In addition to returning the primary `Effect` for the command `PersistentBehavior`s can also +In addition to returning the primary `Effect` for the command `EventSourcedBehavior`s can also chain side effects (`SideEffect`s) are to be performed after successful persist which is achieved with the `andThen` and `thenRun` function e.g @scala[`Effect.persist(..).andThen`]@java[`Effect().persist(..).andThen`]. The `thenRun` function is a convenience around creating a `SideEffect`. @@ -122,7 +122,7 @@ Scala Java : @@snip [PersistentActorCompileOnyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-handler } -These are used to create a `PersistentBehavior`: +These are used to create a `EventSourcedBehavior`: Scala : @@snip [PersistentActorCompileOnyTest.scala](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #behavior } @@ -137,7 +137,7 @@ where resilience is important so that if a node crashes the persistent actors ar resume operations @ref:[Cluster Sharding](cluster-sharding.md) is an excellent fit to spread persistent actors over a cluster and address them by id. -The `PersistentBehavior` can then be run as with any plain typed actor as described in [actors documentation](actors-typed.md), +The `EventSourcedBehavior` can then be run as with any plain typed actor as described in [actors documentation](actors-typed.md), but since Akka Persistence is based on the single-writer principle the persistent actors are typically used together with Cluster Sharding. For a particular `persistenceId` only one persistent actor instance should be active at one time. If multiple instances were to persist events at the same time, the events would be interleaved and might not be @@ -233,7 +233,7 @@ Scala Java : @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #event-handler } -And finally the behavior is created @scala[from the `PersistentBehavior.apply`]: +And finally the behavior is created @scala[from the `EventSourcedBehavior.apply`]: Scala : @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #behavior } @@ -299,13 +299,13 @@ Java Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that -it can be used to enforce that replies are not forgotten when implementing the `PersistentBehavior`. -If it's defined with @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`] +it can be used to enforce that replies are not forgotten when implementing the `EventSourcedBehavior`. +If it's defined with @scala[`EventSourcedBehavior.withEnforcedReplies`]@java[`EventSourcedBehaviorWithEnforcedReplies`] there will be compilation errors if the returned effect isn't a `ReplyEffect`, which can be created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noReply`]@java[`Effects().noReply`], @scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`]. -These effects will send the reply message even when @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`] +These effects will send the reply message even when @scala[`EventSourcedBehavior.withEnforcedReplies`]@java[`EventSourcedBehaviorWithEnforcedReplies`] is not used, but then there will be no compilation errors if the reply decision is left out. Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific @@ -363,13 +363,13 @@ Java ## Event adapters -Event adapters can be programmatically added to your `PersistentBehavior`s that can convert from your `Event` type +Event adapters can be programmatically added to your `EventSourcedBehavior`s that can convert from your `Event` type to another type that is then passed to the journal. Defining an event adapter is done by extending an EventAdapter: Scala -: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala) { #event-wrapper } +: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #event-wrapper } Java : @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper } @@ -377,14 +377,14 @@ Java Then install it on a persistent behavior: Scala -: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala) { #install-event-adapter } +: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #install-event-adapter } Java : @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter } ## Wrapping Persistent Behaviors -When creating a `PersistentBehavior`, it is possible to wrap `PersistentBehavior` in +When creating a `EventSourcedBehavior`, it is possible to wrap `EventSourcedBehavior` in other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance to access the actor logging upon taking snapshots for debug purpose. @@ -397,7 +397,7 @@ Java ## Journal failures -By default a `PersistentBehavior` will stop if an exception is thrown from the journal. It is possible to override this with +By default a `EventSourcedBehavior` will stop if an exception is thrown from the journal. It is possible to override this with any `BackoffSupervisorStrategy`. It is not possible to use the normal supervision wrapping for this as it isn't valid to `resume` a behavior on a journal failure as it is not known if the event was persisted. @@ -412,5 +412,5 @@ Java Journals can reject events. The difference from a failure is that the journal must decide to reject an event before trying to persist it e.g. because of a serialization exception. If an event is rejected it definitely won't be in the journal. -This is signalled to a `PersistentBehavior` via a `EventRejectedException` and can be handled with a @ref[supervisor](fault-tolerance.md). +This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` and can be handled with a @ref[supervisor](fault-tolerance.md). diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala similarity index 77% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 9980a5a93a..f2864a2f5d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -5,36 +5,32 @@ package akka.persistence.typed.internal import scala.concurrent.ExecutionContext +import scala.util.Try import akka.Done -import akka.actor.typed.Logger +import akka.actor.Cancellable import akka.actor.{ ActorRef, ExtendedActorSystem } +import akka.actor.typed.Logger import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer } import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.EventAdapter -import akka.persistence.typed.internal.EventsourcedBehavior.MDC -import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } -import akka.persistence.typed.scaladsl.PersistentBehavior +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.PersistenceId import akka.util.Collections.EmptyImmutableSeq import akka.util.OptionVal -import scala.util.Try - -import akka.actor.Cancellable -import akka.persistence.typed.PersistenceId -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryTickEvent /** - * INTERNAL API: Carry state for the Persistent behavior implementation behaviors + * INTERNAL API: Carry state for the Persistent behavior implementation behaviors. */ @InternalApi -private[persistence] final class EventsourcedSetup[C, E, S]( +private[persistence] final class BehaviorSetup[C, E, S]( val context: ActorContext[InternalProtocol], val persistenceId: PersistenceId, val emptyState: S, - val commandHandler: PersistentBehavior.CommandHandler[C, E, S], - val eventHandler: PersistentBehavior.EventHandler[S, E], - val writerIdentity: WriterIdentity, + val commandHandler: EventSourcedBehavior.CommandHandler[C, E, S], + val eventHandler: EventSourcedBehavior.EventHandler[S, E], + val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity, val recoveryCompleted: S ⇒ Unit, val onRecoveryFailure: Throwable ⇒ Unit, val onSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit, @@ -43,10 +39,11 @@ private[persistence] final class EventsourcedSetup[C, E, S]( val snapshotWhen: (S, E, Long) ⇒ Boolean, val recovery: Recovery, var holdingRecoveryPermit: Boolean, - val settings: EventsourcedSettings, + val settings: EventSourcedSettings, val internalStash: StashBuffer[InternalProtocol] ) { import akka.actor.typed.scaladsl.adapter._ + import InternalProtocol.RecoveryTickEvent val persistence: Persistence = Persistence(context.system.toUntyped) @@ -74,14 +71,14 @@ private[persistence] final class EventsourcedSetup[C, E, S]( } } - def setMdc(newMdc: Map[String, Any]): EventsourcedSetup[C, E, S] = { + def setMdc(newMdc: Map[String, Any]): BehaviorSetup[C, E, S] = { mdc = newMdc // mdc is changed often, for each persisted event, but logging is rare, so lazy init of Logger _log = OptionVal.None this } - def setMdc(phaseName: String): EventsourcedSetup[C, E, S] = { + def setMdc(phaseName: String): BehaviorSetup[C, E, S] = { setMdc(MDC.create(persistenceId, phaseName)) this } @@ -111,3 +108,19 @@ private[persistence] final class EventsourcedSetup[C, E, S]( } +object MDC { + // format: OFF + val AwaitingPermit = "get-permit" + val ReplayingSnapshot = "replay-snap" + val ReplayingEvents = "replay-evts" + val RunningCmds = "running-cmnds" + val PersistingEvents = "persist-evts" + // format: ON + + def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = { + Map( + "persistenceId" → persistenceId.id, + "phase" → phaseName + ) + } +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala similarity index 68% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index e70ce0d6d0..36ac92de20 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -4,6 +4,9 @@ package akka.persistence.typed.internal +import java.util.UUID +import java.util.concurrent.atomic.AtomicInteger + import akka.Done import akka.actor.typed import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior, BehaviorInterceptor, PostStop, Signal, SupervisorStrategy } @@ -11,15 +14,14 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.{ EventAdapter, NoOpEventAdapter } -import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } import akka.persistence.typed.scaladsl._ +import akka.persistence.typed.PersistenceId import akka.util.ConstantFun + import scala.util.{ Failure, Success, Try } -import akka.persistence.typed.PersistenceId - @InternalApi -private[akka] object PersistentBehaviorImpl { +private[akka] object EventSourcedBehaviorImpl { def defaultOnSnapshot[A](ctx: ActorContext[A], meta: SnapshotMetadata, result: Try[Done]): Unit = { result match { @@ -29,40 +31,56 @@ private[akka] object PersistentBehaviorImpl { ctx.log.error(t, "Save snapshot failed, snapshot metadata: [{}]", meta) } } + + object WriterIdentity { + + // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) + private[akka] val instanceIdCounter = new AtomicInteger(1) + + def newIdentity(): WriterIdentity = { + val instanceId: Int = WriterIdentity.instanceIdCounter.getAndIncrement() + val writerUuid: String = UUID.randomUUID.toString + WriterIdentity(instanceId, writerUuid) + } + } + final case class WriterIdentity(instanceId: Int, writerUuid: String) + } @InternalApi -private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( +private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( persistenceId: PersistenceId, emptyState: State, - commandHandler: PersistentBehavior.CommandHandler[Command, Event, State], - eventHandler: PersistentBehavior.EventHandler[State, Event], - journalPluginId: Option[String] = None, - snapshotPluginId: Option[String] = None, - recoveryCompleted: State ⇒ Unit = ConstantFun.scalaAnyToUnit, - tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], - eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], - snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, - recovery: Recovery = Recovery(), - supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, - onSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, - onRecoveryFailure: Throwable ⇒ Unit = ConstantFun.scalaAnyToUnit -) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { + commandHandler: EventSourcedBehavior.CommandHandler[Command, Event, State], + eventHandler: EventSourcedBehavior.EventHandler[State, Event], + journalPluginId: Option[String] = None, + snapshotPluginId: Option[String] = None, + recoveryCompleted: State ⇒ Unit = ConstantFun.scalaAnyToUnit, + tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], + eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], + snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, + recovery: Recovery = Recovery(), + supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, + onSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, + onRecoveryFailure: Throwable ⇒ Unit = ConstantFun.scalaAnyToUnit +) extends EventSourcedBehavior[Command, Event, State] with StashReferenceManagement { + + import EventSourcedBehaviorImpl.WriterIdentity override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { Behaviors.supervise { Behaviors.setup[Command] { ctx ⇒ - val settings = EventsourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse("")) + val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse("")) val internalStash = stashBuffer(settings) // the default impl needs context which isn't available until here, so we // use the anyTwoToUnit as a marker to use the default val actualOnSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit = - if (onSnapshot == ConstantFun.scalaAnyTwoToUnit) PersistentBehaviorImpl.defaultOnSnapshot[Command](ctx, _, _) + if (onSnapshot == ConstantFun.scalaAnyTwoToUnit) EventSourcedBehaviorImpl.defaultOnSnapshot[Command](ctx, _, _) else onSnapshot - val eventsourcedSetup = new EventsourcedSetup( + val eventsourcedSetup = new BehaviorSetup( ctx.asInstanceOf[ActorContext[InternalProtocol]], persistenceId, emptyState, @@ -97,7 +115,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( target(ctx, signal) } } - val widened = EventsourcedRequestingRecoveryPermit(eventsourcedSetup).widen[Any] { + val widened = RequestingRecoveryPermit(eventsourcedSetup).widen[Any] { case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted @@ -113,7 +131,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( * The `callback` function is called to notify the actor that the recovery process * is finished. */ - def onRecoveryCompleted(callback: State ⇒ Unit): PersistentBehavior[Command, Event, State] = + def onRecoveryCompleted(callback: State ⇒ Unit): EventSourcedBehavior[Command, Event, State] = copy(recoveryCompleted = callback) /** @@ -123,7 +141,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( * * `predicate` receives the State, Event and the sequenceNr used for the Event */ - def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] = + def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): EventSourcedBehavior[Command, Event, State] = copy(snapshotWhen = predicate) /** @@ -131,7 +149,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( * * `numberOfEvents` should be greater than 0 */ - def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] = { + def snapshotEvery(numberOfEvents: Long): EventSourcedBehavior[Command, Event, State] = { require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents") copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) } @@ -139,7 +157,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( /** * Change the journal plugin id that this actor should use. */ - def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] = { + def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] = { require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") copy(journalPluginId = if (id != "") Some(id) else None) } @@ -147,7 +165,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( /** * Change the snapshot store plugin id that this actor should use. */ - def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] = { + def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] = { require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") copy(snapshotPluginId = if (id != "") Some(id) else None) } @@ -160,27 +178,27 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( * You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be * performed by replaying all events -- which may take a long time. */ - def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] = { + def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = { copy(recovery = Recovery(selection)) } /** * The `tagger` function should give event tags, which will be used in persistence query */ - def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] = + def withTagger(tagger: Event ⇒ Set[String]): EventSourcedBehavior[Command, Event, State] = copy(tagger = tagger) /** * Adapt the event before sending to the journal e.g. wrapping the event in a type * the journal understands */ - def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] = + def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] = copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]]) /** * The `callback` function is called to notify the actor that a snapshot has finished */ - def onSnapshot(callback: (SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] = + def onSnapshot(callback: (SnapshotMetadata, Try[Done]) ⇒ Unit): EventSourcedBehavior[Command, Event, State] = copy(onSnapshot = callback) /** @@ -191,13 +209,23 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( * * If not specified the actor will be stopped on failure. */ - def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State] = + def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] = copy(supervisionStrategy = backoffStrategy) /** * The `callback` function is called to notify that recovery has failed. For setting a supervision * strategy `onPersistFailure` */ - def onRecoveryFailure(callback: Throwable ⇒ Unit): PersistentBehavior[Command, Event, State] = + def onRecoveryFailure(callback: Throwable ⇒ Unit): EventSourcedBehavior[Command, Event, State] = copy(onRecoveryFailure = callback) } + +/** Protocol used internally by the eventsourced behaviors. */ +private[akka] sealed trait InternalProtocol +private[akka] object InternalProtocol { + case object RecoveryPermitGranted extends InternalProtocol + final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol + final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol + final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol + final case class IncomingCommand[C](c: C) extends InternalProtocol +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala similarity index 91% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala index 61575b3816..dc24d170f4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSettings.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedSettings.scala @@ -16,12 +16,12 @@ import scala.concurrent.duration._ /** * INTERNAL API */ -@InternalApi private[akka] object EventsourcedSettings { +@InternalApi private[akka] object EventSourcedSettings { - def apply(system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String): EventsourcedSettings = + def apply(system: ActorSystem[_], journalPluginId: String, snapshotPluginId: String): EventSourcedSettings = apply(system.settings.config, journalPluginId, snapshotPluginId) - def apply(config: Config, journalPluginId: String, snapshotPluginId: String): EventsourcedSettings = { + def apply(config: Config, journalPluginId: String, snapshotPluginId: String): EventSourcedSettings = { val typedConfig = config.getConfig("akka.persistence.typed") // StashOverflowStrategy @@ -36,7 +36,7 @@ import scala.concurrent.duration._ val recoveryEventTimeout: FiniteDuration = journalConfig.getDuration("recovery-event-timeout", TimeUnit.MILLISECONDS).millis - EventsourcedSettings( + EventSourcedSettings( stashCapacity = stashCapacity, stashOverflowStrategyConfigurator, logOnStashing = logOnStashing, @@ -59,7 +59,7 @@ import scala.concurrent.duration._ } @InternalApi -private[akka] final case class EventsourcedSettings( +private[akka] final case class EventSourcedSettings( stashCapacity: Int, stashOverflowStrategyConfigurator: String, logOnStashing: Boolean, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala deleted file mode 100644 index 93f1647158..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (C) 2016-2018 Lightbend Inc. - */ - -package akka.persistence.typed.internal - -import java.util.UUID -import java.util.concurrent.atomic.AtomicInteger - -import akka.annotation.InternalApi -import akka.persistence.typed.PersistenceId - -/** INTERNAL API */ -@InternalApi -private[akka] object EventsourcedBehavior { - - // ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip) - private[akka] val instanceIdCounter = new AtomicInteger(1) - - object WriterIdentity { - def newIdentity(): WriterIdentity = { - val instanceId: Int = EventsourcedBehavior.instanceIdCounter.getAndIncrement() - val writerUuid: String = UUID.randomUUID.toString - WriterIdentity(instanceId, writerUuid) - } - } - final case class WriterIdentity(instanceId: Int, writerUuid: String) - - object MDC { - // format: OFF - val AwaitingPermit = "get-permit" - val ReplayingSnapshot = "replay-snap" - val ReplayingEvents = "replay-evts" - val RunningCmds = "running-cmnds" - val PersistingEvents = "persist-evts" - // format: ON - - def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = { - Map( - "persistenceId" → persistenceId.id, - "phase" → phaseName - ) - } - } - - /** Protocol used internally by the eventsourced behaviors, never exposed to user-land */ - sealed trait InternalProtocol - object InternalProtocol { - case object RecoveryPermitGranted extends InternalProtocol - final case class JournalResponse(msg: akka.persistence.JournalProtocol.Response) extends InternalProtocol - final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol - final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol - final case class IncomingCommand[C](c: C) extends InternalProtocol - } -} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala deleted file mode 100644 index bbca1249aa..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - */ - -package akka.persistence.typed.internal - -import akka.actor.typed.scaladsl.StashBuffer -import akka.annotation.InternalApi -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol -import akka.util.OptionVal - -/** - * INTERNAL API - * Main reason for introduction of this trait is stash buffer reference management - * in order to survive restart of internal behavior - */ -@InternalApi private[akka] trait EventsourcedStashReferenceManagement { - - private var stashBuffer: OptionVal[StashBuffer[InternalProtocol]] = OptionVal.None - - def stashBuffer(settings: EventsourcedSettings): StashBuffer[InternalProtocol] = { - val buffer: StashBuffer[InternalProtocol] = stashBuffer match { - case OptionVal.Some(value) ⇒ value - case _ ⇒ StashBuffer(settings.stashCapacity) - } - this.stashBuffer = OptionVal.Some(buffer) - stashBuffer.get - } - - def clearStashBuffer(): Unit = stashBuffer = OptionVal.None -} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalInteractions.scala similarity index 88% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalInteractions.scala index fbe96890a3..7d316d7640 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalInteractions.scala @@ -11,23 +11,22 @@ import akka.annotation.InternalApi import akka.persistence.JournalProtocol.ReplayMessages import akka.persistence.SnapshotProtocol.LoadSnapshot import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol import scala.collection.immutable /** INTERNAL API */ @InternalApi -private[akka] trait EventsourcedJournalInteractions[C, E, S] { +private[akka] trait JournalInteractions[C, E, S] { - def setup: EventsourcedSetup[C, E, S] + def setup: BehaviorSetup[C, E, S] type EventOrTagged = Any // `Any` since can be `E` or `Tagged` // ---------- journal interactions --------- protected def internalPersist( - state: EventsourcedRunning.EventsourcedState[S], - event: EventOrTagged): EventsourcedRunning.EventsourcedState[S] = { + state: Running.EventsourcedState[S], + event: EventOrTagged): Running.EventsourcedState[S] = { val newState = state.nextSequenceNr() @@ -48,7 +47,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { protected def internalPersistAll( events: immutable.Seq[EventOrTagged], - state: EventsourcedRunning.EventsourcedState[S]): EventsourcedRunning.EventsourcedState[S] = { + state: Running.EventsourcedState[S]): Running.EventsourcedState[S] = { if (events.nonEmpty) { var newState = state @@ -107,7 +106,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId.id, criteria, toSequenceNr), setup.selfUntyped) } - protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = { + protected def internalSaveSnapshot(state: Running.EventsourcedState[S]): Unit = { // don't store null state if (state.state != null) setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala similarity index 86% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 725cb90b8c..709b62a98d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -10,12 +10,10 @@ import akka.annotation.InternalApi import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ -import akka.persistence.typed.internal.EventsourcedBehavior._ -import scala.util.control.NonFatal - import akka.actor.typed.internal.PoisonPill +import scala.util.control.NonFatal + /*** * INTERNAL API * @@ -24,14 +22,14 @@ import akka.actor.typed.internal.PoisonPill * In this behavior we finally start replaying events, beginning from the last applied sequence number * (i.e. the one up-until-which the snapshot recovery has brought us). * - * Once recovery is completed, the actor becomes [[EventsourcedRunning]], stashed messages are flushed + * Once recovery is completed, the actor becomes [[Running]], stashed messages are flushed * and control is given to the user's handlers to drive the actors behavior from there. * - * See next behavior [[EventsourcedRunning]]. - * See previous behavior [[EventsourcedReplayingSnapshot]]. + * See next behavior [[Running]]. + * See previous behavior [[ReplayingSnapshot]]. */ @InternalApi -private[persistence] object EventsourcedReplayingEvents { +private[persistence] object ReplayingEvents { @InternalApi private[persistence] final case class ReplayingState[State]( @@ -43,17 +41,18 @@ private[persistence] object EventsourcedReplayingEvents { ) def apply[C, E, S]( - setup: EventsourcedSetup[C, E, S], + setup: BehaviorSetup[C, E, S], state: ReplayingState[S] ): Behavior[InternalProtocol] = - new EventsourcedReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state) + new ReplayingEvents(setup.setMdc(MDC.ReplayingEvents)).createBehavior(state) } @InternalApi -private[persistence] class EventsourcedReplayingEvents[C, E, S](override val setup: EventsourcedSetup[C, E, S]) - extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { - import EventsourcedReplayingEvents.ReplayingState +private[persistence] class ReplayingEvents[C, E, S](override val setup: BehaviorSetup[C, E, S]) + extends JournalInteractions[C, E, S] with StashManagement[C, E, S] { + import ReplayingEvents.ReplayingState + import InternalProtocol._ def createBehavior(state: ReplayingState[S]): Behavior[InternalProtocol] = { Behaviors.setup { _ ⇒ @@ -175,9 +174,9 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set if (state.receivedPoisonPill && isStashEmpty) Behaviors.stopped else { - val running = EventsourcedRunning[C, E, S]( + val running = Running[C, E, S]( setup, - EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state, state.receivedPoisonPill) + Running.EventsourcedState[S](state.seqNr, state.state, state.receivedPoisonPill) ) tryUnstash(running) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala similarity index 79% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index a53e98253d..cb3051375b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -4,15 +4,12 @@ package akka.persistence.typed.internal -import akka.actor.typed.scaladsl.Behaviors.same import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.Behavior import akka.actor.typed.internal.PoisonPill import akka.annotation.InternalApi import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult } import akka.persistence._ -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ -import akka.persistence.typed.internal.EventsourcedBehavior._ /** * INTERNAL API @@ -24,22 +21,24 @@ import akka.persistence.typed.internal.EventsourcedBehavior._ * and if it exists, we use it instead of the initial `emptyState`. * * Once snapshot recovery is done (or no snapshot was selected), - * recovery of events continues in [[EventsourcedReplayingEvents]]. + * recovery of events continues in [[ReplayingEvents]]. * - * See next behavior [[EventsourcedReplayingEvents]]. - * See previous behavior [[EventsourcedRequestingRecoveryPermit]]. + * See next behavior [[ReplayingEvents]]. + * See previous behavior [[RequestingRecoveryPermit]]. */ @InternalApi -private[akka] object EventsourcedReplayingSnapshot { +private[akka] object ReplayingSnapshot { - def apply[C, E, S](setup: EventsourcedSetup[C, E, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] = - new EventsourcedReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior(receivedPoisonPill) + def apply[C, E, S](setup: BehaviorSetup[C, E, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] = + new ReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior(receivedPoisonPill) } @InternalApi -private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: EventsourcedSetup[C, E, S]) - extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { +private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup[C, E, S]) + extends JournalInteractions[C, E, S] with StashManagement[C, E, S] { + + import InternalProtocol._ def createBehavior(receivedPoisonPillInPreviousPhase: Boolean): Behavior[InternalProtocol] = { // protect against snapshot stalling forever because of journal overloaded and such @@ -89,7 +88,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E // we know we're in snapshotting mode; snapshot recovery timeout arrived val ex = new RecoveryTimedOut(s"Recovery timed out, didn't get snapshot within ${setup.settings.recoveryEventTimeout}") onRecoveryFailure(ex, None) - } else same // ignore, since we received the snapshot already + } else Behaviors.same // ignore, since we received the snapshot already def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { // during recovery, stash all incoming commands @@ -127,9 +126,9 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long, receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { setup.cancelRecoveryTimer() - EventsourcedReplayingEvents[C, E, S]( + ReplayingEvents[C, E, S]( setup, - EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr, receivedPoisonPill) + ReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr, receivedPoisonPill) ) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala similarity index 70% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala index a9743292fa..09e9fd56cf 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala @@ -8,8 +8,6 @@ import akka.actor.typed.Behavior import akka.actor.typed.internal.PoisonPill import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol -import akka.persistence.typed.internal.EventsourcedBehavior.MDC /** * INTERNAL API @@ -19,19 +17,19 @@ import akka.persistence.typed.internal.EventsourcedBehavior.MDC * Requests a permit to start replaying this actor; this is tone to avoid * hammering the journal with too many concurrently replaying actors. * - * See next behavior [[EventsourcedReplayingSnapshot]]. + * See next behavior [[ReplayingSnapshot]]. */ @InternalApi -private[akka] object EventsourcedRequestingRecoveryPermit { +private[akka] object RequestingRecoveryPermit { - def apply[C, E, S](setup: EventsourcedSetup[C, E, S]): Behavior[InternalProtocol] = - new EventsourcedRequestingRecoveryPermit(setup.setMdc(MDC.AwaitingPermit)).createBehavior() + def apply[C, E, S](setup: BehaviorSetup[C, E, S]): Behavior[InternalProtocol] = + new RequestingRecoveryPermit(setup.setMdc(MDC.AwaitingPermit)).createBehavior() } @InternalApi -private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val setup: EventsourcedSetup[C, E, S]) - extends EventsourcedStashManagement[C, E, S] with EventsourcedJournalInteractions[C, E, S] { +private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: BehaviorSetup[C, E, S]) + extends StashManagement[C, E, S] with JournalInteractions[C, E, S] { def createBehavior(): Behavior[InternalProtocol] = { // request a permit, as only once we obtain one we can start replaying @@ -66,7 +64,7 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val s setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery) setup.holdingRecoveryPermit = true - EventsourcedReplayingSnapshot(setup, receivedPoisonPill) + ReplayingSnapshot(setup, receivedPoisonPill) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala similarity index 90% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 64019f7f67..85ba842455 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -4,29 +4,27 @@ package akka.persistence.typed.internal +import scala.annotation.tailrec +import scala.collection.immutable +import scala.util.{ Failure, Success } + import akka.Done import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.AbstractBehavior +import akka.actor.typed.Signal +import akka.actor.typed.internal.PoisonPill import akka.annotation.InternalApi import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged import akka.persistence.typed.{ Callback, EventRejectedException, SideEffect, Stop } -import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC } -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import akka.persistence.typed.scaladsl.Effect -import scala.annotation.tailrec -import scala.collection.immutable -import scala.util.{ Failure, Success } - -import akka.actor.typed.Signal -import akka.actor.typed.internal.PoisonPill /** * INTERNAL API * - * Conceptually fourth (of four) -- also known as 'final' or 'ultimate' -- form of PersistentBehavior. + * Conceptually fourth (of four) -- also known as 'final' or 'ultimate' -- form of EventSourcedBehavior. * * In this phase recovery has completed successfully and we continue handling incoming commands, * as well as persisting new events as dictated by the user handlers. @@ -35,13 +33,14 @@ import akka.actor.typed.internal.PoisonPill * - HandlingCommands - where the command handler is invoked for incoming commands * - PersistingEvents - where incoming commands are stashed until persistence completes * - * This is implemented as such to avoid creating many EventsourcedRunning instances, + * This is implemented as such to avoid creating many EventSourced Running instances, * which perform the Persistence extension lookup on creation and similar things (config lookup) * - * See previous [[EventsourcedReplayingEvents]]. + * See previous [[ReplayingEvents]]. + * TODO rename */ @InternalApi -private[akka] object EventsourcedRunning { +private[akka] object Running { final case class EventsourcedState[State]( seqNr: Long, @@ -55,23 +54,24 @@ private[akka] object EventsourcedRunning { def updateLastSequenceNr(persistent: PersistentRepr): EventsourcedState[State] = if (persistent.sequenceNr > seqNr) copy(seqNr = persistent.sequenceNr) else this - def applyEvent[C, E](setup: EventsourcedSetup[C, E, State], event: E): EventsourcedState[State] = { + def applyEvent[C, E](setup: BehaviorSetup[C, E, State], event: E): EventsourcedState[State] = { val updated = setup.eventHandler(state, event) copy(state = updated) } } - def apply[C, E, S](setup: EventsourcedSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = - new EventsourcedRunning(setup.setMdc(MDC.RunningCmds)).handlingCommands(state) + def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: EventsourcedState[S]): Behavior[InternalProtocol] = + new Running(setup.setMdc(MDC.RunningCmds)).handlingCommands(state) } // =============================================== /** INTERNAL API */ -@InternalApi private[akka] class EventsourcedRunning[C, E, S]( - override val setup: EventsourcedSetup[C, E, S]) - extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { - import EventsourcedRunning.EventsourcedState +@InternalApi private[akka] class Running[C, E, S]( + override val setup: BehaviorSetup[C, E, S]) + extends JournalInteractions[C, E, S] with StashManagement[C, E, S] { + import Running.EventsourcedState + import InternalProtocol._ private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds) private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents) @@ -157,7 +157,7 @@ private[akka] object EventsourcedRunning { setup.setMdc(runningCmdsMdc) - Behaviors.receiveMessage[EventsourcedBehavior.InternalProtocol] { + Behaviors.receiveMessage[InternalProtocol] { case IncomingCommand(c: C @unchecked) ⇒ onCommand(state, c) case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r, Behaviors.same) case _ ⇒ Behaviors.unhandled @@ -186,11 +186,11 @@ private[akka] object EventsourcedRunning { numberOfEvents: Int, shouldSnapshotAfterPersist: Boolean, var sideEffects: immutable.Seq[SideEffect[S]]) - extends AbstractBehavior[EventsourcedBehavior.InternalProtocol] { + extends AbstractBehavior[InternalProtocol] { private var eventCounter = 0 - override def onMessage(msg: EventsourcedBehavior.InternalProtocol): Behavior[EventsourcedBehavior.InternalProtocol] = { + override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { msg match { case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r, this) case JournalResponse(r) ⇒ onJournalResponse(r) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala similarity index 69% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala index c6d80a9d5d..4380f80d9c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala @@ -4,21 +4,21 @@ package akka.persistence.typed.internal +import akka.{ actor ⇒ a } import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer } import akka.actor.{ DeadLetter, StashOverflowException } import akka.annotation.InternalApi -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol import akka.persistence._ import akka.util.ConstantFun -import akka.{ actor ⇒ a } +import akka.util.OptionVal /** INTERNAL API: Stash management for persistent behaviors */ @InternalApi -private[akka] trait EventsourcedStashManagement[C, E, S] { +private[akka] trait StashManagement[C, E, S] { import akka.actor.typed.scaladsl.adapter._ - def setup: EventsourcedSetup[C, E, S] + def setup: BehaviorSetup[C, E, S] private def context: ActorContext[InternalProtocol] = setup.context @@ -55,3 +55,24 @@ private[akka] trait EventsourcedStashManagement[C, E, S] { } } + +/** + * INTERNAL API + * Main reason for introduction of this trait is stash buffer reference management + * in order to survive restart of internal behavior + */ +@InternalApi private[akka] trait StashReferenceManagement { + + private var stashBuffer: OptionVal[StashBuffer[InternalProtocol]] = OptionVal.None + + def stashBuffer(settings: EventSourcedSettings): StashBuffer[InternalProtocol] = { + val buffer: StashBuffer[InternalProtocol] = stashBuffer match { + case OptionVal.Some(value) ⇒ value + case _ ⇒ StashBuffer(settings.stashCapacity) + } + this.stashBuffer = OptionVal.Some(buffer) + stashBuffer.get + } + + def clearStashBuffer(): Unit = stashBuffer = OptionVal.None +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala index 08ceae77e8..e2f4e3da12 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala @@ -15,7 +15,7 @@ import akka.util.OptionVal /** * FunctionalInterface for reacting on commands * - * Used with [[CommandHandlerBuilder]] to setup the behavior of a [[PersistentBehavior]] + * Used with [[CommandHandlerBuilder]] to setup the behavior of a [[EventSourcedBehavior]] */ @FunctionalInterface trait CommandHandler[Command, Event, State] { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala index 250a68b46d..c917454d37 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala @@ -4,15 +4,14 @@ package akka.persistence.typed.javadsl -import akka.annotation.DoNotInherit +import akka.annotation.{ DoNotInherit, InternalApi } import akka.japi.function import akka.persistence.typed.internal._ import akka.persistence.typed.SideEffect -import scala.collection.JavaConverters._ - -import akka.annotation.InternalApi import akka.persistence.typed.ExpectingReply +import scala.collection.JavaConverters._ + /** * INTERNAL API: see `class EffectFactories` */ @@ -20,7 +19,7 @@ import akka.persistence.typed.ExpectingReply /** * Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command. - * Created via [[PersistentBehavior.Effect]]. + * Created via [[EventSourcedBehavior.Effect]]. * * Not for user extension */ @@ -59,7 +58,7 @@ import akka.persistence.typed.ExpectingReply * This has the same semantics as `cmd.replyTo.tell`. * * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten - * when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When + * when the `PersistentBehavior` is created with [[EventSourcedBehaviorWithEnforcedReplies]]. When * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help * finding mistakes. @@ -70,7 +69,7 @@ import akka.persistence.typed.ExpectingReply }) /** - * When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect + * When [[EventSourcedBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect * isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be * sent for a specific command or the reply will be sent later. */ @@ -83,7 +82,7 @@ import akka.persistence.typed.ExpectingReply * * Additional side effects can be performed in the callback `andThen` * - * Instances of `Effect` are available through factories [[PersistentBehavior.Effect]]. + * Instances of `Effect` are available through factories [[EventSourcedBehavior.Effect]]. * * Not intended for user extension. */ @@ -116,7 +115,7 @@ import akka.persistence.typed.ExpectingReply * This has the same semantics as `cmd.replyTo().tell`. * * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten - * when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When + * when the `PersistentBehavior` is created with [[EventSourcedBehaviorWithEnforcedReplies]]. When * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help * finding mistakes. @@ -125,7 +124,7 @@ import akka.persistence.typed.ExpectingReply CompositeEffect(this, SideEffect[State](newState ⇒ cmd.replyTo ! replyWithMessage(newState))) /** - * When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect + * When [[EventSourcedBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect * isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be * sent for a specific command or the reply will be sent later. */ @@ -134,7 +133,7 @@ import akka.persistence.typed.ExpectingReply } /** - * [[PersistentBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten. + * [[EventSourcedBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten. * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be * created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]]. */ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala index fa4343c962..c6dc1a85ab 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala @@ -13,7 +13,7 @@ import akka.util.OptionVal /** * FunctionalInterface for reacting on events having been persisted * - * Used with [[EventHandlerBuilder]] to setup the behavior of a [[PersistentBehavior]] + * Used with [[EventHandlerBuilder]] to setup the behavior of a [[EventSourcedBehavior]] */ @FunctionalInterface trait EventHandler[State, Event] { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala similarity index 90% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index 0527f65f70..c25003fa5e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -14,10 +14,11 @@ import akka.annotation.{ ApiMayChange, InternalApi } import akka.persistence.SnapshotMetadata import akka.persistence.typed.{ EventAdapter, _ } import akka.persistence.typed.internal._ + import scala.util.{ Failure, Success } @ApiMayChange -abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (val persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { +abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] (val persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { def this(persistenceId: PersistenceId) = { this(persistenceId, Optional.empty[BackoffSupervisorStrategy]) @@ -110,7 +111,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] ( * If this is overridden `shouldSnapshot` is not used. * * @return number of events between snapshots, should be greater than 0 - * @see [[PersistentBehavior#shouldSnapshot]] + * @see [[EventSourcedBehavior#shouldSnapshot]] */ def snapshotEvery(): Long = 0L @@ -122,7 +123,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] ( * receives the State, Event and the sequenceNr used for the Event * * @return `true` if snapshot should be saved for the given event - * @see [[PersistentBehavior#snapshotEvery]] + * @see [[EventSourcedBehavior#snapshotEvery]] */ def shouldSnapshot(state: State, event: Event, sequenceNr: Long): Boolean = false @@ -153,7 +154,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] ( else tags.asScala.toSet } - val behavior = scaladsl.PersistentBehavior[Command, Event, State]( + val behavior = scaladsl.EventSourcedBehavior[Command, Event, State]( persistenceId, emptyState, (state, cmd) ⇒ commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], @@ -188,13 +189,13 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] ( /** * FIXME This is not completed for javadsl yet. The compiler is not enforcing the replies yet. * - * A [[PersistentBehavior]] that is enforcing that replies to commands are not forgotten. + * A [[EventSourcedBehavior]] that is enforcing that replies to commands are not forgotten. * There will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be * created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]]. */ @ApiMayChange -abstract class PersistentBehaviorWithEnforcedReplies[Command, Event, State >: Null](persistenceId: PersistenceId, backoffSupervisorStrategy: Optional[BackoffSupervisorStrategy]) - extends PersistentBehavior[Command, Event, State](persistenceId, backoffSupervisorStrategy) { +abstract class EventSourcedBehaviorWithEnforcedReplies[Command, Event, State >: Null](persistenceId: PersistenceId, backoffSupervisorStrategy: Optional[BackoffSupervisorStrategy]) + extends EventSourcedBehavior[Command, Event, State](persistenceId, backoffSupervisorStrategy) { def this(persistenceId: PersistenceId) = { this(persistenceId, Optional.empty[BackoffSupervisorStrategy]) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala index 3fa20889df..29228b01d1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala @@ -7,11 +7,11 @@ package akka.persistence.typed.scaladsl import akka.annotation.DoNotInherit import akka.persistence.typed.SideEffect import akka.persistence.typed.internal._ -import scala.collection.{ immutable ⇒ im } - import akka.persistence.typed.ExpectingReply import akka.persistence.typed.ReplyEffectImpl +import scala.collection.{ immutable ⇒ im } + /** * Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command. */ @@ -67,7 +67,7 @@ object Effect { * This has the same semantics as `cmd.replyTo.tell`. * * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten - * when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When + * when the `PersistentBehavior` is created with [[EventSourcedBehavior.withEnforcedReplies]]. When * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help * finding mistakes. @@ -76,7 +76,7 @@ object Effect { none[Event, State].thenReply[ReplyMessage](cmd)(_ ⇒ replyWithMessage) /** - * When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect + * When [[EventSourcedBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect * isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be * sent for a specific command or the reply will be sent later. */ @@ -122,7 +122,7 @@ trait Effect[+Event, State] { * This has the same semantics as `cmd.replyTo.tell`. * * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten - * when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When + * when the `PersistentBehavior` is created with [[EventSourcedBehavior.withEnforcedReplies]]. When * `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]]. * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help * finding mistakes. @@ -131,7 +131,7 @@ trait Effect[+Event, State] { CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage)) /** - * When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect + * When [[EventSourcedBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect * isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be * sent for a specific command or the reply will be sent later. */ @@ -140,7 +140,7 @@ trait Effect[+Event, State] { } /** - * [[PersistentBehavior.withEnforcedReplies]] can be used to enforce that replies are not forgotten. + * [[EventSourcedBehavior.withEnforcedReplies]] can be used to enforce that replies are not forgotten. * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be * created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]]. * diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala similarity index 77% rename from akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala rename to akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 0740dd3a3f..8139415a81 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -7,16 +7,14 @@ package akka.persistence.typed.scaladsl import akka.Done import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior.DeferredBehavior +import akka.annotation.DoNotInherit import akka.persistence._ -import akka.persistence.typed.EventAdapter +import akka.persistence.typed.{ EventAdapter, ExpectingReply, PersistenceId } import akka.persistence.typed.internal._ + import scala.util.Try -import akka.annotation.DoNotInherit -import akka.persistence.typed.ExpectingReply -import akka.persistence.typed.PersistenceId - -object PersistentBehavior { +object EventSourcedBehavior { /** * Type alias for the command handler function that defines how to act on commands. @@ -43,8 +41,8 @@ object PersistentBehavior { persistenceId: PersistenceId, emptyState: State, commandHandler: (State, Command) ⇒ Effect[Event, State], - eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = - PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler) + eventHandler: (State, Event) ⇒ State): EventSourcedBehavior[Command, Event, State] = + EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler) /** * Create a `Behavior` for a persistent actor that is enforcing that replies to commands are not forgotten. @@ -55,8 +53,8 @@ object PersistentBehavior { persistenceId: PersistenceId, emptyState: State, commandHandler: (State, Command) ⇒ ReplyEffect[Event, State], - eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = - PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler) + eventHandler: (State, Event) ⇒ State): EventSourcedBehavior[Command, Event, State] = + EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler) /** * The `CommandHandler` defines how to act on commands. A `CommandHandler` is @@ -88,24 +86,24 @@ object PersistentBehavior { * * Not for user extension */ -@DoNotInherit trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] { +@DoNotInherit trait EventSourcedBehavior[Command, Event, State] extends DeferredBehavior[Command] { def persistenceId: PersistenceId /** * The `callback` function is called to notify that the recovery process has finished. */ - def onRecoveryCompleted(callback: State ⇒ Unit): PersistentBehavior[Command, Event, State] + def onRecoveryCompleted(callback: State ⇒ Unit): EventSourcedBehavior[Command, Event, State] /** * The `callback` function is called to notify that recovery has failed. For setting a supervision * strategy `onPersistFailure` */ - def onRecoveryFailure(callback: Throwable ⇒ Unit): PersistentBehavior[Command, Event, State] + def onRecoveryFailure(callback: Throwable ⇒ Unit): EventSourcedBehavior[Command, Event, State] /** * The `callback` function is called to notify when a snapshot is complete. */ - def onSnapshot(callback: (SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] + def onSnapshot(callback: (SnapshotMetadata, Try[Done]) ⇒ Unit): EventSourcedBehavior[Command, Event, State] /** * Initiates a snapshot if the given function returns true. @@ -114,23 +112,23 @@ object PersistentBehavior { * * `predicate` receives the State, Event and the sequenceNr used for the Event */ - def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): PersistentBehavior[Command, Event, State] + def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): EventSourcedBehavior[Command, Event, State] /** * Snapshot every N events * * `numberOfEvents` should be greater than 0 */ - def snapshotEvery(numberOfEvents: Long): PersistentBehavior[Command, Event, State] + def snapshotEvery(numberOfEvents: Long): EventSourcedBehavior[Command, Event, State] /** * Change the journal plugin id that this actor should use. */ - def withJournalPluginId(id: String): PersistentBehavior[Command, Event, State] + def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] /** * Change the snapshot store plugin id that this actor should use. */ - def withSnapshotPluginId(id: String): PersistentBehavior[Command, Event, State] + def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] /** * Changes the snapshot selection criteria used by this behavior. @@ -140,18 +138,18 @@ object PersistentBehavior { * You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be * performed by replaying all events -- which may take a long time. */ - def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): PersistentBehavior[Command, Event, State] + def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] /** * The `tagger` function should give event tags, which will be used in persistence query */ - def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] + def withTagger(tagger: Event ⇒ Set[String]): EventSourcedBehavior[Command, Event, State] /** * Transform the event in another type before giving to the journal. Can be used to wrap events * in types Journals understand but is of a different type than `Event`. */ - def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] + def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] /** * Back off strategy for persist failures. @@ -161,6 +159,6 @@ object PersistentBehavior { * * If not specified the actor will be stopped on failure. */ - def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State] + def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java similarity index 88% rename from akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java rename to akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java index cb8bd7535a..d7b84f6032 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/EventSourcedActorFailureTest.java @@ -19,14 +19,15 @@ import org.scalatest.junit.JUnitSuite; import java.time.Duration; -import static akka.persistence.typed.scaladsl.PersistentBehaviorFailureSpec.conf; +import static akka.persistence.typed.scaladsl.EventSourcedBehaviorFailureSpec.conf; -class FailingPersistentActor extends PersistentBehavior { +class FailingEventSourcedActor extends EventSourcedBehavior { private final ActorRef probe; private final ActorRef recoveryFailureProbe; - FailingPersistentActor(PersistenceId persistenceId, ActorRef probe, ActorRef recoveryFailureProbe) { + FailingEventSourcedActor(PersistenceId persistenceId, ActorRef probe, ActorRef recoveryFailureProbe) { + super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)); this.probe = probe; this.recoveryFailureProbe = recoveryFailureProbe; @@ -64,7 +65,7 @@ class FailingPersistentActor extends PersistentBehavior } } -public class PersistentActorFailureTest extends JUnitSuite { +public class EventSourcedActorFailureTest extends JUnitSuite { public static final Config config = conf().withFallback(ConfigFactory.load()); @@ -72,7 +73,7 @@ public class PersistentActorFailureTest extends JUnitSuite { public static final TestKitJunitResource testKit = new TestKitJunitResource(config); public static Behavior fail(PersistenceId pid, ActorRef probe, ActorRef recoveryFailureProbe) { - return new FailingPersistentActor(pid, probe, recoveryFailureProbe); + return new FailingEventSourcedActor(pid, probe, recoveryFailureProbe); } public static Behavior fail(PersistenceId pid, ActorRef probe) { return fail(pid, probe, testKit.createTestProbe().ref()); diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java index 46f0e1038d..247c5f921d 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java @@ -26,7 +26,7 @@ public class NullEmptyStateTest extends JUnitSuite { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config); - static class NullEmptyState extends PersistentBehavior { + static class NullEmptyState extends EventSourcedBehavior { private final ActorRef probe; diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java index de2ed847fe..9dce1e026d 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java @@ -91,8 +91,8 @@ public class PersistentActorCompileOnlyTest { //#behavior - public static PersistentBehavior pb = - new PersistentBehavior(new PersistenceId("p1")) { + public static EventSourcedBehavior pb = + new EventSourcedBehavior(new PersistenceId("p1")) { @Override public SimpleState emptyState() { @@ -161,8 +161,8 @@ public class PersistentActorCompileOnlyTest { //#commonChainedEffects - private PersistentBehavior pa = - new PersistentBehavior(new PersistenceId("pa")) { + private EventSourcedBehavior pa = + new EventSourcedBehavior(new PersistenceId("pa")) { @Override public ExampleState emptyState() { @@ -281,7 +281,7 @@ public class PersistentActorCompileOnlyTest { // #actor-context // #actor-context - class MyPersistentBehavior extends PersistentBehavior { + class MyPersistentBehavior extends EventSourcedBehavior { // this makes the context available to the command handler etc. private final ActorContext ctx; diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index eaccaa4bf4..82973a578e 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -37,7 +37,7 @@ import java.io.Serializable; import java.time.Duration; import java.util.*; -import static akka.persistence.typed.scaladsl.PersistentBehaviorSpec.*; +import static akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec.*; import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; @@ -263,7 +263,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { EventAdapter transformer) { return Behaviors.setup(ctx -> { - return new PersistentBehavior(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)) { + return new EventSourcedBehavior(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)) { @Override public CommandHandler commandHandler() { return commandHandlerBuilder(State.class) diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java index 9d607f5b34..7178d1d0da 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java @@ -24,7 +24,7 @@ public class PrimitiveStateTest extends JUnitSuite { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config); - static class PrimitiveState extends PersistentBehavior { + static class PrimitiveState extends EventSourcedBehavior { private final ActorRef probe; diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExample.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExample.java index 8e518b2bd3..5f78419b67 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExample.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExample.java @@ -11,9 +11,9 @@ import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; -import akka.persistence.typed.javadsl.PersistentBehavior; +import akka.persistence.typed.javadsl.EventSourcedBehavior; -public class AccountExample extends PersistentBehavior { +public class AccountExample extends EventSourcedBehavior { interface AccountCommand {} public static class CreateAccount implements AccountCommand {} diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java index 4ea50eb805..28f0396fb9 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java @@ -6,15 +6,13 @@ package jdocs.akka.persistence.typed; import akka.actor.typed.Behavior; import akka.actor.typed.SupervisorStrategy; -import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; -import akka.persistence.typed.javadsl.PersistentBehavior; +import akka.persistence.typed.javadsl.EventSourcedBehavior; import java.time.Duration; -import java.util.Collections; import java.util.Set; public class BasicPersistentBehaviorTest { @@ -25,7 +23,7 @@ public class BasicPersistentBehaviorTest { public static class State {} //#supervision - public static class MyPersistentBehavior extends PersistentBehavior { + public static class MyPersistentBehavior extends EventSourcedBehavior { public MyPersistentBehavior(PersistenceId persistenceId) { super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2)); } @@ -65,7 +63,7 @@ public class BasicPersistentBehaviorTest { //#tagging } - static PersistentBehavior persistentBehavior = + static EventSourcedBehavior eventSourcedBehavior = new MyPersistentBehavior(new PersistenceId("pid")); //#structure diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java index e499cf742e..d1233ed92e 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java @@ -13,7 +13,7 @@ import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; -import akka.persistence.typed.javadsl.PersistentBehavior; +import akka.persistence.typed.javadsl.EventSourcedBehavior; public class BlogPostExample { @@ -147,7 +147,7 @@ public class BlogPostExample { //#commands //#behavior - public static class BlogBehavior extends PersistentBehavior { + public static class BlogBehavior extends EventSourcedBehavior { //#behavior private final ActorContext ctx; diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java index 0d23793999..d2c853161b 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java @@ -10,13 +10,13 @@ import akka.actor.typed.Behavior; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; -import akka.persistence.typed.javadsl.PersistentBehavior; +import akka.persistence.typed.javadsl.EventSourcedBehavior; import java.util.Collections; import java.util.HashSet; import java.util.Set; -public class MovieWatchList extends PersistentBehavior { +public class MovieWatchList extends EventSourcedBehavior { interface Command { } diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java index 03860b84b8..8de95fa77b 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java @@ -10,7 +10,7 @@ import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; -import akka.persistence.typed.javadsl.PersistentBehavior; +import akka.persistence.typed.javadsl.EventSourcedBehavior; import java.util.Objects; @@ -117,7 +117,7 @@ public class NullBlogState { } } - public static class BlogBehavior extends PersistentBehavior { + public static class BlogBehavior extends EventSourcedBehavior { private CommandHandlerBuilder initialCommandHandler() { return commandHandlerBuilder(Objects::isNull) diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java index 339342985c..047aade428 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java @@ -10,7 +10,7 @@ import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; -import akka.persistence.typed.javadsl.PersistentBehavior; +import akka.persistence.typed.javadsl.EventSourcedBehavior; import java.util.Optional; @@ -117,7 +117,7 @@ public class OptionalBlogState { } } - public static class BlogBehavior extends PersistentBehavior> { + public static class BlogBehavior extends EventSourcedBehavior> { private CommandHandlerBuilder, Optional> initialCommandHandler() { return commandHandlerBuilder(state -> !state.isPresent()) diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionEntity.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionEntity.java index cd6fac90fb..00e677b1df 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionEntity.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionEntity.java @@ -11,7 +11,7 @@ import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.Effect; import akka.persistence.typed.javadsl.EventHandler; -import akka.persistence.typed.javadsl.PersistentBehavior; +import akka.persistence.typed.javadsl.EventSourcedBehavior; import static jdocs.akka.persistence.typed.auction.AuctionCommand.*; import static jdocs.akka.persistence.typed.auction.AuctionEvent.*; @@ -24,7 +24,7 @@ import java.util.UUID; /** * Based on https://github.com/lagom/online-auction-java/blob/master/bidding-impl/src/main/java/com/example/auction/bidding/impl/AuctionEntity.java */ -public class AuctionEntity extends PersistentBehavior { +public class AuctionEntity extends EventSourcedBehavior { private final UUID entityUUID; diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala index ba80d79521..f5d35edb8d 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala @@ -6,8 +6,8 @@ package akka.persistence.typed import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter.TypedActorSystemOps -import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler -import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior } +import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler +import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior } import akka.testkit.TestLatch import akka.actor.testkit.typed.scaladsl.TestProbe @@ -25,8 +25,8 @@ object ManyRecoveriesSpec { def persistentBehavior( name: String, probe: TestProbe[String], - latch: Option[TestLatch]): PersistentBehavior[Cmd, Evt, String] = - PersistentBehavior[Cmd, Evt, String]( + latch: Option[TestLatch]): EventSourcedBehavior[Cmd, Evt, String] = + EventSourcedBehavior[Cmd, Evt, String]( persistenceId = PersistenceId(name), emptyState = "", commandHandler = CommandHandler.command { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala index 978473c4b5..c4cbd1f143 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala @@ -11,8 +11,8 @@ import akka.actor.typed.scaladsl.adapter.{ TypedActorRefOps, TypedActorSystemOps import akka.actor.typed.{ ActorRef, Behavior } import akka.persistence.Persistence import akka.persistence.RecoveryPermitter.{ RecoveryPermitGranted, RequestRecoveryPermit, ReturnRecoveryPermit } -import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler -import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior } +import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler +import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior } import akka.testkit.EventFilter import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -44,7 +44,7 @@ object RecoveryPermitterSpec { commandProbe: TestProbe[Any], eventProbe: TestProbe[Any], throwOnRecovery: Boolean = false): Behavior[Command] = - PersistentBehavior[Command, Event, State]( + EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId(name), emptyState = EmptyState, commandHandler = CommandHandler.command { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashReferenceManagementTest.scala similarity index 82% rename from akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala rename to akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashReferenceManagementTest.scala index 4b0f083eb4..4741a4b384 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/StashReferenceManagementTest.scala @@ -6,17 +6,16 @@ package akka.persistence.typed.internal import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ Behavior, Signal } -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol -import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, RecoveryPermitGranted } import akka.actor.testkit.typed.scaladsl.TestProbe import scala.concurrent.duration._ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import org.scalatest.WordSpecLike -class EventsourcedStashReferenceManagementTest extends ScalaTestWithActorTestKit with WordSpecLike { +class StashReferenceManagementTest extends ScalaTestWithActorTestKit with WordSpecLike { + import InternalProtocol._ - case class Impl() extends EventsourcedStashReferenceManagement + case class Impl() extends StashReferenceManagement "EventsourcedStashReferenceManagement instance" should { "initialize stash only once" in { @@ -44,7 +43,7 @@ class EventsourcedStashReferenceManagementTest extends ScalaTestWithActorTestKit } } - object TestBehavior extends EventsourcedStashReferenceManagement { + object TestBehavior extends StashReferenceManagement { def apply(probe: TestProbe[Int]): Behavior[InternalProtocol] = { val settings = dummySettings() @@ -65,7 +64,7 @@ class EventsourcedStashReferenceManagementTest extends ScalaTestWithActorTestKit } private def dummySettings(capacity: Int = 42) = - EventsourcedSettings( + EventSourcedSettings( stashCapacity = capacity, stashOverflowStrategyConfigurator = "akka.persistence.ThrowExceptionConfigurator", logOnStashing = false, diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala similarity index 87% rename from akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala rename to akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala index 4c48d8039b..338ef45253 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala @@ -53,7 +53,7 @@ class ChaosJournal extends InmemJournal { } } -object PersistentBehaviorFailureSpec { +object EventSourcedBehaviorFailureSpec { val conf = ConfigFactory.parseString( s""" @@ -66,23 +66,25 @@ object PersistentBehaviorFailureSpec { """).withFallback(ConfigFactory.load("reference.conf")).resolve() } -class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(PersistentBehaviorFailureSpec.conf) with WordSpecLike { +class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorFailureSpec.conf) with WordSpecLike { implicit val testSettings = TestKitSettings(system) - def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String] = TestProbe[String].ref): PersistentBehavior[String, String, String] = PersistentBehavior[String, String, String]( - pid, "", - (_, cmd) ⇒ { - probe.tell("persisting") - Effect.persist(cmd) - }, - (state, event) ⇒ { - probe.tell(event) - state + event - } - ).onRecoveryCompleted { _ ⇒ - probe.tell("starting") - }.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)) + def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String] = TestProbe[String].ref): EventSourcedBehavior[String, String, String] = + EventSourcedBehavior[String, String, String]( + + pid, "", + (_, cmd) ⇒ { + probe.tell("persisting") + Effect.persist(cmd) + }, + (state, event) ⇒ { + probe.tell(event) + state + event + } + ).onRecoveryCompleted { _ ⇒ + probe.tell("starting") + }.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)) "A typed persistent actor (failures)" must { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorReplySpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorReplySpec.scala similarity index 91% rename from akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorReplySpec.scala rename to akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorReplySpec.scala index 52432e7b8e..a06d825742 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorReplySpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorReplySpec.scala @@ -20,7 +20,7 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike -object PersistentBehaviorReplySpec { +object EventSourcedBehaviorReplySpec { def conf: Config = ConfigFactory.parseString( s""" akka.loglevel = INFO @@ -47,8 +47,8 @@ object PersistentBehaviorReplySpec { def counter( ctx: ActorContext[Command[_]], - persistenceId: PersistenceId): PersistentBehavior[Command[_], Event, State] = { - PersistentBehavior.withEnforcedReplies[Command[_], Event, State]( + persistenceId: PersistenceId): EventSourcedBehavior[Command[_], Event, State] = { + EventSourcedBehavior.withEnforcedReplies[Command[_], Event, State]( persistenceId, emptyState = State(0, Vector.empty), commandHandler = (state, cmd) ⇒ cmd match { @@ -76,9 +76,9 @@ object PersistentBehaviorReplySpec { } } -class PersistentBehaviorReplySpec extends ScalaTestWithActorTestKit(PersistentBehaviorSpec.conf) with WordSpecLike { +class EventSourcedBehaviorReplySpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf) with WordSpecLike { - import PersistentBehaviorReplySpec._ + import EventSourcedBehaviorReplySpec._ val pidCounter = new AtomicInteger(0) private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala similarity index 97% rename from akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala rename to akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index b6f9c9cd39..93d51c2cf4 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -30,7 +30,7 @@ import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import org.scalatest.WordSpecLike -object PersistentBehaviorSpec { +object EventSourcedBehaviorSpec { //#event-wrapper case class Wrapper[T](t: T) @@ -106,19 +106,19 @@ object PersistentBehaviorSpec { def counter(persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): Behavior[Command] = Behaviors.setup(ctx ⇒ counter(ctx, persistenceId, logging)) - def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = counter(ctx, persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref) - def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = counter(ctx, persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref) - def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe) - def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = counter(ctx, persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref) - def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): EventSourcedBehavior[Command, Event, State] = counter(ctx, persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe) def counter( @@ -126,8 +126,8 @@ object PersistentBehaviorSpec { persistenceId: PersistenceId, loggingActor: ActorRef[String], probe: ActorRef[(State, Event)], - snapshotProbe: ActorRef[Try[Done]]): PersistentBehavior[Command, Event, State] = { - PersistentBehavior[Command, Event, State]( + snapshotProbe: ActorRef[Try[Done]]): EventSourcedBehavior[Command, Event, State] = { + EventSourcedBehavior[Command, Event, State]( persistenceId, emptyState = State(0, Vector.empty), commandHandler = (state, cmd) ⇒ cmd match { @@ -225,9 +225,9 @@ object PersistentBehaviorSpec { } -class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehaviorSpec.conf) with WordSpecLike { +class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorSpec.conf) with WordSpecLike { - import PersistentBehaviorSpec._ + import EventSourcedBehaviorSpec._ implicit val testSettings = TestKitSettings(system) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala index 6e8d9d9e0c..6481e2333a 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala @@ -25,7 +25,7 @@ class NullEmptyStateSpec extends ScalaTestWithActorTestKit(NullEmptyStateSpec.co implicit val testSettings = TestKitSettings(system) def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] = - PersistentBehavior[String, String, String]( + EventSourcedBehavior[String, String, String]( persistenceId, emptyState = null, commandHandler = (_, command) ⇒ { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala index b27ae810c4..4e01a7a210 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala @@ -9,7 +9,7 @@ import java.util.UUID import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.scaladsl.adapter.{ TypedActorRefOps, TypedActorSystemOps } import akka.event.Logging -import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler +import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler import akka.actor.testkit.typed.scaladsl.TestProbe import akka.persistence.typed.PersistenceId import org.scalatest.WordSpecLike @@ -27,7 +27,7 @@ object OptionalSnapshotStoreSpec { def persistentBehavior( probe: TestProbe[State], name: String = UUID.randomUUID().toString) = - PersistentBehavior[Command, Event, State]( + EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId(name), emptyState = State(), commandHandler = CommandHandler.command { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala index 6739b808be..a62c8dcb52 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala @@ -8,7 +8,7 @@ import java.util.UUID import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorRef, SupervisorStrategy } -import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler +import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler import akka.actor.testkit.typed.TE import akka.actor.testkit.typed.scaladsl.TestProbe import com.typesafe.config.ConfigFactory @@ -62,7 +62,7 @@ object PerformanceSpec { def behavior(name: String, probe: TestProbe[Command])(other: (Command, Parameters) ⇒ Effect[String, String]) = { Behaviors.supervise({ val parameters = Parameters() - PersistentBehavior[Command, String, String]( + EventSourcedBehavior[Command, String, String]( persistenceId = PersistenceId(name), "", commandHandler = CommandHandler.command { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index d85a6cb983..826f786c64 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -15,7 +15,7 @@ import akka.persistence.typed.SideEffect object PersistentActorCompileOnlyTest { - import akka.persistence.typed.scaladsl.PersistentBehavior._ + import akka.persistence.typed.scaladsl.EventSourcedBehavior._ object Simple { //#command @@ -44,8 +44,8 @@ object PersistentActorCompileOnlyTest { //#event-handler //#behavior - val simpleBehavior: PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState] = - PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState]( + val simpleBehavior: EventSourcedBehavior[SimpleCommand, SimpleEvent, ExampleState] = + EventSourcedBehavior[SimpleCommand, SimpleEvent, ExampleState]( persistenceId = PersistenceId("sample-id-1"), emptyState = ExampleState(Nil), commandHandler = commandHandler, @@ -65,7 +65,7 @@ object PersistentActorCompileOnlyTest { case class ExampleState(events: List[String] = Nil) - PersistentBehavior[MyCommand, MyEvent, ExampleState]( + EventSourcedBehavior[MyCommand, MyEvent, ExampleState]( persistenceId = PersistenceId("sample-id-1"), emptyState = ExampleState(Nil), @@ -110,7 +110,7 @@ object PersistentActorCompileOnlyTest { } val behavior: Behavior[Command] = Behaviors.setup(ctx ⇒ - PersistentBehavior[Command, Event, EventsInFlight]( + EventSourcedBehavior[Command, Event, EventsInFlight]( persistenceId = PersistenceId("recovery-complete-id"), emptyState = EventsInFlight(0, Map.empty), @@ -152,7 +152,7 @@ object PersistentActorCompileOnlyTest { sealed trait Event case class MoodChanged(to: Mood) extends Event - val b: Behavior[Command] = PersistentBehavior[Command, Event, Mood]( + val b: Behavior[Command] = EventSourcedBehavior[Command, Event, Mood]( persistenceId = PersistenceId("myPersistenceId"), emptyState = Happy, commandHandler = { (state, command) ⇒ @@ -194,7 +194,7 @@ object PersistentActorCompileOnlyTest { case class State(tasksInFlight: List[Task]) - PersistentBehavior[Command, Event, State]( + EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("asdf"), emptyState = State(Nil), commandHandler = CommandHandler.command { @@ -222,7 +222,7 @@ object PersistentActorCompileOnlyTest { def worker(task: Task): Behavior[Nothing] = ??? val behavior: Behavior[Command] = Behaviors.setup(ctx ⇒ - PersistentBehavior[Command, Event, State]( + EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("asdf"), emptyState = State(Nil), commandHandler = (_, cmd) ⇒ cmd match { @@ -285,7 +285,7 @@ object PersistentActorCompileOnlyTest { .persist[Event, List[Id]](ItemAdded(id)) .thenRun(_ ⇒ metadataRegistry ! GetMetaData(id, adapt)) - PersistentBehavior[Command, Event, List[Id]]( + EventSourcedBehavior[Command, Event, List[Id]]( persistenceId = PersistenceId("basket-1"), emptyState = Nil, commandHandler = { (state, cmd) ⇒ @@ -377,7 +377,7 @@ object PersistentActorCompileOnlyTest { case (state, Remembered(_)) ⇒ state } - PersistentBehavior[Command, Event, Mood]( + EventSourcedBehavior[Command, Event, Mood]( persistenceId = PersistenceId("myPersistenceId"), emptyState = Sad, commandHandler, @@ -405,7 +405,7 @@ object PersistentActorCompileOnlyTest { case (state, Done) ⇒ state } - PersistentBehavior[Command, Event, State]( + EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("myPersistenceId"), emptyState = new State, commandHandler, @@ -417,7 +417,7 @@ object PersistentActorCompileOnlyTest { class First extends State class Second extends State - PersistentBehavior[String, String, State]( + EventSourcedBehavior[String, String, State]( persistenceId = PersistenceId("myPersistenceId"), emptyState = new First, commandHandler = CommandHandler.command { @@ -442,7 +442,7 @@ object PersistentActorCompileOnlyTest { // #actor-context val behavior: Behavior[String] = Behaviors.setup { ctx ⇒ - PersistentBehavior[String, String, State]( + EventSourcedBehavior[String, String, State]( persistenceId = PersistenceId("myPersistenceId"), emptyState = new State, commandHandler = CommandHandler.command { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala index a917f12c00..5cca8f5b53 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala @@ -25,7 +25,7 @@ class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.co implicit val testSettings = TestKitSettings(system) def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[Int] = - PersistentBehavior[Int, Int, Int]( + EventSourcedBehavior[Int, Int, Int]( persistenceId, emptyState = 0, commandHandler = (_, command) ⇒ { diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala index 5de5d9340f..918d83ad10 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala @@ -9,7 +9,7 @@ import akka.actor.typed.Behavior import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.PersistentBehavior +import akka.persistence.typed.scaladsl.EventSourcedBehavior /** * Bank account example illustrating: @@ -140,7 +140,7 @@ object AccountExampleWithCommandHandlersInState { } def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { - PersistentBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account]( + EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account]( PersistenceId(s"Account|$accountNumber"), EmptyAccount, (state, cmd) ⇒ state.applyCommand(cmd), diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala index 4d7be26e61..d2bc643945 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala @@ -9,7 +9,7 @@ import akka.actor.typed.Behavior import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.PersistentBehavior +import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.ReplyEffect /** @@ -96,7 +96,7 @@ object AccountExampleWithEventHandlersInState { //#withEnforcedReplies def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { - PersistentBehavior.withEnforcedReplies( + EventSourcedBehavior.withEnforcedReplies( PersistenceId(s"Account|$accountNumber"), EmptyAccount, commandHandler, diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala index 7643dfa70d..31586c9df6 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala @@ -9,7 +9,7 @@ import akka.actor.typed.Behavior import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.PersistentBehavior +import akka.persistence.typed.scaladsl.EventSourcedBehavior /** * Bank account example illustrating: @@ -123,7 +123,7 @@ object AccountExampleWithOptionState { } def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { - PersistentBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]]( + EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]]( PersistenceId(s"Account|$accountNumber"), None, (state, cmd) ⇒ state match { diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index 1e5b252409..0d735e4fa0 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -7,7 +7,7 @@ package docs.akka.persistence.typed import akka.actor.typed.ActorRef import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors -import akka.persistence.typed.scaladsl.PersistentBehavior +import akka.persistence.typed.scaladsl.EventSourcedBehavior import scala.concurrent.duration._ import akka.persistence.typed.PersistenceId @@ -20,7 +20,7 @@ object BasicPersistentBehaviorCompileOnly { case class State() val behavior: Behavior[Command] = - PersistentBehavior[Command, Event, State]( + EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = @@ -37,7 +37,7 @@ object BasicPersistentBehaviorCompileOnly { //#recovery val recoveryBehavior: Behavior[Command] = - PersistentBehavior[Command, Event, State]( + EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = @@ -53,7 +53,7 @@ object BasicPersistentBehaviorCompileOnly { //#tagging val taggingBehavior: Behavior[Command] = - PersistentBehavior[Command, Event, State]( + EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = @@ -66,7 +66,7 @@ object BasicPersistentBehaviorCompileOnly { //#tagging //#wrapPersistentBehavior - val samplePersistentBehavior = PersistentBehavior[Command, Event, State]( + val samplePersistentBehavior = EventSourcedBehavior[Command, Event, State]( persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala index 3949997aaf..f50374e85c 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala @@ -9,7 +9,7 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.PersistentBehavior +import akka.persistence.typed.scaladsl.EventSourcedBehavior object BlogPostExample { @@ -56,7 +56,7 @@ object BlogPostExample { //#behavior def behavior(entityId: String): Behavior[BlogCommand] = - PersistentBehavior[BlogCommand, BlogEvent, BlogState]( + EventSourcedBehavior[BlogCommand, BlogEvent, BlogState]( persistenceId = PersistenceId(s"Blog-$entityId"), emptyState = BlankState, commandHandler, diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala index 1cb92d85d5..923042b807 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala @@ -8,8 +8,8 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.PersistentBehavior -import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler object MovieWatchList { sealed trait Command @@ -44,7 +44,7 @@ object MovieWatchList { } def behavior(userId: String): Behavior[Command] = { - PersistentBehavior[Command, Event, MovieList]( + EventSourcedBehavior[Command, Event, MovieList]( persistenceId = PersistenceId(s"movies-$userId"), emptyState = MovieList(Set.empty), commandHandler, diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorDeleteFailureSpec.scala similarity index 88% rename from akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala rename to akka-persistence/src/test/scala/akka/persistence/EventSourcedActorDeleteFailureSpec.scala index 9d19648bd9..98f35368c8 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorDeleteFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorDeleteFailureSpec.scala @@ -14,7 +14,7 @@ import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace -object PersistentActorDeleteFailureSpec { +object EventSourcedActorDeleteFailureSpec { case class DeleteTo(n: Long) class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace @@ -44,11 +44,11 @@ object PersistentActorDeleteFailureSpec { } -class PersistentActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( +class EventSourcedActorDeleteFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( """ - akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorDeleteFailureSpec$DeleteFailingInmemJournal" + akka.persistence.journal.inmem.class = "akka.persistence.EventSourcedActorDeleteFailureSpec$DeleteFailingInmemJournal" """))) with ImplicitSender { - import PersistentActorDeleteFailureSpec._ + import EventSourcedActorDeleteFailureSpec._ system.eventStream.publish(TestEvent.Mute(EventFilter[akka.pattern.AskTimeoutException]())) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala similarity index 97% rename from akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala rename to akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala index aed8350566..2757a004e6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala @@ -6,16 +6,16 @@ package akka.persistence import akka.actor.{ OneForOneStrategy, _ } import akka.persistence.journal.AsyncWriteJournal +import akka.persistence.journal.inmem.InmemJournal import akka.testkit.{ EventFilter, ImplicitSender, TestEvent, TestProbe } import scala.collection.immutable import scala.util.control.NoStackTrace import scala.util.{ Failure, Try } -import akka.persistence.journal.inmem.InmemJournal import scala.concurrent.Future -object PersistentActorFailureSpec { +object EventSourcedActorFailureSpec { import PersistentActorSpec.{ Cmd, Evt, ExamplePersistentActor } class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace @@ -140,12 +140,12 @@ object PersistentActorFailureSpec { } } -class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( +class EventSourcedActorFailureSpec extends PersistenceSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( """ - akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal" + akka.persistence.journal.inmem.class = "akka.persistence.EventSourcedActorFailureSpec$FailingInmemJournal" """))) with ImplicitSender { - import PersistentActorFailureSpec._ + import EventSourcedActorFailureSpec._ import PersistentActorSpec._ system.eventStream.publish(TestEvent.Mute(