PersistenceId type to differentiate between persistenceId and entityId, #25703 (#25704)

* PersistenceId type to differentiate between persistenceId and entityId, #25703

* both entityId (for sharding) and persistenceId as String types was easy
  mix-up
* utility method in EntityTypeKey to concatenaty the type and entityId to
  a unique persistenceId

* support custom separator to enable compatilbility with Lagom's javadsl
This commit is contained in:
Patrik Nordwall 2018-10-17 13:53:50 +02:00 committed by GitHub
parent 570896f22a
commit 4131036a12
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 295 additions and 115 deletions

View file

@ -36,6 +36,7 @@ import akka.event.LoggingAdapter
import akka.japi.function.{ Function JFunction }
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import akka.persistence.typed.PersistenceId
import akka.util.ByteString
import akka.util.Timeout
@ -75,8 +76,38 @@ import akka.util.Timeout
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class EntityTypeKeyImpl[T](name: String, messageClassName: String)
@InternalApi private[akka] object EntityTypeKeyImpl {
/**
* Default separator character used for concatenating EntityTypeKey with entityId to construct unique persistenceId.
* This must be same as in Lagom's `scaladsl.PersistentEntity`, for compatibility. No separator is used
* in Lagom's `javadsl.PersistentEntity` so for compatibility with that the `""` separator must be defined
* `withEntityIdSeparator`.
*/
val EntityIdSeparator = "|"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class EntityTypeKeyImpl[T](name: String, messageClassName: String,
entityIdSeparator: String = EntityTypeKeyImpl.EntityIdSeparator)
extends javadsl.EntityTypeKey[T] with scaladsl.EntityTypeKey[T] {
if (!entityIdSeparator.isEmpty && name.contains(entityIdSeparator))
throw new IllegalArgumentException(s"EntityTypeKey.name [$name] contains [$entityIdSeparator] which is " +
"a reserved character")
override def persistenceIdFrom(entityId: String): PersistenceId = {
if (!entityIdSeparator.isEmpty && entityId.contains(entityIdSeparator))
throw new IllegalArgumentException(s"entityId [$entityId] contains [$entityIdSeparator] which is " +
"a reserved character")
PersistenceId(name + entityIdSeparator + entityId)
}
override def withEntityIdSeparator(separator: String): EntityTypeKeyImpl[T] =
EntityTypeKeyImpl[T](name, messageClassName, separator)
override def toString: String = s"EntityTypeKey[$messageClassName]($name)"
}

View file

@ -20,6 +20,7 @@ import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.japi.function.{ Function JFunction }
import akka.persistence.typed.PersistenceId
import akka.util.Timeout
@FunctionalInterface
@ -304,12 +305,37 @@ object StartEntity {
* Not for user extension.
*/
@DoNotInherit abstract class EntityTypeKey[T] { scaladslSelf: scaladsl.EntityTypeKey[T]
/**
* Name of the entity type.
*/
def name: String
/**
* INTERNAL API
*/
@InternalApi private[akka] def asScala: scaladsl.EntityTypeKey[T] = scaladslSelf
/**
* Constructs a [[PersistenceId]] from this `EntityTypeKey` and the given `entityId` by
* concatenating them with `|` separator.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator in [[EntityTypeKey.withEntityIdSeparator]].
*/
def persistenceIdFrom(entityId: String): PersistenceId
/**
* Specify a custom separator for compatibility with old naming conventions. The separator is used between the
* `EntityTypeKey` and the `entityId` when constructing a `persistenceId` with [[EntityTypeKey.persistenceIdFrom]].
*
* The default `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator here.
*/
def withEntityIdSeparator(separator: String): EntityTypeKey[T]
}
object EntityTypeKey {

View file

@ -25,6 +25,7 @@ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.ClusterShardingImpl
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.persistence.typed.PersistenceId
object ClusterSharding extends ExtensionId[ClusterSharding] {
@ -308,7 +309,31 @@ object StartEntity {
* Not for user extension.
*/
@DoNotInherit trait EntityTypeKey[T] {
/**
* Name of the entity type.
*/
def name: String
/**
* Constructs a [[PersistenceId]] from this `EntityTypeKey` and the given `entityId` by
* concatenating them with `|` separator.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator in [[EntityTypeKey.withEntityIdSeparator]].
*/
def persistenceIdFrom(entityId: String): PersistenceId
/**
* Specify a custom separator for compatibility with old naming conventions. The separator is used between the
* `EntityTypeKey` and the `entityId` when constructing a `persistenceId` with [[EntityTypeKey.persistenceIdFrom]].
*
* The default `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator here.
*/
def withEntityIdSeparator(separator: String): EntityTypeKey[T]
}
object EntityTypeKey {

View file

@ -13,6 +13,7 @@ import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.{ WordSpec, WordSpecLike }
@ -43,9 +44,11 @@ object ClusterShardingPersistenceSpec {
final case class Get(replyTo: ActorRef[String]) extends Command
final case object StopPlz extends Command
val typeKey = EntityTypeKey[Command]("test")
def persistentActor(entityId: String): Behavior[Command] =
PersistentBehavior[Command, String, String](
entityId,
persistenceId = typeKey.persistenceIdFrom(entityId),
emptyState = "",
commandHandler = (state, cmd) cmd match {
case Add(s) Effect.persist(s)
@ -56,8 +59,6 @@ object ClusterShardingPersistenceSpec {
},
eventHandler = (state, evt) if (state.isEmpty) evt else state + "|" + evt)
val typeKey = EntityTypeKey[Command]("test")
}
class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterShardingPersistenceSpec.config) with WordSpecLike {

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.scaladsl
import akka.persistence.typed.PersistenceId
import org.scalatest.Matchers
import org.scalatest.WordSpec
class EntityTypeKeySpec extends WordSpec with Matchers {
"EntityTypeKey" must {
"use | as default entityIdSeparator for compatibility with Lagom's scaladsl" in {
EntityTypeKey[String]("MyType").persistenceIdFrom("abc") should ===(PersistenceId("MyType|abc"))
}
"support custom entityIdSeparator for compatibility with Lagom's javadsl" in {
EntityTypeKey[String]("MyType").withEntityIdSeparator("")
.persistenceIdFrom("abc") should ===(PersistenceId("MyTypeabc"))
}
"support custom entityIdSeparator for compatibility with other naming" in {
EntityTypeKey[String]("MyType").withEntityIdSeparator("#/#")
.persistenceIdFrom("abc") should ===(PersistenceId("MyType#/#abc"))
}
"not allow | in name because it's the default entityIdSeparator" in {
intercept[IllegalArgumentException] {
EntityTypeKey[String]("Invalid | name")
}
}
"not allow custom separator in name" in {
intercept[IllegalArgumentException] {
EntityTypeKey[String]("Invalid name").withEntityIdSeparator(" ")
}
}
"not allow | in entityId because it's the default entityIdSeparator" in {
intercept[IllegalArgumentException] {
EntityTypeKey[String]("SomeType").persistenceIdFrom("A|B")
}
}
"not allow custom separator in entityId" in {
intercept[IllegalArgumentException] {
EntityTypeKey[String]("SomeType").withEntityIdSeparator("#").persistenceIdFrom("A#B")
}
}
}
}

View file

@ -8,6 +8,7 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.{ ActorRef, Behavior, Props }
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
@ -37,7 +38,7 @@ object ClusterSingletonPersistenceSpec {
val persistentActor: Behavior[Command] =
PersistentBehavior[Command, String, String](
persistenceId = "TheSingleton",
persistenceId = PersistenceId("TheSingleton"),
emptyState = "",
commandHandler = (state, cmd) cmd match {
case Add(s) Effect.persist(s)

View file

@ -7,5 +7,5 @@ package akka.persistence.typed
/**
* Thrown if a journal rejects an event e.g. due to a serialization error.
*/
final class EventRejectedException(persistenceId: String, sequenceNr: Long, cause: Throwable)
extends RuntimeException(s"PersistenceId $persistenceId sequenceNr: $sequenceNr", cause)
final class EventRejectedException(persistenceId: PersistenceId, sequenceNr: Long, cause: Throwable)
extends RuntimeException(s"Rejected event, persistenceId [${persistenceId.id}], sequenceNr [$sequenceNr]", cause)

View file

@ -0,0 +1,11 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
/**
* Unique identifier in the backend data store (journal and snapshot store) of the
* persistent actor.
*/
final case class PersistenceId(id: String)

View file

@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.persistence.typed.PersistenceId
/** INTERNAL API */
@InternalApi
@ -36,9 +37,9 @@ private[akka] object EventsourcedBehavior {
val PersistingEvents = "persist-evts"
// format: ON
def create(persistenceId: String, phaseName: String): Map[String, Any] = {
def create(persistenceId: PersistenceId, phaseName: String): Map[String, Any] = {
Map(
"persistenceId" persistenceId,
"persistenceId" persistenceId.id,
"phase" phaseName
)
}

View file

@ -34,7 +34,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
val senderNotKnownBecauseAkkaTyped = null
val repr = PersistentRepr(
event,
persistenceId = setup.persistenceId,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
writerUuid = setup.writerIdentity.writerUuid,
sender = senderNotKnownBecauseAkkaTyped
@ -56,7 +56,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
newState = newState.nextSequenceNr()
PersistentRepr(
event,
persistenceId = setup.persistenceId,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
@ -71,7 +71,7 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
protected def replayEvents(fromSeqNr: Long, toSeqNr: Long): Unit = {
setup.log.debug("Replaying messages: from: {}, to: {}", fromSeqNr, toSeqNr)
setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId, setup.selfUntyped)
setup.journal ! ReplayMessages(fromSeqNr, toSeqNr, setup.recovery.replayMax, setup.persistenceId.id, setup.selfUntyped)
}
protected def requestRecoveryPermit(): Unit = {
@ -104,11 +104,11 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
* to the running [[PersistentActor]].
*/
protected def loadSnapshot(criteria: SnapshotSelectionCriteria, toSequenceNr: Long): Unit = {
setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId, criteria, toSequenceNr), setup.selfUntyped)
setup.snapshotStore.tell(LoadSnapshot(setup.persistenceId.id, criteria, toSequenceNr), setup.selfUntyped)
}
protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = {
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId, state.seqNr), state.state), setup.selfUntyped)
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId.id, state.seqNr), state.state), setup.selfUntyped)
}
}

View file

@ -144,9 +144,10 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
val msg = message match {
case Some(evt)
s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. PersistenceId: [${setup.persistence}]"
s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. " +
s"PersistenceId [${setup.persistenceId.id}]"
case None
s"Exception during recovery. Last known sequence number [$sequenceNr]. PersistenceId: [${setup.persistenceId}]"
s"Exception during recovery. Last known sequence number [$sequenceNr]. PersistenceId [${setup.persistenceId.id}]"
}
throw new JournalFailureException(msg, cause)

View file

@ -21,6 +21,7 @@ import akka.util.OptionVal
import scala.util.Try
import akka.actor.Cancellable
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.RecoveryTickEvent
/**
@ -29,7 +30,7 @@ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.Rec
@InternalApi
private[persistence] final class EventsourcedSetup[C, E, S](
val context: ActorContext[InternalProtocol],
val persistenceId: String,
val persistenceId: PersistenceId,
val emptyState: S,
val commandHandler: PersistentBehavior.CommandHandler[C, E, S],
val eventHandler: PersistentBehavior.EventHandler[S, E],

View file

@ -5,6 +5,7 @@
package akka.persistence.typed.internal
import akka.annotation.InternalApi
import akka.persistence.typed.PersistenceId
/**
* INTERNAL API
@ -13,6 +14,6 @@ import akka.annotation.InternalApi
*/
@InternalApi
final private[akka] class JournalFailureException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) {
def this(persistenceId: String, sequenceNr: Long, eventType: String, cause: Throwable) =
this(s"Failed to persist event type $eventType with sequence number $sequenceNr for persistenceId [$persistenceId]", cause)
def this(persistenceId: PersistenceId, sequenceNr: Long, eventType: String, cause: Throwable) =
this(s"Failed to persist event type [$eventType] with sequence number [$sequenceNr] for persistenceId [${persistenceId.id}]", cause)
}

View file

@ -14,9 +14,10 @@ import akka.persistence.typed.{ EventAdapter, NoOpEventAdapter }
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl._
import akka.util.ConstantFun
import scala.util.{ Failure, Success, Try }
import akka.persistence.typed.PersistenceId
@InternalApi
private[akka] object PersistentBehaviorImpl {
@ -32,7 +33,7 @@ private[akka] object PersistentBehaviorImpl {
@InternalApi
private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
persistenceId: String,
persistenceId: PersistenceId,
emptyState: State,
commandHandler: PersistentBehavior.CommandHandler[Command, Event, State],
eventHandler: PersistentBehavior.EventHandler[State, Event],

View file

@ -18,13 +18,13 @@ import scala.util.{ Failure, Success }
/** Java API */
@ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: String, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: PersistenceId, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
def this(persistenceId: String) = {
def this(persistenceId: PersistenceId) = {
this(persistenceId, None)
}
def this(persistenceId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(persistenceId, Some(backoffSupervisorStrategy))
}

View file

@ -11,9 +11,10 @@ import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.internal._
import scala.util.Try
import akka.persistence.typed.PersistenceId
object PersistentBehavior {
/**
@ -38,7 +39,7 @@ object PersistentBehavior {
* Create a `Behavior` for a persistent actor.
*/
def apply[Command, Event, State](
persistenceId: String,
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) Effect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =

View file

@ -11,6 +11,7 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.SideEffect;
import akka.util.Timeout;
@ -91,7 +92,9 @@ public class PersistentActorCompileOnlyTest {
//#behavior
public static PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState> pb = new PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState>("p1") {
public static PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState> pb =
new PersistentBehavior<SimpleCommand, SimpleEvent, SimpleState>(new PersistenceId("p1")) {
@Override
public SimpleState emptyState() {
return new SimpleState();
@ -154,11 +157,14 @@ public class PersistentActorCompileOnlyTest {
//#commonChainedEffects
// Factored out Chained effect
static final SideEffect<ExampleState> commonChainedEffect = SideEffect.create(s -> System.out.println("Command handled!"));
static final SideEffect<ExampleState> commonChainedEffect =
SideEffect.create(s -> System.out.println("Command handled!"));
//#commonChainedEffects
private PersistentBehavior<MyCommand, MyEvent, ExampleState> pa = new PersistentBehavior<MyCommand, MyEvent, ExampleState>("pa") {
private PersistentBehavior<MyCommand, MyEvent, ExampleState> pa =
new PersistentBehavior<MyCommand, MyEvent, ExampleState>(new PersistenceId("pa")) {
@Override
public ExampleState emptyState() {
return new ExampleState();
@ -269,7 +275,7 @@ public class PersistentActorCompileOnlyTest {
}
// #actor-context
public Behavior<Command> behavior(String persistenceId) {
public Behavior<Command> behavior(PersistenceId persistenceId) {
return Behaviors.setup(ctx -> new MyPersistentBehavior(persistenceId, ctx));
}
@ -281,7 +287,7 @@ public class PersistentActorCompileOnlyTest {
// this makes the context available to the command handler etc.
private final ActorContext<Command> ctx;
public MyPersistentBehavior(String persistenceId, ActorContext<Command> ctx) {
public MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> ctx) {
super(persistenceId);
this.ctx = ctx;
}

View file

@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
@ -24,7 +25,7 @@ class FailingPersistentActor extends PersistentBehavior<String, String, String>
private final ActorRef<String> probe;
FailingPersistentActor(String persistenceId, ActorRef<String> probe) {
FailingPersistentActor(PersistenceId persistenceId, ActorRef<String> probe) {
super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1));
this.probe = probe;
}
@ -63,14 +64,14 @@ public class PersistentActorFailureTest extends JUnitSuite {
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
public static Behavior<String> fail(String pid, ActorRef<String> probe) {
public static Behavior<String> fail(PersistenceId pid, ActorRef<String> probe) {
return new FailingPersistentActor(pid, probe);
}
@Test
public void persistEvents() throws Exception {
TestProbe<String> probe = testKit.createTestProbe();
Behavior<String> p1 = fail("fail-first-2", probe.ref());
Behavior<String> p1 = fail(new PersistenceId("fail-first-2"), probe.ref());
ActorRef<String> c = testKit.spawn(p1);
probe.expectMessage("starting");
// fail

View file

@ -18,6 +18,7 @@ import akka.persistence.query.Sequence;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.typed.EventAdapter;
import akka.persistence.typed.NoOpEventAdapter;
import akka.persistence.typed.PersistenceId;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
@ -170,13 +171,13 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
private static String loggingOne = "one";
private Behavior<Command> counter(String persistenceId, ActorRef<Pair<State, Incremented>> probe) {
private Behavior<Command> counter(PersistenceId persistenceId, ActorRef<Pair<State, Incremented>> probe) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, (e) -> Collections.emptySet(), snapshotProbe, new NoOpEventAdapter<>());
}
private Behavior<Command> counter(String persistenceId,
private Behavior<Command> counter(PersistenceId persistenceId,
ActorRef<Pair<State, Incremented>> probe,
Function<Incremented, Set<String>> tagger) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
@ -184,7 +185,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, tagger, snapshotProbe, new NoOpEventAdapter<>());
}
private Behavior<Command> counter(String persistenceId,
private Behavior<Command> counter(PersistenceId persistenceId,
ActorRef<Pair<State, Incremented>> probe,
EventAdapter<Incremented, ?> transformer) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
@ -192,7 +193,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), snapshotProbe, transformer);
}
private Behavior<Command> counter(String persistenceId) {
private Behavior<Command> counter(PersistenceId persistenceId) {
return counter(persistenceId,
TestProbe.<Pair<State, Incremented>>create(testKit.system()).ref(),
TestProbe.<String>create(testKit.system()).ref(),
@ -204,7 +205,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
}
private Behavior<Command> counter(
String persistenceId,
PersistenceId persistenceId,
Function3<State, Incremented, Long, Boolean> snapshot,
ActorRef<Optional<Throwable>> snapshotProbe
) {
@ -218,7 +219,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
}
private Behavior<Command> counter(
String persistentId,
PersistenceId persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe) {
return counter(persistentId, eventProbe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(),
@ -228,7 +229,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
}
private Behavior<Command> counter(
String persistentId,
PersistenceId persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
Function3<State, Incremented, Long, Boolean> snapshot) {
return counter(persistentId, eventProbe, testKit.<String>createTestProbe().ref(), snapshot, (e) -> Collections.emptySet(),
@ -237,7 +238,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
}
private static <A> Behavior<Command> counter(
String persistentId,
PersistenceId persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe,
Function3<State, Incremented, Long, Boolean> snapshot,
@ -335,7 +336,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void persistEvents() {
ActorRef<Command> c = testKit.spawn(counter("c1"));
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c1")));
TestProbe<State> probe = testKit.createTestProbe();
c.tell(Increment.instance);
c.tell(new GetValue(probe.ref()));
@ -344,7 +345,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void replyStoredEvents() {
ActorRef<Command> c = testKit.spawn(counter("c2"));
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c2")));
TestProbe<State> probe = testKit.createTestProbe();
c.tell(Increment.instance);
c.tell(Increment.instance);
@ -352,7 +353,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
c.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
ActorRef<Command> c2 = testKit.spawn(counter("c2"));
ActorRef<Command> c2 = testKit.spawn(counter(new PersistenceId("c2")));
c2.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
c2.tell(Increment.instance);
@ -363,7 +364,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void handleTerminatedSignal() {
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("c3", eventHandlerProbe.ref()));
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c3"), eventHandlerProbe.ref()));
c.tell(Increment.instance);
c.tell(new IncrementLater());
eventHandlerProbe.expectMessage(Pair.create(emptyState, new Incremented(1)));
@ -373,7 +374,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void handleReceiveTimeout() {
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("c4", eventHandlerProbe.ref()));
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c4"), eventHandlerProbe.ref()));
c.tell(new Increment100OnTimeout());
eventHandlerProbe.expectMessage(Pair.create(emptyState, timeoutEvent));
}
@ -382,14 +383,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public void chainableSideEffectsWithEvents() {
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();
TestProbe<String> loggingProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("c5", eventHandlerProbe.ref(), loggingProbe.ref()));
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c5"), eventHandlerProbe.ref(), loggingProbe.ref()));
c.tell(new EmptyEventsListAndThenLog());
loggingProbe.expectMessage(loggingOne);
}
@Test
public void workWhenWrappedInOtherBehavior() {
Behavior<Command> behavior = Behaviors.supervise(counter("c6")).onFailure(
Behavior<Command> behavior = Behaviors.supervise(counter(new PersistenceId("c6"))).onFailure(
SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(1),
Duration.ofSeconds(10), 0.1)
);
@ -404,7 +405,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void snapshot() {
TestProbe<Optional<Throwable>> snapshotProbe = testKit.createTestProbe();
Behavior<Command> snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref());
Behavior<Command> snapshoter = counter(new PersistenceId("c11"), (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref());
ActorRef<Command> c = testKit.spawn(snapshoter);
c.tell(Increment.instance);
c.tell(Increment.instance);
@ -416,7 +417,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
TestProbe<Pair<State, Incremented>> eventProbe = testKit.createTestProbe();
snapshoter = counter("c11", eventProbe.ref(), (s, e, l) -> s.value % 2 == 0);
snapshoter = counter(new PersistenceId("c11"), eventProbe.ref(), (s, e, l) -> s.value % 2 == 0);
ActorRef<Command> c2 = testKit.spawn(snapshoter);
// First 2 are snapshot
eventProbe.expectMessage(Pair.create(new State(2, Arrays.asList(0, 1)), new Incremented(1)));
@ -427,7 +428,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void stopThenLog() {
TestProbe<State> probe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("c12"));
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c12")));
c.tell(new StopThenLog());
probe.expectTerminated(c, Duration.ofSeconds(1));
}
@ -449,7 +450,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
return target.apply(ctx, signal);
}
};
ActorRef<Command> c = testKit.spawn(Behaviors.intercept(tap, ((Behavior)counter("tap1"))));
ActorRef<Command> c = testKit.spawn(Behaviors.intercept(tap, ((Behavior)counter(new PersistenceId("tap1")))));
c.tell(Increment.instance);
interceptProbe.expectMessage(Increment.instance);
signalProbe.expectNoMessage();
@ -459,7 +460,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public void tagEvent() throws Exception {
TestProbe<Pair<State, Incremented>> eventProbe = testKit.createTestProbe();
TestProbe<State> stateProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("tagging", eventProbe.ref(), e -> Sets.newHashSet("tag1", "tag2")));
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("tagging"), eventProbe.ref(), e -> Sets.newHashSet("tag1", "tag2")));
c.tell(new Increment());
c.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(1, Collections.singletonList(0)));
@ -475,7 +476,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public void transformEvent() throws Exception {
TestProbe<Pair<State, Incremented>> eventProbe = testKit.createTestProbe();
TestProbe<State> stateProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter()));
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("transform"), eventProbe.ref(), new WrapperEventAdapter()));
c.tell(new Increment());
c.tell(new GetValue(stateProbe.ref()));
@ -487,7 +488,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1)))
), events);
ActorRef<Command> c2 = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter()));
ActorRef<Command> c2 = testKit.spawn(counter(new PersistenceId("transform"), eventProbe.ref(), new WrapperEventAdapter()));
c2.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(1, Collections.singletonList(0)));
}

View file

@ -7,6 +7,7 @@ package jdocs.akka.persistence.typed;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
@ -66,7 +67,7 @@ public class AccountExample extends PersistentBehavior<AccountExample.AccountCom
}
public AccountExample(ActorContext<AccountCommand> context, String accountNumber) {
super(accountNumber);
super(new PersistenceId(accountNumber));
}
@Override

View file

@ -8,6 +8,7 @@ import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
@ -25,7 +26,7 @@ public class BasicPersistentBehaviorTest {
//#supervision
public static class MyPersistentBehavior extends PersistentBehavior<Command, Event, State> {
public MyPersistentBehavior(String persistenceId) {
public MyPersistentBehavior(PersistenceId persistenceId) {
super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
}
//#supervision
@ -64,12 +65,13 @@ public class BasicPersistentBehaviorTest {
//#tagging
}
static PersistentBehavior<Command, Event, State> persistentBehavior = new MyPersistentBehavior("pid");
static PersistentBehavior<Command, Event, State> persistentBehavior =
new MyPersistentBehavior(new PersistenceId("pid"));
//#structure
//#wrapPersistentBehavior
static Behavior<Command> debugAlwaysSnapshot = Behaviors.setup((context) -> {
return new MyPersistentBehavior("pid") {
return new MyPersistentBehavior(new PersistenceId("pid")) {
@Override
public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
context.getLog().info("Snapshot actor {} => state: {}",

View file

@ -9,6 +9,7 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
@ -153,7 +154,7 @@ public class InDepthPersistentBehaviorTest {
private final ActorContext<BlogCommand> ctx;
public BlogBehavior(String persistenceId, ActorContext<BlogCommand> ctx) {
public BlogBehavior(PersistenceId persistenceId, ActorContext<BlogCommand> ctx) {
super(persistenceId);
this.ctx = ctx;
}
@ -238,7 +239,7 @@ public class InDepthPersistentBehaviorTest {
//#behavior
public static Behavior<BlogCommand> behavior(String entityId) {
return Behaviors.setup(ctx ->
new BlogBehavior("Blog-" + entityId, ctx)
new BlogBehavior(new PersistenceId("Blog-" + entityId), ctx)
);
}

View file

@ -7,6 +7,7 @@ package jdocs.akka.persistence.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
@ -84,10 +85,10 @@ public class MovieWatchList extends PersistentBehavior<MovieWatchList.Command, M
}
public static Behavior<Command> behavior(String userId) {
return new MovieWatchList("movies-" + userId);
return new MovieWatchList(new PersistenceId("movies-" + userId));
}
public MovieWatchList(String persistenceId) {
public MovieWatchList(PersistenceId persistenceId) {
super(persistenceId);
}

View file

@ -6,6 +6,7 @@ package jdocs.akka.persistence.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
@ -150,7 +151,7 @@ public class OptionalBlogState {
.matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop());
}
public BlogBehavior(String persistenceId) {
public BlogBehavior(PersistenceId persistenceId) {
super(persistenceId);
}

View file

@ -27,7 +27,7 @@ object ManyRecoveriesSpec {
probe: TestProbe[String],
latch: Option[TestLatch]): PersistentBehavior[Cmd, Evt, String] =
PersistentBehavior[Cmd, Evt, String](
persistenceId = name,
persistenceId = PersistenceId(name),
emptyState = "",
commandHandler = CommandHandler.command {
case Cmd(s) Effect.persist(Evt(s)).thenRun(_ probe.ref ! s"$name-$s")

View file

@ -14,10 +14,11 @@ import akka.persistence.RecoveryPermitter.{ RecoveryPermitGranted, RequestRecove
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.testkit.EventFilter
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
object RecoveryPermitterSpec {
@ -44,7 +45,7 @@ object RecoveryPermitterSpec {
eventProbe: TestProbe[Any],
throwOnRecovery: Boolean = false): Behavior[Command] =
PersistentBehavior[Command, Event, State](
persistenceId = name,
persistenceId = PersistenceId(name),
emptyState = EmptyState,
commandHandler = CommandHandler.command {
case StopActor Effect.stop

View file

@ -11,6 +11,7 @@ import akka.actor.typed.scaladsl.adapter.{ TypedActorRefOps, TypedActorSystemOps
import akka.event.Logging
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
object OptionalSnapshotStoreSpec {
@ -27,7 +28,7 @@ object OptionalSnapshotStoreSpec {
probe: TestProbe[State],
name: String = UUID.randomUUID().toString) =
PersistentBehavior[Command, Event, State](
persistenceId = name,
persistenceId = PersistenceId(name),
emptyState = State(),
commandHandler = CommandHandler.command {
_ Effect.persist(Event()).thenRun(probe.ref ! _)

View file

@ -12,9 +12,10 @@ import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
import akka.actor.testkit.typed.TE
import akka.actor.testkit.typed.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
object PerformanceSpec {
@ -62,7 +63,7 @@ object PerformanceSpec {
Behaviors.supervise({
val parameters = Parameters()
PersistentBehavior[Command, String, String](
persistenceId = name,
persistenceId = PersistenceId(name),
"",
commandHandler = CommandHandler.command {
case StopMeasure Effect.none.thenRun(_ probe.ref ! StopMeasure)

View file

@ -6,9 +6,11 @@ package akka.persistence.typed.scaladsl
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.TimerScheduler
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SideEffect
object PersistentActorCompileOnlyTest {
@ -44,7 +46,7 @@ object PersistentActorCompileOnlyTest {
//#behavior
val simpleBehavior: PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState] =
PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState](
persistenceId = "sample-id-1",
persistenceId = PersistenceId("sample-id-1"),
emptyState = ExampleState(Nil),
commandHandler = commandHandler,
eventHandler = eventHandler)
@ -64,7 +66,7 @@ object PersistentActorCompileOnlyTest {
case class ExampleState(events: List[String] = Nil)
PersistentBehavior[MyCommand, MyEvent, ExampleState](
persistenceId = "sample-id-1",
persistenceId = PersistenceId("sample-id-1"),
emptyState = ExampleState(Nil),
@ -109,7 +111,7 @@ object PersistentActorCompileOnlyTest {
val behavior: Behavior[Command] = Behaviors.setup(ctx
PersistentBehavior[Command, Event, EventsInFlight](
persistenceId = "recovery-complete-id",
persistenceId = PersistenceId("recovery-complete-id"),
emptyState = EventsInFlight(0, Map.empty),
@ -151,7 +153,7 @@ object PersistentActorCompileOnlyTest {
case class MoodChanged(to: Mood) extends Event
val b: Behavior[Command] = PersistentBehavior[Command, Event, Mood](
persistenceId = "myPersistenceId",
persistenceId = PersistenceId("myPersistenceId"),
emptyState = Happy,
commandHandler = { (state, command)
state match {
@ -193,7 +195,7 @@ object PersistentActorCompileOnlyTest {
case class State(tasksInFlight: List[Task])
PersistentBehavior[Command, Event, State](
persistenceId = "asdf",
persistenceId = PersistenceId("asdf"),
emptyState = State(Nil),
commandHandler = CommandHandler.command {
case RegisterTask(task) Effect.persist(TaskRegistered(task))
@ -221,7 +223,7 @@ object PersistentActorCompileOnlyTest {
val behavior: Behavior[Command] = Behaviors.setup(ctx
PersistentBehavior[Command, Event, State](
persistenceId = "asdf",
persistenceId = PersistenceId("asdf"),
emptyState = State(Nil),
commandHandler = (_, cmd) cmd match {
case RegisterTask(task)
@ -284,7 +286,7 @@ object PersistentActorCompileOnlyTest {
.thenRun(_ metadataRegistry ! GetMetaData(id, adapt))
PersistentBehavior[Command, Event, List[Id]](
persistenceId = "basket-1",
persistenceId = PersistenceId("basket-1"),
emptyState = Nil,
commandHandler = { (state, cmd)
if (isFullyHydrated(basket, state))
@ -376,7 +378,7 @@ object PersistentActorCompileOnlyTest {
}
PersistentBehavior[Command, Event, Mood](
persistenceId = "myPersistenceId",
persistenceId = PersistenceId("myPersistenceId"),
emptyState = Sad,
commandHandler,
eventHandler)
@ -404,7 +406,7 @@ object PersistentActorCompileOnlyTest {
}
PersistentBehavior[Command, Event, State](
persistenceId = "myPersistenceId",
persistenceId = PersistenceId("myPersistenceId"),
emptyState = new State,
commandHandler,
eventHandler)
@ -416,7 +418,7 @@ object PersistentActorCompileOnlyTest {
class Second extends State
PersistentBehavior[String, String, State](
persistenceId = "myPersistenceId",
persistenceId = PersistenceId("myPersistenceId"),
emptyState = new First,
commandHandler = CommandHandler.command {
cmd
@ -441,7 +443,7 @@ object PersistentActorCompileOnlyTest {
val behavior: Behavior[String] =
Behaviors.setup { ctx
PersistentBehavior[String, String, State](
persistenceId = "myPersistenceId",
persistenceId = PersistenceId("myPersistenceId"),
emptyState = new State,
commandHandler = CommandHandler.command {
cmd

View file

@ -14,12 +14,13 @@ import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.EventRejectedException
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import akka.persistence.typed.PersistenceId
class ChaosJournal extends InmemJournal {
var count = 0
var failRecovery = true
@ -66,7 +67,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent
implicit val testSettings = TestKitSettings(system)
def failingPersistentActor(pid: String, probe: ActorRef[String]): Behavior[String] = PersistentBehavior[String, String, String](
def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = PersistentBehavior[String, String, String](
pid, "",
(_, cmd) {
probe.tell("persisting")
@ -83,7 +84,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent
"A typed persistent actor (failures)" must {
"restart with backoff" in {
val probe = TestProbe[String]()
val behav = failingPersistentActor("fail-first-2", probe.ref)
val behav = failingPersistentActor(PersistenceId("fail-first-2"), probe.ref)
val c = spawn(behav)
probe.expectMessage("starting")
// fail
@ -106,7 +107,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent
"restart with backoff for recovery" in {
val probe = TestProbe[String]()
val behav = failingPersistentActor("fail-recovery-once", probe.ref)
val behav = failingPersistentActor(PersistenceId("fail-recovery-once"), probe.ref)
spawn(behav)
// First time fails, second time should work and call onRecoveryComplete
probe.expectMessage("starting")
@ -117,7 +118,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent
val probe = TestProbe[String]()
val behav =
Behaviors.supervise(
failingPersistentActor("reject-first", probe.ref)).onFailure[EventRejectedException](
failingPersistentActor(PersistenceId("reject-first"), probe.ref)).onFailure[EventRejectedException](
SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1))
val c = spawn(behav)
// First time fails, second time should work and call onRecoveryComplete

View file

@ -20,12 +20,13 @@ import akka.stream.scaladsl.Sink
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.{ Success, Try }
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
object PersistentBehaviorSpec {
@ -97,30 +98,30 @@ object PersistentBehaviorSpec {
val firstLogging = "first logging"
val secondLogging = "second logging"
def counter(persistenceId: String)(implicit system: ActorSystem[_]): Behavior[Command] =
def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command] =
Behaviors.setup(ctx counter(ctx, persistenceId))
def counter(persistenceId: String, logging: ActorRef[String])(implicit system: ActorSystem[_]): Behavior[Command] =
def counter(persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): Behavior[Command] =
Behaviors.setup(ctx counter(ctx, persistenceId, logging))
def counter(ctx: ActorContext[Command], persistenceId: String)(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId)(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(ctx, persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref)
def counter(ctx: ActorContext[Command], persistenceId: String, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counter(ctx: ActorContext[Command], persistenceId: PersistenceId, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(ctx, persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref)
def counterWithProbe(ctx: ActorContext[Command], persistenceId: String, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, probe, snapshotProbe)
def counterWithProbe(ctx: ActorContext[Command], persistenceId: String, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counterWithProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref)
def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: String, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
def counterWithSnapshotProbe(ctx: ActorContext[Command], persistenceId: PersistenceId, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(ctx, persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe)
def counter(
ctx: ActorContext[Command],
persistenceId: String,
persistenceId: PersistenceId,
loggingActor: ActorRef[String],
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[Done]]): PersistentBehavior[Command, Event, State] = {
@ -231,7 +232,7 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio
LeveldbReadJournal.Identifier)
val pidCounter = new AtomicInteger(0)
private def nextPid(): String = s"c${pidCounter.incrementAndGet()}"
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
"A typed persistent actor" must {
@ -508,7 +509,7 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByTag("tag1").runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Incremented(1)))
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Incremented(1)))
}
"adapt events" in {
@ -526,8 +527,8 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))))
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
val c2 = spawn(Behaviors.setup[Command](ctx
counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])
@ -548,10 +549,10 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(
EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))),
EventEnvelope(Sequence(2), pid, 2, Wrapper(Incremented(1)))
EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))),
EventEnvelope(Sequence(2), pid.id, 2, Wrapper(Incremented(1)))
)
val c2 = spawn(Behaviors.setup[Command](ctx
@ -574,8 +575,8 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))))
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
val c2 = spawn(Behaviors.setup[Command](ctx
counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event]))
@ -584,7 +585,7 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio
replyProbe.expectMessage(State(1, Vector(0)))
val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue
taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))))
taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
}
"handle scheduled message arriving before recovery completed " in {

View file

@ -5,6 +5,7 @@
package docs.akka.persistence.typed
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
@ -90,7 +91,7 @@ object AccountExample1 {
def behavior(accountNumber: String): Behavior[AccountCommand] =
PersistentBehavior[AccountCommand, AccountEvent, Option[Account]](
persistenceId = accountNumber,
persistenceId = PersistenceId(s"Account-$accountNumber"),
emptyState = None,
commandHandler = commandHandler,
eventHandler = eventHandler

View file

@ -5,6 +5,7 @@
package docs.akka.persistence.typed
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
@ -93,7 +94,7 @@ object AccountExample2 {
def behavior(accountNumber: String): Behavior[AccountCommand] =
PersistentBehavior[AccountCommand, AccountEvent, Account](
persistenceId = accountNumber,
persistenceId = PersistenceId(s"Account-$accountNumber"),
emptyState = EmptyAccount,
commandHandler = commandHandler,
eventHandler = eventHandler

View file

@ -8,9 +8,10 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl.PersistentBehavior
import scala.concurrent.duration._
import akka.persistence.typed.PersistenceId
object BasicPersistentBehaviorCompileOnly {
//#structure
@ -20,7 +21,7 @@ object BasicPersistentBehaviorCompileOnly {
val behavior: Behavior[Command] =
PersistentBehavior[Command, Event, State](
persistenceId = "abc",
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler =
(state, cmd)
@ -37,7 +38,7 @@ object BasicPersistentBehaviorCompileOnly {
//#recovery
val recoveryBehavior: Behavior[Command] =
PersistentBehavior[Command, Event, State](
persistenceId = "abc",
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler =
(state, cmd)
@ -53,7 +54,7 @@ object BasicPersistentBehaviorCompileOnly {
//#tagging
val taggingBehavior: Behavior[Command] =
PersistentBehavior[Command, Event, State](
persistenceId = "abc",
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler =
(state, cmd)
@ -66,7 +67,7 @@ object BasicPersistentBehaviorCompileOnly {
//#wrapPersistentBehavior
val samplePersistentBehavior = PersistentBehavior[Command, Event, State](
persistenceId = "abc",
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler =
(state, cmd)

View file

@ -6,6 +6,7 @@ package docs.akka.persistence.typed
import akka.Done
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.Effect
@ -120,7 +121,7 @@ object InDepthPersistentBehaviorSpec {
//#behavior
def behavior(entityId: String): Behavior[BlogCommand] =
PersistentBehavior[BlogCommand, BlogEvent, BlogState](
persistenceId = "Blog-" + entityId,
persistenceId = PersistenceId(s"Blog-$entityId"),
emptyState = BlogState.empty,
commandHandler,
eventHandler)

View file

@ -6,6 +6,7 @@ package docs.akka.persistence.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
@ -44,7 +45,7 @@ object MovieWatchList {
def behavior(userId: String): Behavior[Command] = {
PersistentBehavior[Command, Event, MovieList](
persistenceId = "movies-" + userId,
persistenceId = PersistenceId(s"movies-$userId"),
emptyState = MovieList(Set.empty),
commandHandler,
eventHandler = (state, event) state.applyEvent(event)