From 4131036a125871645a8b69992abd2089bac41de2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 17 Oct 2018 13:53:50 +0200 Subject: [PATCH] PersistenceId type to differentiate between persistenceId and entityId, #25703 (#25704) * PersistenceId type to differentiate between persistenceId and entityId, #25703 * both entityId (for sharding) and persistenceId as String types was easy mix-up * utility method in EntityTypeKey to concatenaty the type and entityId to a unique persistenceId * support custom separator to enable compatilbility with Lagom's javadsl --- .../typed/internal/ClusterShardingImpl.scala | 33 +++++++++++- .../typed/javadsl/ClusterSharding.scala | 26 +++++++++ .../typed/scaladsl/ClusterSharding.scala | 25 +++++++++ .../ClusterShardingPersistenceSpec.scala | 7 +-- .../typed/scaladsl/EntityTypeKeySpec.scala | 53 +++++++++++++++++++ .../ClusterSingletonPersistenceSpec.scala | 3 +- .../typed/EventRejectedException.scala | 4 +- .../persistence/typed/PersistenceId.scala | 11 ++++ .../typed/internal/EventsourcedBehavior.scala | 5 +- .../EventsourcedJournalInteractions.scala | 10 ++-- .../EventsourcedReplayingEvents.scala | 5 +- .../typed/internal/EventsourcedSetup.scala | 3 +- .../internal/JournalFailureException.scala | 5 +- .../internal/PersistentBehaviorImpl.scala | 5 +- .../typed/javadsl/PersistentBehavior.scala | 6 +-- .../typed/scaladsl/PersistentBehavior.scala | 5 +- .../PersistentActorCompileOnlyTest.java | 16 ++++-- .../javadsl/PersistentActorFailureTest.java | 7 +-- .../javadsl/PersistentActorJavaDslTest.java | 45 ++++++++-------- .../persistence/typed/AccountExample.java | 3 +- .../typed/BasicPersistentBehaviorTest.java | 8 +-- .../typed/InDepthPersistentBehaviorTest.java | 5 +- .../persistence/typed/MovieWatchList.java | 5 +- .../persistence/typed/OptionalBlogState.java | 3 +- .../typed/ManyRecoveriesSpec.scala | 2 +- .../internal/RecoveryPermitterSpec.scala | 5 +- .../scaladsl/OptionalSnapshotStoreSpec.scala | 3 +- .../typed/scaladsl/PerformanceSpec.scala | 5 +- .../PersistentActorCompileOnlyTest.scala | 24 +++++---- .../PersistentBehaviorFailureSpec.scala | 11 ++-- .../scaladsl/PersistentBehaviorSpec.scala | 39 +++++++------- .../persistence/typed/AccountExample1.scala | 3 +- .../persistence/typed/AccountExample2.scala | 3 +- .../BasicPersistentBehaviorCompileOnly.scala | 11 ++-- .../typed/InDepthPersistentBehaviorSpec.scala | 3 +- .../persistence/typed/MovieWatchList.scala | 3 +- 36 files changed, 295 insertions(+), 115 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/EntityTypeKeySpec.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 9f1c715c90..e2d35590b8 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -36,6 +36,7 @@ import akka.event.LoggingAdapter import akka.japi.function.{ Function ⇒ JFunction } import akka.pattern.AskTimeoutException import akka.pattern.PromiseActorRef +import akka.persistence.typed.PersistenceId import akka.util.ByteString import akka.util.Timeout @@ -75,8 +76,38 @@ import akka.util.Timeout /** * INTERNAL API */ -@InternalApi private[akka] final case class EntityTypeKeyImpl[T](name: String, messageClassName: String) +@InternalApi private[akka] object EntityTypeKeyImpl { + /** + * Default separator character used for concatenating EntityTypeKey with entityId to construct unique persistenceId. + * This must be same as in Lagom's `scaladsl.PersistentEntity`, for compatibility. No separator is used + * in Lagom's `javadsl.PersistentEntity` so for compatibility with that the `""` separator must be defined + * `withEntityIdSeparator`. + */ + val EntityIdSeparator = "|" +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class EntityTypeKeyImpl[T](name: String, messageClassName: String, + entityIdSeparator: String = EntityTypeKeyImpl.EntityIdSeparator) extends javadsl.EntityTypeKey[T] with scaladsl.EntityTypeKey[T] { + + if (!entityIdSeparator.isEmpty && name.contains(entityIdSeparator)) + throw new IllegalArgumentException(s"EntityTypeKey.name [$name] contains [$entityIdSeparator] which is " + + "a reserved character") + + override def persistenceIdFrom(entityId: String): PersistenceId = { + if (!entityIdSeparator.isEmpty && entityId.contains(entityIdSeparator)) + throw new IllegalArgumentException(s"entityId [$entityId] contains [$entityIdSeparator] which is " + + "a reserved character") + + PersistenceId(name + entityIdSeparator + entityId) + } + + override def withEntityIdSeparator(separator: String): EntityTypeKeyImpl[T] = + EntityTypeKeyImpl[T](name, messageClassName, separator) + override def toString: String = s"EntityTypeKey[$messageClassName]($name)" } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index a959d85627..6f3d98568b 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -20,6 +20,7 @@ import akka.annotation.InternalApi import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl import akka.japi.function.{ Function ⇒ JFunction } +import akka.persistence.typed.PersistenceId import akka.util.Timeout @FunctionalInterface @@ -304,12 +305,37 @@ object StartEntity { * Not for user extension. */ @DoNotInherit abstract class EntityTypeKey[T] { scaladslSelf: scaladsl.EntityTypeKey[T] ⇒ + + /** + * Name of the entity type. + */ def name: String /** * INTERNAL API */ @InternalApi private[akka] def asScala: scaladsl.EntityTypeKey[T] = scaladslSelf + + /** + * Constructs a [[PersistenceId]] from this `EntityTypeKey` and the given `entityId` by + * concatenating them with `|` separator. + * + * The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used + * in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity` + * you should use `""` as the separator in [[EntityTypeKey.withEntityIdSeparator]]. + */ + def persistenceIdFrom(entityId: String): PersistenceId + + /** + * Specify a custom separator for compatibility with old naming conventions. The separator is used between the + * `EntityTypeKey` and the `entityId` when constructing a `persistenceId` with [[EntityTypeKey.persistenceIdFrom]]. + * + * The default `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used + * in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity` + * you should use `""` as the separator here. + */ + def withEntityIdSeparator(separator: String): EntityTypeKey[T] + } object EntityTypeKey { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 9826aaea76..bf1d75395d 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -25,6 +25,7 @@ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.typed.internal.ClusterShardingImpl import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl import akka.cluster.sharding.ShardRegion.{ StartEntity ⇒ UntypedStartEntity } +import akka.persistence.typed.PersistenceId object ClusterSharding extends ExtensionId[ClusterSharding] { @@ -308,7 +309,31 @@ object StartEntity { * Not for user extension. */ @DoNotInherit trait EntityTypeKey[T] { + + /** + * Name of the entity type. + */ def name: String + + /** + * Constructs a [[PersistenceId]] from this `EntityTypeKey` and the given `entityId` by + * concatenating them with `|` separator. + * + * The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used + * in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity` + * you should use `""` as the separator in [[EntityTypeKey.withEntityIdSeparator]]. + */ + def persistenceIdFrom(entityId: String): PersistenceId + + /** + * Specify a custom separator for compatibility with old naming conventions. The separator is used between the + * `EntityTypeKey` and the `entityId` when constructing a `persistenceId` with [[EntityTypeKey.persistenceIdFrom]]. + * + * The default `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used + * in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity` + * you should use `""` as the separator here. + */ + def withEntityIdSeparator(separator: String): EntityTypeKey[T] } object EntityTypeKey { diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index 766ae78aca..ec7d0cd094 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -13,6 +13,7 @@ import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior } import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.persistence.typed.PersistenceId import com.typesafe.config.ConfigFactory import org.scalatest.{ WordSpec, WordSpecLike } @@ -43,9 +44,11 @@ object ClusterShardingPersistenceSpec { final case class Get(replyTo: ActorRef[String]) extends Command final case object StopPlz extends Command + val typeKey = EntityTypeKey[Command]("test") + def persistentActor(entityId: String): Behavior[Command] = PersistentBehavior[Command, String, String]( - entityId, + persistenceId = typeKey.persistenceIdFrom(entityId), emptyState = "", commandHandler = (state, cmd) ⇒ cmd match { case Add(s) ⇒ Effect.persist(s) @@ -56,8 +59,6 @@ object ClusterShardingPersistenceSpec { }, eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) - val typeKey = EntityTypeKey[Command]("test") - } class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterShardingPersistenceSpec.config) with WordSpecLike { diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/EntityTypeKeySpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/EntityTypeKeySpec.scala new file mode 100644 index 0000000000..12e0ea6706 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/EntityTypeKeySpec.scala @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.scaladsl + +import akka.persistence.typed.PersistenceId +import org.scalatest.Matchers +import org.scalatest.WordSpec + +class EntityTypeKeySpec extends WordSpec with Matchers { + + "EntityTypeKey" must { + "use | as default entityIdSeparator for compatibility with Lagom's scaladsl" in { + EntityTypeKey[String]("MyType").persistenceIdFrom("abc") should ===(PersistenceId("MyType|abc")) + } + + "support custom entityIdSeparator for compatibility with Lagom's javadsl" in { + EntityTypeKey[String]("MyType").withEntityIdSeparator("") + .persistenceIdFrom("abc") should ===(PersistenceId("MyTypeabc")) + } + + "support custom entityIdSeparator for compatibility with other naming" in { + EntityTypeKey[String]("MyType").withEntityIdSeparator("#/#") + .persistenceIdFrom("abc") should ===(PersistenceId("MyType#/#abc")) + } + + "not allow | in name because it's the default entityIdSeparator" in { + intercept[IllegalArgumentException] { + EntityTypeKey[String]("Invalid | name") + } + } + + "not allow custom separator in name" in { + intercept[IllegalArgumentException] { + EntityTypeKey[String]("Invalid name").withEntityIdSeparator(" ") + } + } + + "not allow | in entityId because it's the default entityIdSeparator" in { + intercept[IllegalArgumentException] { + EntityTypeKey[String]("SomeType").persistenceIdFrom("A|B") + } + } + + "not allow custom separator in entityId" in { + intercept[IllegalArgumentException] { + EntityTypeKey[String]("SomeType").withEntityIdSeparator("#").persistenceIdFrom("A#B") + } + } + } + +} diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala index a7ea4aa1e2..511bbc412b 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala @@ -8,6 +8,7 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.{ ActorRef, Behavior, Props } import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior } import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.persistence.typed.PersistenceId import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike @@ -37,7 +38,7 @@ object ClusterSingletonPersistenceSpec { val persistentActor: Behavior[Command] = PersistentBehavior[Command, String, String]( - persistenceId = "TheSingleton", + persistenceId = PersistenceId("TheSingleton"), emptyState = "", commandHandler = (state, cmd) ⇒ cmd match { case Add(s) ⇒ Effect.persist(s) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventRejectedException.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventRejectedException.scala index e51e2d373f..15fa3862a4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventRejectedException.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventRejectedException.scala @@ -7,5 +7,5 @@ package akka.persistence.typed /** * Thrown if a journal rejects an event e.g. due to a serialization error. */ -final class EventRejectedException(persistenceId: String, sequenceNr: Long, cause: Throwable) - extends RuntimeException(s"PersistenceId $persistenceId sequenceNr: $sequenceNr", cause) +final class EventRejectedException(persistenceId: PersistenceId, sequenceNr: Long, cause: Throwable) + extends RuntimeException(s"Rejected event, persistenceId [${persistenceId.id}], sequenceNr [$sequenceNr]", cause) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala new file mode 100644 index 0000000000..f6f5da482e --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala @@ -0,0 +1,11 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed + +/** + * Unique identifier in the backend data store (journal and snapshot store) of the + * persistent actor. + */ +final case class PersistenceId(id: String) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala index 8015ee78ef..0b1c50c976 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedBehavior.scala @@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi +import akka.persistence.typed.PersistenceId /** INTERNAL API */ @InternalApi @@ -36,9 +37,9 @@ private[akka] object EventsourcedBehavior { val PersistingEvents = "persist-evts" // format: ON - def create(persistenceId: String, phaseName: String): Map[String, Any] = { + def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = { Map( - "persistenceId" → persistenceId, + "persistenceId" → persistenceId.id, "phase" → phaseName ) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala index cc53ff5dcd..91bb60f0b6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala @@ -34,7 +34,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { val senderNotKnownBecauseAkkaTyped = null val repr = PersistentRepr( event, - persistenceId = setup.persistenceId, + persistenceId = setup.persistenceId.id, sequenceNr = newState.seqNr, writerUuid = setup.writerIdentity.writerUuid, sender = senderNotKnownBecauseAkkaTyped @@ -56,7 +56,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { newState = newState.nextSequenceNr() PersistentRepr( event, - persistenceId = setup.persistenceId, + persistenceId = setup.persistenceId.id, sequenceNr = newState.seqNr, writerUuid = setup.writerIdentity.writerUuid, sender = ActorRef.noSender) @@ -71,7 +71,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = { setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr) - setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntyped) + setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId.id, setup.selfUntyped) } protected def requestRecoveryPermit(): Unit = { @@ -104,11 +104,11 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { * to the running [[PersistentActor]]. */ protected def loadSnapshot(criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = { - setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntyped) + setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId.id, criteria, toSequenceNr), setup.selfUntyped) } protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = { - setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntyped) + setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId.id, state.seqNr), state.state), setup.selfUntyped) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala index 9aff9f8a7a..946cff62db 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala @@ -144,9 +144,10 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set val msg = message match { case Some(evt) ⇒ - s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. PersistenceId: [${setup.persistence}]" + s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " + + s"PersistenceId [${setup.persistenceId.id}]" case None ⇒ - s"Exception during recovery. Last known sequence number [$sequenceNr]. PersistenceId: [${setup.persistenceId}]" + s"Exception during recovery. Last known sequence number [$sequenceNr]. PersistenceId [${setup.persistenceId.id}]" } throw new JournalFailureException(msg, cause) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala index 7dc81b94e3..ad0fa2bab6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala @@ -21,6 +21,7 @@ import akka.util.OptionVal import scala.util.Try import akka.actor.Cancellable +import akka.persistence.typed.PersistenceId import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryTickEvent /** @@ -29,7 +30,7 @@ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.Rec @InternalApi private[persistence] final class EventsourcedSetup[C, E, S]( val context: ActorContext[InternalProtocol], - val persistenceId: String, + val persistenceId: PersistenceId, val emptyState: S, val commandHandler: PersistentBehavior.CommandHandler[C, E, S], val eventHandler: PersistentBehavior.EventHandler[S, E], diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalFailureException.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalFailureException.scala index 9eb9266fee..8336bab851 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalFailureException.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalFailureException.scala @@ -5,6 +5,7 @@ package akka.persistence.typed.internal import akka.annotation.InternalApi +import akka.persistence.typed.PersistenceId /** * INTERNAL API @@ -13,6 +14,6 @@ import akka.annotation.InternalApi */ @InternalApi final private[akka] class JournalFailureException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) { - def this(persistenceId: String, sequenceNr: Long, eventType: String, cause: Throwable) = - this(s"Failed to persist event type $eventType with sequence number $sequenceNr for persistenceId [$persistenceId]", cause) + def this(persistenceId: PersistenceId, sequenceNr: Long, eventType: String, cause: Throwable) = + this(s"Failed to persist event type [$eventType] with sequence number [$sequenceNr] for persistenceId [${persistenceId.id}]", cause) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index 4e1ce47a7a..24c82b7167 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -14,9 +14,10 @@ import akka.persistence.typed.{ EventAdapter, NoOpEventAdapter } import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } import akka.persistence.typed.scaladsl._ import akka.util.ConstantFun - import scala.util.{ Failure, Success, Try } +import akka.persistence.typed.PersistenceId + @InternalApi private[akka] object PersistentBehaviorImpl { @@ -32,7 +33,7 @@ private[akka] object PersistentBehaviorImpl { @InternalApi private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( - persistenceId: String, + persistenceId: PersistenceId, emptyState: State, commandHandler: PersistentBehavior.CommandHandler[Command, Event, State], eventHandler: PersistentBehavior.EventHandler[State, Event], diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala index 90fec5b177..a763eaa7dd 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -18,13 +18,13 @@ import scala.util.{ Failure, Success } /** Java API */ @ApiMayChange -abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: String, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { +abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: PersistenceId, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { - def this(persistenceId: String) = { + def this(persistenceId: PersistenceId) = { this(persistenceId, None) } - def this(persistenceId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = { + def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = { this(persistenceId, Some(backoffSupervisorStrategy)) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala index 981ab647fb..71ee26b629 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala @@ -11,9 +11,10 @@ import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.EventAdapter import akka.persistence.typed.internal._ - import scala.util.Try +import akka.persistence.typed.PersistenceId + object PersistentBehavior { /** @@ -38,7 +39,7 @@ object PersistentBehavior { * Create a `Behavior` for a persistent actor. */ def apply[Command, Event, State]( - persistenceId: String, + persistenceId: PersistenceId, emptyState: State, commandHandler: (State, Command) ⇒ Effect[Event, State], eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java index 3667055446..30591e7514 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java @@ -11,6 +11,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.EventAdapter; import akka.actor.testkit.typed.javadsl.TestInbox; +import akka.persistence.typed.PersistenceId; import akka.persistence.typed.SideEffect; import akka.util.Timeout; @@ -91,7 +92,9 @@ public class PersistentActorCompileOnlyTest { //#behavior - public static PersistentBehavior pb = new PersistentBehavior("p1") { + public static PersistentBehavior pb = + new PersistentBehavior(new PersistenceId("p1")) { + @Override public SimpleState emptyState() { return new SimpleState(); @@ -154,11 +157,14 @@ public class PersistentActorCompileOnlyTest { //#commonChainedEffects // Factored out Chained effect - static final SideEffect commonChainedEffect = SideEffect.create(s -> System.out.println("Command handled!")); + static final SideEffect commonChainedEffect = + SideEffect.create(s -> System.out.println("Command handled!")); //#commonChainedEffects - private PersistentBehavior pa = new PersistentBehavior("pa") { + private PersistentBehavior pa = + new PersistentBehavior(new PersistenceId("pa")) { + @Override public ExampleState emptyState() { return new ExampleState(); @@ -269,7 +275,7 @@ public class PersistentActorCompileOnlyTest { } // #actor-context - public Behavior behavior(String persistenceId) { + public Behavior behavior(PersistenceId persistenceId) { return Behaviors.setup(ctx -> new MyPersistentBehavior(persistenceId, ctx)); } @@ -281,7 +287,7 @@ public class PersistentActorCompileOnlyTest { // this makes the context available to the command handler etc. private final ActorContext ctx; - public MyPersistentBehavior(String persistenceId, ActorContext ctx) { + public MyPersistentBehavior(PersistenceId persistenceId, ActorContext ctx) { super(persistenceId); this.ctx = ctx; } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java index 754b8d5d5b..bfbf6a8f15 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java @@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.ActorContext; +import akka.persistence.typed.PersistenceId; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; @@ -24,7 +25,7 @@ class FailingPersistentActor extends PersistentBehavior private final ActorRef probe; - FailingPersistentActor(String persistenceId, ActorRef probe) { + FailingPersistentActor(PersistenceId persistenceId, ActorRef probe) { super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)); this.probe = probe; } @@ -63,14 +64,14 @@ public class PersistentActorFailureTest extends JUnitSuite { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config); - public static Behavior fail(String pid, ActorRef probe) { + public static Behavior fail(PersistenceId pid, ActorRef probe) { return new FailingPersistentActor(pid, probe); } @Test public void persistEvents() throws Exception { TestProbe probe = testKit.createTestProbe(); - Behavior p1 = fail("fail-first-2", probe.ref()); + Behavior p1 = fail(new PersistenceId("fail-first-2"), probe.ref()); ActorRef c = testKit.spawn(p1); probe.expectMessage("starting"); // fail diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index a1b99e17cd..7645312c74 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -18,6 +18,7 @@ import akka.persistence.query.Sequence; import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; import akka.persistence.typed.EventAdapter; import akka.persistence.typed.NoOpEventAdapter; +import akka.persistence.typed.PersistenceId; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; @@ -170,13 +171,13 @@ public class PersistentActorJavaDslTest extends JUnitSuite { private static String loggingOne = "one"; - private Behavior counter(String persistenceId, ActorRef> probe) { + private Behavior counter(PersistenceId persistenceId, ActorRef> probe) { ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, (e) -> Collections.emptySet(), snapshotProbe, new NoOpEventAdapter<>()); } - private Behavior counter(String persistenceId, + private Behavior counter(PersistenceId persistenceId, ActorRef> probe, Function> tagger) { ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); @@ -184,7 +185,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, tagger, snapshotProbe, new NoOpEventAdapter<>()); } - private Behavior counter(String persistenceId, + private Behavior counter(PersistenceId persistenceId, ActorRef> probe, EventAdapter transformer) { ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); @@ -192,7 +193,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), snapshotProbe, transformer); } - private Behavior counter(String persistenceId) { + private Behavior counter(PersistenceId persistenceId) { return counter(persistenceId, TestProbe.>create(testKit.system()).ref(), TestProbe.create(testKit.system()).ref(), @@ -204,7 +205,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } private Behavior counter( - String persistenceId, + PersistenceId persistenceId, Function3 snapshot, ActorRef> snapshotProbe ) { @@ -218,7 +219,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } private Behavior counter( - String persistentId, + PersistenceId persistentId, ActorRef> eventProbe, ActorRef loggingProbe) { return counter(persistentId, eventProbe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), @@ -228,7 +229,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } private Behavior counter( - String persistentId, + PersistenceId persistentId, ActorRef> eventProbe, Function3 snapshot) { return counter(persistentId, eventProbe, testKit.createTestProbe().ref(), snapshot, (e) -> Collections.emptySet(), @@ -237,7 +238,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } private static Behavior counter( - String persistentId, + PersistenceId persistentId, ActorRef> eventProbe, ActorRef loggingProbe, Function3 snapshot, @@ -335,7 +336,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void persistEvents() { - ActorRef c = testKit.spawn(counter("c1")); + ActorRef c = testKit.spawn(counter(new PersistenceId("c1"))); TestProbe probe = testKit.createTestProbe(); c.tell(Increment.instance); c.tell(new GetValue(probe.ref())); @@ -344,7 +345,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void replyStoredEvents() { - ActorRef c = testKit.spawn(counter("c2")); + ActorRef c = testKit.spawn(counter(new PersistenceId("c2"))); TestProbe probe = testKit.createTestProbe(); c.tell(Increment.instance); c.tell(Increment.instance); @@ -352,7 +353,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { c.tell(new GetValue(probe.ref())); probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); - ActorRef c2 = testKit.spawn(counter("c2")); + ActorRef c2 = testKit.spawn(counter(new PersistenceId("c2"))); c2.tell(new GetValue(probe.ref())); probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); c2.tell(Increment.instance); @@ -363,7 +364,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void handleTerminatedSignal() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("c3", eventHandlerProbe.ref())); + ActorRef c = testKit.spawn(counter(new PersistenceId("c3"), eventHandlerProbe.ref())); c.tell(Increment.instance); c.tell(new IncrementLater()); eventHandlerProbe.expectMessage(Pair.create(emptyState, new Incremented(1))); @@ -373,7 +374,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void handleReceiveTimeout() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("c4", eventHandlerProbe.ref())); + ActorRef c = testKit.spawn(counter(new PersistenceId("c4"), eventHandlerProbe.ref())); c.tell(new Increment100OnTimeout()); eventHandlerProbe.expectMessage(Pair.create(emptyState, timeoutEvent)); } @@ -382,14 +383,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite { public void chainableSideEffectsWithEvents() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); TestProbe loggingProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("c5", eventHandlerProbe.ref(), loggingProbe.ref())); + ActorRef c = testKit.spawn(counter(new PersistenceId("c5"), eventHandlerProbe.ref(), loggingProbe.ref())); c.tell(new EmptyEventsListAndThenLog()); loggingProbe.expectMessage(loggingOne); } @Test public void workWhenWrappedInOtherBehavior() { - Behavior behavior = Behaviors.supervise(counter("c6")).onFailure( + Behavior behavior = Behaviors.supervise(counter(new PersistenceId("c6"))).onFailure( SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(1), Duration.ofSeconds(10), 0.1) ); @@ -404,7 +405,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void snapshot() { TestProbe> snapshotProbe = testKit.createTestProbe(); - Behavior snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref()); + Behavior snapshoter = counter(new PersistenceId("c11"), (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref()); ActorRef c = testKit.spawn(snapshoter); c.tell(Increment.instance); c.tell(Increment.instance); @@ -416,7 +417,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); TestProbe> eventProbe = testKit.createTestProbe(); - snapshoter = counter("c11", eventProbe.ref(), (s, e, l) -> s.value % 2 == 0); + snapshoter = counter(new PersistenceId("c11"), eventProbe.ref(), (s, e, l) -> s.value % 2 == 0); ActorRef c2 = testKit.spawn(snapshoter); // First 2 are snapshot eventProbe.expectMessage(Pair.create(new State(2, Arrays.asList(0, 1)), new Incremented(1))); @@ -427,7 +428,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void stopThenLog() { TestProbe probe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("c12")); + ActorRef c = testKit.spawn(counter(new PersistenceId("c12"))); c.tell(new StopThenLog()); probe.expectTerminated(c, Duration.ofSeconds(1)); } @@ -449,7 +450,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { return target.apply(ctx, signal); } }; - ActorRef c = testKit.spawn(Behaviors.intercept(tap, ((Behavior)counter("tap1")))); + ActorRef c = testKit.spawn(Behaviors.intercept(tap, ((Behavior)counter(new PersistenceId("tap1"))))); c.tell(Increment.instance); interceptProbe.expectMessage(Increment.instance); signalProbe.expectNoMessage(); @@ -459,7 +460,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { public void tagEvent() throws Exception { TestProbe> eventProbe = testKit.createTestProbe(); TestProbe stateProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("tagging", eventProbe.ref(), e -> Sets.newHashSet("tag1", "tag2"))); + ActorRef c = testKit.spawn(counter(new PersistenceId("tagging"), eventProbe.ref(), e -> Sets.newHashSet("tag1", "tag2"))); c.tell(new Increment()); c.tell(new GetValue(stateProbe.ref())); stateProbe.expectMessage(new State(1, Collections.singletonList(0))); @@ -475,7 +476,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { public void transformEvent() throws Exception { TestProbe> eventProbe = testKit.createTestProbe(); TestProbe stateProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter())); + ActorRef c = testKit.spawn(counter(new PersistenceId("transform"), eventProbe.ref(), new WrapperEventAdapter())); c.tell(new Increment()); c.tell(new GetValue(stateProbe.ref())); @@ -487,7 +488,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1))) ), events); - ActorRef c2 = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter())); + ActorRef c2 = testKit.spawn(counter(new PersistenceId("transform"), eventProbe.ref(), new WrapperEventAdapter())); c2.tell(new GetValue(stateProbe.ref())); stateProbe.expectMessage(new State(1, Collections.singletonList(0))); } diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExample.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExample.java index 7e9cf67d96..c298b6362a 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExample.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExample.java @@ -7,6 +7,7 @@ package jdocs.akka.persistence.typed; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; @@ -66,7 +67,7 @@ public class AccountExample extends PersistentBehavior context, String accountNumber) { - super(accountNumber); + super(new PersistenceId(accountNumber)); } @Override diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java index 10c684785e..4ea50eb805 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java @@ -8,6 +8,7 @@ import akka.actor.typed.Behavior; import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.PersistentBehavior; @@ -25,7 +26,7 @@ public class BasicPersistentBehaviorTest { //#supervision public static class MyPersistentBehavior extends PersistentBehavior { - public MyPersistentBehavior(String persistenceId) { + public MyPersistentBehavior(PersistenceId persistenceId) { super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2)); } //#supervision @@ -64,12 +65,13 @@ public class BasicPersistentBehaviorTest { //#tagging } - static PersistentBehavior persistentBehavior = new MyPersistentBehavior("pid"); + static PersistentBehavior persistentBehavior = + new MyPersistentBehavior(new PersistenceId("pid")); //#structure //#wrapPersistentBehavior static Behavior debugAlwaysSnapshot = Behaviors.setup((context) -> { - return new MyPersistentBehavior("pid") { + return new MyPersistentBehavior(new PersistenceId("pid")) { @Override public boolean shouldSnapshot(State state, Event event, long sequenceNr) { context.getLog().info("Snapshot actor {} => state: {}", diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java index dc5b00ecb6..29ca49e790 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java @@ -9,6 +9,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; @@ -153,7 +154,7 @@ public class InDepthPersistentBehaviorTest { private final ActorContext ctx; - public BlogBehavior(String persistenceId, ActorContext ctx) { + public BlogBehavior(PersistenceId persistenceId, ActorContext ctx) { super(persistenceId); this.ctx = ctx; } @@ -238,7 +239,7 @@ public class InDepthPersistentBehaviorTest { //#behavior public static Behavior behavior(String entityId) { return Behaviors.setup(ctx -> - new BlogBehavior("Blog-" + entityId, ctx) + new BlogBehavior(new PersistenceId("Blog-" + entityId), ctx) ); } diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java index 9ee5c88105..d325170fc9 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java @@ -7,6 +7,7 @@ package jdocs.akka.persistence.typed; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; +import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.PersistentBehavior; @@ -84,10 +85,10 @@ public class MovieWatchList extends PersistentBehavior behavior(String userId) { - return new MovieWatchList("movies-" + userId); + return new MovieWatchList(new PersistenceId("movies-" + userId)); } - public MovieWatchList(String persistenceId) { + public MovieWatchList(PersistenceId persistenceId) { super(persistenceId); } diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java index d13a37eee7..48795ded06 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java @@ -6,6 +6,7 @@ package jdocs.akka.persistence.typed; import akka.Done; import akka.actor.typed.ActorRef; +import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; @@ -150,7 +151,7 @@ public class OptionalBlogState { .matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop()); } - public BlogBehavior(String persistenceId) { + public BlogBehavior(PersistenceId persistenceId) { super(persistenceId); } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala index a0ee995dcc..ba80d79521 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/ManyRecoveriesSpec.scala @@ -27,7 +27,7 @@ object ManyRecoveriesSpec { probe: TestProbe[String], latch: Option[TestLatch]): PersistentBehavior[Cmd, Evt, String] = PersistentBehavior[Cmd, Evt, String]( - persistenceId = name, + persistenceId = PersistenceId(name), emptyState = "", commandHandler = CommandHandler.command { case Cmd(s) ⇒ Effect.persist(Evt(s)).thenRun(_ ⇒ probe.ref ! s"$name-$s") diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala index 07de9f0ffd..a13532ee30 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala @@ -14,10 +14,11 @@ import akka.persistence.RecoveryPermitter.{ RecoveryPermitGranted, RequestRecove import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior } import akka.testkit.EventFilter - import scala.concurrent.duration._ import scala.util.control.NoStackTrace + import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.persistence.typed.PersistenceId import org.scalatest.WordSpecLike object RecoveryPermitterSpec { @@ -44,7 +45,7 @@ object RecoveryPermitterSpec { eventProbe: TestProbe[Any], throwOnRecovery: Boolean = false): Behavior[Command] = PersistentBehavior[Command, Event, State]( - persistenceId = name, + persistenceId = PersistenceId(name), emptyState = EmptyState, commandHandler = CommandHandler.command { case StopActor ⇒ Effect.stop diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala index 1528c83349..b27ae810c4 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/OptionalSnapshotStoreSpec.scala @@ -11,6 +11,7 @@ import akka.actor.typed.scaladsl.adapter.{ TypedActorRefOps, TypedActorSystemOps import akka.event.Logging import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.persistence.typed.PersistenceId import org.scalatest.WordSpecLike object OptionalSnapshotStoreSpec { @@ -27,7 +28,7 @@ object OptionalSnapshotStoreSpec { probe: TestProbe[State], name: String = UUID.randomUUID().toString) = PersistentBehavior[Command, Event, State]( - persistenceId = name, + persistenceId = PersistenceId(name), emptyState = State(), commandHandler = CommandHandler.command { _ ⇒ Effect.persist(Event()).thenRun(probe.ref ! _) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala index 1318686962..6739b808be 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala @@ -12,9 +12,10 @@ import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler import akka.actor.testkit.typed.TE import akka.actor.testkit.typed.scaladsl.TestProbe import com.typesafe.config.ConfigFactory - import scala.concurrent.duration._ + import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.persistence.typed.PersistenceId import org.scalatest.WordSpecLike object PerformanceSpec { @@ -62,7 +63,7 @@ object PerformanceSpec { Behaviors.supervise({ val parameters = Parameters() PersistentBehavior[Command, String, String]( - persistenceId = name, + persistenceId = PersistenceId(name), "", commandHandler = CommandHandler.command { case StopMeasure ⇒ Effect.none.thenRun(_ ⇒ probe.ref ! StopMeasure) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index e82791cea4..958c483bde 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -6,9 +6,11 @@ package akka.persistence.typed.scaladsl import scala.concurrent.ExecutionContext import scala.concurrent.duration._ + import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.TimerScheduler +import akka.persistence.typed.PersistenceId import akka.persistence.typed.SideEffect object PersistentActorCompileOnlyTest { @@ -44,7 +46,7 @@ object PersistentActorCompileOnlyTest { //#behavior val simpleBehavior: PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState] = PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState]( - persistenceId = "sample-id-1", + persistenceId = PersistenceId("sample-id-1"), emptyState = ExampleState(Nil), commandHandler = commandHandler, eventHandler = eventHandler) @@ -64,7 +66,7 @@ object PersistentActorCompileOnlyTest { case class ExampleState(events: List[String] = Nil) PersistentBehavior[MyCommand, MyEvent, ExampleState]( - persistenceId = "sample-id-1", + persistenceId = PersistenceId("sample-id-1"), emptyState = ExampleState(Nil), @@ -109,7 +111,7 @@ object PersistentActorCompileOnlyTest { val behavior: Behavior[Command] = Behaviors.setup(ctx ⇒ PersistentBehavior[Command, Event, EventsInFlight]( - persistenceId = "recovery-complete-id", + persistenceId = PersistenceId("recovery-complete-id"), emptyState = EventsInFlight(0, Map.empty), @@ -151,7 +153,7 @@ object PersistentActorCompileOnlyTest { case class MoodChanged(to: Mood) extends Event val b: Behavior[Command] = PersistentBehavior[Command, Event, Mood]( - persistenceId = "myPersistenceId", + persistenceId = PersistenceId("myPersistenceId"), emptyState = Happy, commandHandler = { (state, command) ⇒ state match { @@ -193,7 +195,7 @@ object PersistentActorCompileOnlyTest { case class State(tasksInFlight: List[Task]) PersistentBehavior[Command, Event, State]( - persistenceId = "asdf", + persistenceId = PersistenceId("asdf"), emptyState = State(Nil), commandHandler = CommandHandler.command { case RegisterTask(task) ⇒ Effect.persist(TaskRegistered(task)) @@ -221,7 +223,7 @@ object PersistentActorCompileOnlyTest { val behavior: Behavior[Command] = Behaviors.setup(ctx ⇒ PersistentBehavior[Command, Event, State]( - persistenceId = "asdf", + persistenceId = PersistenceId("asdf"), emptyState = State(Nil), commandHandler = (_, cmd) ⇒ cmd match { case RegisterTask(task) ⇒ @@ -284,7 +286,7 @@ object PersistentActorCompileOnlyTest { .thenRun(_ ⇒ metadataRegistry ! GetMetaData(id, adapt)) PersistentBehavior[Command, Event, List[Id]]( - persistenceId = "basket-1", + persistenceId = PersistenceId("basket-1"), emptyState = Nil, commandHandler = { (state, cmd) ⇒ if (isFullyHydrated(basket, state)) @@ -376,7 +378,7 @@ object PersistentActorCompileOnlyTest { } PersistentBehavior[Command, Event, Mood]( - persistenceId = "myPersistenceId", + persistenceId = PersistenceId("myPersistenceId"), emptyState = Sad, commandHandler, eventHandler) @@ -404,7 +406,7 @@ object PersistentActorCompileOnlyTest { } PersistentBehavior[Command, Event, State]( - persistenceId = "myPersistenceId", + persistenceId = PersistenceId("myPersistenceId"), emptyState = new State, commandHandler, eventHandler) @@ -416,7 +418,7 @@ object PersistentActorCompileOnlyTest { class Second extends State PersistentBehavior[String, String, State]( - persistenceId = "myPersistenceId", + persistenceId = PersistenceId("myPersistenceId"), emptyState = new First, commandHandler = CommandHandler.command { cmd ⇒ @@ -441,7 +443,7 @@ object PersistentActorCompileOnlyTest { val behavior: Behavior[String] = Behaviors.setup { ctx ⇒ PersistentBehavior[String, String, State]( - persistenceId = "myPersistenceId", + persistenceId = PersistenceId("myPersistenceId"), emptyState = new State, commandHandler = CommandHandler.command { cmd ⇒ diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala index 6d5e351ce5..872daac2b8 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala @@ -14,12 +14,13 @@ import akka.persistence.journal.inmem.InmemJournal import akka.persistence.typed.EventRejectedException import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike - import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Try +import akka.persistence.typed.PersistenceId + class ChaosJournal extends InmemJournal { var count = 0 var failRecovery = true @@ -66,7 +67,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent implicit val testSettings = TestKitSettings(system) - def failingPersistentActor(pid: String, probe: ActorRef[String]): Behavior[String] = PersistentBehavior[String, String, String]( + def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = PersistentBehavior[String, String, String]( pid, "", (_, cmd) ⇒ { probe.tell("persisting") @@ -83,7 +84,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent "A typed persistent actor (failures)" must { "restart with backoff" in { val probe = TestProbe[String]() - val behav = failingPersistentActor("fail-first-2", probe.ref) + val behav = failingPersistentActor(PersistenceId("fail-first-2"), probe.ref) val c = spawn(behav) probe.expectMessage("starting") // fail @@ -106,7 +107,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent "restart with backoff for recovery" in { val probe = TestProbe[String]() - val behav = failingPersistentActor("fail-recovery-once", probe.ref) + val behav = failingPersistentActor(PersistenceId("fail-recovery-once"), probe.ref) spawn(behav) // First time fails, second time should work and call onRecoveryComplete probe.expectMessage("starting") @@ -117,7 +118,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent val probe = TestProbe[String]() val behav = Behaviors.supervise( - failingPersistentActor("reject-first", probe.ref)).onFailure[EventRejectedException]( + failingPersistentActor(PersistenceId("reject-first"), probe.ref)).onFailure[EventRejectedException]( SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)) val c = spawn(behav) // First time fails, second time should work and call onRecoveryComplete diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index bdaab7a8ac..8e2a562154 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -20,12 +20,13 @@ import akka.stream.scaladsl.Sink import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.scaladsl._ import com.typesafe.config.{ Config, ConfigFactory } - import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.{ Success, Try } + import akka.persistence.journal.inmem.InmemJournal +import akka.persistence.typed.PersistenceId import org.scalatest.WordSpecLike object PersistentBehaviorSpec { @@ -97,30 +98,30 @@ object PersistentBehaviorSpec { val firstLogging = "first logging" val secondLogging = "second logging" - def counter(persistenceId: String)(implicit system: ActorSystem[_]): Behavior[Command] = + def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command] = Behaviors.setup(ctx ⇒ counter(ctx, persistenceId)) - def counter(persistenceId: String, logging: ActorRef[String])(implicit system: ActorSystem[_]): Behavior[Command] = + def counter(persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): Behavior[Command] = Behaviors.setup(ctx ⇒ counter(ctx, persistenceId, logging)) - def counter(ctx: ActorContext[Command], persistenceId: String)(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = counter(ctx, persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref) - def counter(ctx: ActorContext[Command], persistenceId: String, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = counter(ctx, persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref) - def counterWithProbe(ctx: ActorContext[Command], persistenceId: String, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe) - def counterWithProbe(ctx: ActorContext[Command], persistenceId: String, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = counter(ctx, persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref) - def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: String, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = counter(ctx, persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe) def counter( ctx: ActorContext[Command], - persistenceId: String, + persistenceId: PersistenceId, loggingActor: ActorRef[String], probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]]): PersistentBehavior[Command, Event, State] = { @@ -231,7 +232,7 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio LeveldbReadJournal.Identifier) val pidCounter = new AtomicInteger(0) - private def nextPid(): String = s"c${pidCounter.incrementAndGet()}" + private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") "A typed persistent actor" must { @@ -508,7 +509,7 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio replyProbe.expectMessage(State(1, Vector(0))) val events = queries.currentEventsByTag("tag1").runWith(Sink.seq).futureValue - events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Incremented(1))) + events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Incremented(1))) } "adapt events" in { @@ -526,8 +527,8 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(1, Vector(0))) - val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue - events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1)))) + val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue + events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1)))) val c2 = spawn(Behaviors.setup[Command](ctx ⇒ counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event]) @@ -548,10 +549,10 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(2, Vector(0, 1))) - val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue + val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue events shouldEqual List( - EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))), - EventEnvelope(Sequence(2), pid, 2, Wrapper(Incremented(1))) + EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))), + EventEnvelope(Sequence(2), pid.id, 2, Wrapper(Incremented(1))) ) val c2 = spawn(Behaviors.setup[Command](ctx ⇒ @@ -574,8 +575,8 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(1, Vector(0))) - val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue - events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1)))) + val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue + events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1)))) val c2 = spawn(Behaviors.setup[Command](ctx ⇒ counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])) @@ -584,7 +585,7 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio replyProbe.expectMessage(State(1, Vector(0))) val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue - taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1)))) + taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1)))) } "handle scheduled message arriving before recovery completed " in { diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala index 3276627bd2..2587fbbc40 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala @@ -5,6 +5,7 @@ package docs.akka.persistence.typed import akka.actor.typed.Behavior +import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.PersistentBehavior import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler @@ -90,7 +91,7 @@ object AccountExample1 { def behavior(accountNumber: String): Behavior[AccountCommand] = PersistentBehavior[AccountCommand, AccountEvent, Option[Account]]( - persistenceId = accountNumber, + persistenceId = PersistenceId(s"Account-$accountNumber"), emptyState = None, commandHandler = commandHandler, eventHandler = eventHandler diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala index b0766684d4..55d9ad0a6b 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala @@ -5,6 +5,7 @@ package docs.akka.persistence.typed import akka.actor.typed.Behavior +import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.PersistentBehavior import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler @@ -93,7 +94,7 @@ object AccountExample2 { def behavior(accountNumber: String): Behavior[AccountCommand] = PersistentBehavior[AccountCommand, AccountEvent, Account]( - persistenceId = accountNumber, + persistenceId = PersistenceId(s"Account-$accountNumber"), emptyState = EmptyAccount, commandHandler = commandHandler, eventHandler = eventHandler diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index 50b5a32cde..e2696995ed 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -8,9 +8,10 @@ import akka.actor.typed.ActorRef import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.scaladsl.PersistentBehavior - import scala.concurrent.duration._ +import akka.persistence.typed.PersistenceId + object BasicPersistentBehaviorCompileOnly { //#structure @@ -20,7 +21,7 @@ object BasicPersistentBehaviorCompileOnly { val behavior: Behavior[Command] = PersistentBehavior[Command, Event, State]( - persistenceId = "abc", + persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) ⇒ @@ -37,7 +38,7 @@ object BasicPersistentBehaviorCompileOnly { //#recovery val recoveryBehavior: Behavior[Command] = PersistentBehavior[Command, Event, State]( - persistenceId = "abc", + persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) ⇒ @@ -53,7 +54,7 @@ object BasicPersistentBehaviorCompileOnly { //#tagging val taggingBehavior: Behavior[Command] = PersistentBehavior[Command, Event, State]( - persistenceId = "abc", + persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) ⇒ @@ -66,7 +67,7 @@ object BasicPersistentBehaviorCompileOnly { //#wrapPersistentBehavior val samplePersistentBehavior = PersistentBehavior[Command, Event, State]( - persistenceId = "abc", + persistenceId = PersistenceId("abc"), emptyState = State(), commandHandler = (state, cmd) ⇒ diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala index 23948d50b0..b7e4895752 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala @@ -6,6 +6,7 @@ package docs.akka.persistence.typed import akka.Done import akka.actor.typed.{ ActorRef, Behavior } +import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.PersistentBehavior import akka.persistence.typed.scaladsl.Effect @@ -120,7 +121,7 @@ object InDepthPersistentBehaviorSpec { //#behavior def behavior(entityId: String): Behavior[BlogCommand] = PersistentBehavior[BlogCommand, BlogEvent, BlogState]( - persistenceId = "Blog-" + entityId, + persistenceId = PersistenceId(s"Blog-$entityId"), emptyState = BlogState.empty, commandHandler, eventHandler) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala index 56fbdd22ab..d0b806c58e 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala @@ -6,6 +6,7 @@ package docs.akka.persistence.typed import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.PersistentBehavior import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler @@ -44,7 +45,7 @@ object MovieWatchList { def behavior(userId: String): Behavior[Command] = { PersistentBehavior[Command, Event, MovieList]( - persistenceId = "movies-" + userId, + persistenceId = PersistenceId(s"movies-$userId"), emptyState = MovieList(Set.empty), commandHandler, eventHandler = (state, event) ⇒ state.applyEvent(event)