diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala new file mode 100644 index 0000000000..fce863d4fb --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.actor.typed.internal + +import akka.actor.typed.ActorContext +import akka.actor.typed.Behavior +import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.Signal +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi private[akka] sealed abstract class PoisonPill extends Signal + +/** + * INTERNAL API + */ +@InternalApi private[akka] case object PoisonPill extends PoisonPill { + def instance: PoisonPill = this +} + +/** + * INTERNAL API + * + * Returns `Behaviors.stopped` for [[PoisonPill]] signals unless it has been handled by the target `Behavior`. + * Used by Cluster Sharding to automatically stop entities without defining a stop message in the + * application protocol. Persistent actors handle `PoisonPill` and run side effects after persist + * and process stashed messages before stopping. + */ +@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorInterceptor[M, M] { + override def aroundReceive(ctx: ActorContext[M], msg: M, target: BehaviorInterceptor.ReceiveTarget[M]): Behavior[M] = + target(ctx, msg) + + override def aroundSignal(ctx: ActorContext[M], signal: Signal, target: BehaviorInterceptor.SignalTarget[M]): Behavior[M] = { + signal match { + case p: PoisonPill ⇒ + val next = target(ctx, p) + if (Behavior.isUnhandled(next)) Behavior.stopped + else next + case _ ⇒ target(ctx, signal) + } + } + + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = + // only one interceptor per behavior stack is needed + other.isInstanceOf[PoisonPillInterceptor[_]] +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index 962eaee1a7..3096d37010 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -22,6 +22,8 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.Props import akka.actor.typed.internal.InternalRecipientRef +import akka.actor.typed.internal.PoisonPill +import akka.actor.typed.internal.PoisonPillInterceptor import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.Behaviors @@ -155,7 +157,7 @@ import akka.util.Timeout new javadsl.EntityContext[M](ctx.entityId, ctx.shard, actorContext.asJava)) }, typeKey = entity.typeKey.asScala, - stopMessage = entity.stopMessage, + stopMessage = entity.stopMessage.asScala, entityProps = entity.entityProps, settings = entity.settings.asScala, messageExtractor = entity.messageExtractor.asScala, @@ -167,7 +169,7 @@ import akka.util.Timeout behavior: EntityContext ⇒ Behavior[M], entityProps: Props, typeKey: scaladsl.EntityTypeKey[M], - stopMessage: M, + stopMessage: Option[M], settings: ClusterShardingSettings, extractor: ShardingMessageExtractor[E, M], allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = { @@ -196,13 +198,21 @@ import akka.util.Timeout override def apply(t: String): ActorRef[scaladsl.ClusterSharding.ShardCommand] = { // using untyped.systemActorOf to avoid the Future[ActorRef] system.toUntyped.asInstanceOf[ExtendedActorSystem].systemActorOf( - PropsAdapter(ShardCommandActor.behavior(stopMessage)), + PropsAdapter(ShardCommandActor.behavior(stopMessage.getOrElse(PoisonPill))), URLEncoder.encode(typeKey.name, ByteString.UTF_8) + "ShardCommandDelegator") } }) + def poisonPillInterceptor(behv: Behavior[M]): Behavior[M] = { + stopMessage match { + case Some(_) ⇒ behv + case None ⇒ Behaviors.intercept(new PoisonPillInterceptor[M])(behv) + } + } + val untypedEntityPropsFactory: String ⇒ akka.actor.Props = { entityId ⇒ - PropsAdapter(behavior(new EntityContext(entityId, shardCommandDelegator)), entityProps) + val behv = behavior(new EntityContext(entityId, shardCommandDelegator)) + PropsAdapter(poisonPillInterceptor(behv), entityProps) } untypedSharding.internalStart( @@ -212,7 +222,7 @@ import akka.util.Timeout extractEntityId, extractShardId, allocationStrategy.getOrElse(defaultShardAllocationStrategy(settings)), - stopMessage) + stopMessage.getOrElse(PoisonPill)) } else { log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node) " + "for role [{}] and dataCenter [{}] ...", typeKey.name, settings.role, settings.dataCenter) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index d98a07fa60..4b13a7e7ad 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -212,9 +212,8 @@ object Entity { */ def of[M]( typeKey: EntityTypeKey[M], - createBehavior: JFunction[EntityContext[M], Behavior[M]], - stopMessage: M): Entity[M, ShardingEnvelope[M]] = { - new Entity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty()) + createBehavior: JFunction[EntityContext[M], Behavior[M]]): Entity[M, ShardingEnvelope[M]] = { + new Entity(createBehavior, typeKey, Optional.empty(), Props.empty, Optional.empty(), Optional.empty(), Optional.empty()) } /** @@ -231,8 +230,7 @@ object Entity { */ def ofPersistentEntity[Command, Event, State >: Null]( typeKey: EntityTypeKey[Command], - createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]], - stopMessage: Command): Entity[Command, ShardingEnvelope[Command]] = { + createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = { of(typeKey, new JFunction[EntityContext[Command], Behavior[Command]] { override def apply(ctx: EntityContext[Command]): Behavior[Command] = { @@ -242,7 +240,7 @@ object Entity { s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.") persistentEntity } - }, stopMessage) + }) } } @@ -253,7 +251,7 @@ object Entity { final class Entity[M, E] private[akka] ( val createBehavior: JFunction[EntityContext[M], Behavior[M]], val typeKey: EntityTypeKey[M], - val stopMessage: M, + val stopMessage: Optional[M], val entityProps: Props, val settings: Optional[ClusterShardingSettings], val messageExtractor: Optional[ShardingMessageExtractor[E, M]], @@ -271,6 +269,15 @@ final class Entity[M, E] private[akka] ( def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] = copy(settings = Optional.ofNullable(newSettings)) + /** + * Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. + * If this is not defined it will be stopped automatically. + * It can be useful to define a custom stop message if the entity needs to perform + * some asynchronous cleanup or interactions before stopping. + */ + def withStopMessage(newStopMessage: M): Entity[M, E] = + copy(stopMessage = Optional.ofNullable(newStopMessage)) + /** * * If a `messageExtractor` is not specified the messages are sent to the entities by wrapping @@ -292,7 +299,7 @@ final class Entity[M, E] private[akka] ( private def copy( createBehavior: JFunction[EntityContext[M], Behavior[M]] = createBehavior, typeKey: EntityTypeKey[M] = typeKey, - stopMessage: M = stopMessage, + stopMessage: Optional[M] = stopMessage, entityProps: Props = entityProps, settings: Optional[ClusterShardingSettings] = settings, allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala index 2fd0047e25..97078f5f67 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/PersistentEntity.scala @@ -23,15 +23,18 @@ import akka.persistence.typed.javadsl.PersistentBehavior */ abstract class PersistentEntity[Command, Event, State >: Null] private ( val entityTypeKey: EntityTypeKey[Command], + val entityId: String, persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends PersistentBehavior[Command, Event, State](persistenceId, supervisorStrategy) { def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = { - this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.empty[BackoffSupervisorStrategy]) + this(entityTypeKey, entityId, + persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.empty[BackoffSupervisorStrategy]) } def this(entityTypeKey: EntityTypeKey[Command], entityId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = { - this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.ofNullable(backoffSupervisorStrategy)) + this(entityTypeKey, entityId, + persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.ofNullable(backoffSupervisorStrategy)) } } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index a3ea35645c..540f2de5c4 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -215,14 +215,12 @@ object Entity { * * @param typeKey A key that uniquely identifies the type of entity in this cluster * @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId) - * @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. * @tparam M The type of message the entity accepts */ def apply[M]( typeKey: EntityTypeKey[M], - createBehavior: EntityContext ⇒ Behavior[M], - stopMessage: M): Entity[M, ShardingEnvelope[M]] = - new Entity(createBehavior, typeKey, stopMessage, Props.empty, None, None, None) + createBehavior: EntityContext ⇒ Behavior[M]): Entity[M, ShardingEnvelope[M]] = + new Entity(createBehavior, typeKey, None, Props.empty, None, None, None) } /** @@ -231,7 +229,7 @@ object Entity { final class Entity[M, E] private[akka] ( val createBehavior: EntityContext ⇒ Behavior[M], val typeKey: EntityTypeKey[M], - val stopMessage: M, + val stopMessage: Option[M], val entityProps: Props, val settings: Option[ClusterShardingSettings], val messageExtractor: Option[ShardingMessageExtractor[E, M]], @@ -249,6 +247,15 @@ final class Entity[M, E] private[akka] ( def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] = copy(settings = Option(newSettings)) + /** + * Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated. + * If this is not defined it will be stopped automatically. + * It can be useful to define a custom stop message if the entity needs to perform + * some asynchronous cleanup or interactions before stopping. + */ + def withStopMessage(newStopMessage: M): Entity[M, E] = + copy(stopMessage = Option(newStopMessage)) + /** * * If a `messageExtractor` is not specified the messages are sent to the entities by wrapping @@ -270,7 +277,7 @@ final class Entity[M, E] private[akka] ( private def copy( createBehavior: EntityContext ⇒ Behavior[M] = createBehavior, typeKey: EntityTypeKey[M] = typeKey, - stopMessage: M = stopMessage, + stopMessage: Option[M] = stopMessage, entityProps: Props = entityProps, settings: Option[ClusterShardingSettings] = settings, allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala index bad25ab1c3..faeb65651b 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/MultiDcClusterShardingSpec.scala @@ -70,10 +70,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh "start sharding" in { val sharding = ClusterSharding(typedSystem) val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.start( - Entity( - typeKey, - _ ⇒ multiDcPinger, - NoMore)) + Entity(typeKey, _ ⇒ multiDcPinger)) val probe = TestProbe[Pong] shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref)) probe.expectMessage(max = 10.seconds, Pong(cluster.selfMember.dataCenter)) @@ -102,8 +99,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).start( Entity( typeKey, - _ ⇒ multiDcPinger, - NoMore) + _ ⇒ multiDcPinger) .withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2"))) val probe = TestProbe[Pong] proxy ! ShardingEnvelope(entityId, Ping(probe.ref)) diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java new file mode 100644 index 0000000000..82ceb45844 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.javadsl; + +import akka.Done; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.cluster.typed.Cluster; +import akka.cluster.typed.Join; +import akka.persistence.typed.ExpectingReply; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.Effect; +import akka.persistence.typed.javadsl.EventHandler; +import akka.util.Timeout; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; + +import static org.junit.Assert.assertEquals; + +public class ClusterShardingPersistenceTest extends JUnitSuite { + + public static final Config config = ConfigFactory.parseString( + "akka.actor.provider = cluster \n" + + "akka.remote.netty.tcp.port = 0 \n" + + "akka.remote.artery.canonical.port = 0 \n" + + "akka.remote.artery.canonical.hostname = 127.0.0.1 \n" + + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"); + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + + interface Command {} + static class Add implements Command { + public final String s; + + Add(String s) { + this.s = s; + } + } + static class AddWithConfirmation implements Command, ExpectingReply { + final String s; + private final ActorRef replyTo; + + AddWithConfirmation(String s, ActorRef replyTo) { + this.s = s; + this.replyTo = replyTo; + } + + @Override + public ActorRef replyTo() { + return replyTo; + } + } + static class Get implements Command { + final ActorRef replyTo; + + Get(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + static enum StopPlz implements Command { + INSTANCE + } + + + + static class TestPersistentEntity extends PersistentEntity { + + public static final EntityTypeKey ENTITY_TYPE_KEY = + EntityTypeKey.create(Command.class, "HelloWorld"); + + public TestPersistentEntity(String entityId) { + super(ENTITY_TYPE_KEY, entityId); + } + + @Override + public String emptyState() { + return ""; + } + + @Override + public CommandHandler commandHandler() { + return commandHandlerBuilder(String.class) + .matchCommand(Add.class, this::add) + .matchCommand(AddWithConfirmation.class, this::addWithConfirmation) + .matchCommand(Get.class, this::getState) + .build(); + } + + private Effect add(String state, Add cmd) { + return Effect().persist(cmd.s); + } + + private Effect addWithConfirmation(String state, AddWithConfirmation cmd) { + return Effect().persist(cmd.s) + .thenReply(cmd, newState -> Done.getInstance()); + } + + private Effect getState(String state, Get cmd) { + cmd.replyTo.tell(entityId() + ":" + state); + return Effect().none(); + } + + @Override + public EventHandler eventHandler() { + return eventHandlerBuilder() + .matchEvent(String.class, this::applyEvent) + .build(); + } + + private String applyEvent(String state, String evt) { + if (state.equals("")) + return evt; + else + return state + "|" + evt; + } + } + + private ClusterSharding _sharding = null; + + private ClusterSharding sharding() { + if (_sharding == null) { + // initialize first time only + Cluster cluster = Cluster.get(testKit.system()); + cluster.manager().tell(new Join(cluster.selfMember().address())); + + ClusterSharding sharding = ClusterSharding.get(testKit.system()); + + sharding.start(Entity.ofPersistentEntity(TestPersistentEntity.ENTITY_TYPE_KEY, + entityContext -> new TestPersistentEntity(entityContext.getEntityId())) + .withStopMessage(StopPlz.INSTANCE)); + + _sharding = sharding; + } + return _sharding; + } + + @Test + public void startPersistentActor() { + TestProbe p = testKit.createTestProbe(); + EntityRef ref = sharding().entityRefFor(TestPersistentEntity.ENTITY_TYPE_KEY, "123"); + ref.tell(new Add("a")); + ref.tell(new Add("b")); + ref.tell(new Add("c")); + ref.tell(new Get(p.getRef())); + p.expectMessage("123:a|b|c"); + } + + @Test + public void askWithThenReply() { + TestProbe p1 = testKit.createTestProbe(); + EntityRef ref = sharding().entityRefFor(TestPersistentEntity.ENTITY_TYPE_KEY, "456"); + Timeout askTimeout = Timeout.create(p1.getRemainingOrDefault()); + CompletionStage done1 =ref.ask(replyTo -> new AddWithConfirmation("a", replyTo), askTimeout); + done1.thenAccept(d -> p1.getRef().tell(d)); + p1.expectMessage(Done.getInstance()); + + CompletionStage done2 =ref.ask(replyTo -> new AddWithConfirmation("b", replyTo), askTimeout); + done1.thenAccept(d -> p1.getRef().tell(d)); + p1.expectMessage(Done.getInstance()); + + TestProbe p2 = testKit.createTestProbe(); + ref.tell(new Get(p2.getRef())); + p2.expectMessage("456:a|b"); + } + +} diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java index 23b0eff4d8..75660ba753 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.java @@ -45,8 +45,7 @@ public class HelloWorldPersistentEntityExample { sharding.start( Entity.ofPersistentEntity( HelloWorld.ENTITY_TYPE_KEY, - ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()), - HelloWorld.Passivate.INSTANCE)); + ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()))); } // usage example @@ -78,10 +77,6 @@ public class HelloWorldPersistentEntityExample { } } - enum Passivate implements Command { - INSTANCE - } - // Response public static final class Greeting { public final String whom; @@ -140,14 +135,9 @@ public class HelloWorldPersistentEntityExample { public CommandHandler commandHandler() { return commandHandlerBuilder(KnownPeople.class) .matchCommand(Greet.class, this::greet) - .matchCommand(Greet.class, this::passivate) .build(); } - private Effect passivate(KnownPeople state, Command cmd) { - return Effect().stop(); - } - private Effect greet(KnownPeople state, Greet cmd) { return Effect().persist(new Greeted(cmd.whom)) .thenRun(newState -> cmd.replyTo.tell(new Greeting(cmd.whom, newState.numberOfPeople()))); diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java index a044fad573..0d3dd5be55 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleTest.java @@ -44,8 +44,7 @@ public class HelloWorldPersistentEntityExampleTest extends JUnitSuite { sharding.start( Entity.ofPersistentEntity( HelloWorld.ENTITY_TYPE_KEY, - ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()), - HelloWorld.Passivate.INSTANCE)); + ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()))); _sharding = sharding; } return _sharding; diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index 1ae45cf318..9e20c4fbbc 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -22,14 +22,12 @@ import akka.cluster.sharding.typed.javadsl.Entity; import jdocs.akka.persistence.typed.BlogPostExample.BlogCommand; import jdocs.akka.persistence.typed.BlogPostExample.BlogBehavior; -import jdocs.akka.persistence.typed.BlogPostExample.PassivatePost; public class ShardingCompileOnlyTest { //#counter-messages interface CounterCommand {} public static class Increment implements CounterCommand { } - public static class GoodByeCounter implements CounterCommand { } public static class GetValue implements CounterCommand { private final ActorRef replyTo; @@ -50,9 +48,6 @@ public class ShardingCompileOnlyTest { msg.replyTo.tell(value); return Behaviors.same(); }) - .onMessage(GoodByeCounter.class, (ctx, msg) -> { - return Behaviors.stopped(); - }) .build(); } //#counter @@ -60,6 +55,8 @@ public class ShardingCompileOnlyTest { //#counter-passivate public static class Idle implements CounterCommand { } + public static class GoodByeCounter implements CounterCommand { } + public static Behavior counter2(ActorRef shard, String entityId) { return Behaviors.setup(ctx -> { ctx.setReceiveTimeout(Duration.ofSeconds(30), new Idle()); @@ -105,8 +102,8 @@ public class ShardingCompileOnlyTest { sharding.start( Entity.of( typeKey, - ctx -> counter2(ctx.getShard(), ctx.getEntityId()), - new GoodByeCounter())); + ctx -> counter2(ctx.getShard(), ctx.getEntityId())) + .withStopMessage(new GoodByeCounter())); //#counter-passivate-start } @@ -126,8 +123,7 @@ public class ShardingCompileOnlyTest { ActorRef> shardRegion = sharding.start( Entity.of( typeKey, - ctx -> counter(ctx.getEntityId(),0), - new GoodByeCounter())); + ctx -> counter(ctx.getEntityId(),0))); //#start //#send @@ -150,8 +146,7 @@ public class ShardingCompileOnlyTest { sharding.start( Entity.of( blogTypeKey, - ctx -> BlogBehavior.behavior(ctx.getEntityId()), - new PassivatePost())); + ctx -> BlogBehavior.behavior(ctx.getEntityId()))); //#persistence } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index fece092900..304905a64a 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -4,13 +4,27 @@ package akka.cluster.sharding.typed.scaladsl +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.concurrent.Promise import akka.Done import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.actor.typed.internal.PoisonPill +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.sharding.ShardRegion.CurrentShardRegionState +import akka.cluster.sharding.ShardRegion.GetShardRegionState +import akka.cluster.sharding.typed.scaladsl.ClusterSharding.Passivate +import akka.cluster.sharding.typed.scaladsl.ClusterSharding.ShardCommand +import akka.cluster.sharding.{ ClusterSharding ⇒ UntypedClusterSharding } import akka.cluster.typed.Cluster import akka.cluster.typed.Join import akka.persistence.typed.ExpectingReply @@ -21,8 +35,10 @@ import org.scalatest.WordSpecLike object ClusterShardingPersistenceSpec { val config = ConfigFactory.parseString( """ - akka.actor.provider = cluster + akka.loglevel = DEBUG + #akka.persistence.typed.log-stashing = on + akka.actor.provider = cluster akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 @@ -33,61 +49,119 @@ object ClusterShardingPersistenceSpec { sealed trait Command final case class Add(s: String) extends Command final case class AddWithConfirmation(s: String)(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done] + final case class PassivateAndPersist(s: String)(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done] final case class Get(replyTo: ActorRef[String]) extends Command - final case object StopPlz extends Command + final case class Echo(msg: String, replyTo: ActorRef[String]) extends Command + final case class Block(latch: CountDownLatch) extends Command val typeKey = EntityTypeKey[Command]("test") - def persistentEntity(entityId: String): Behavior[Command] = - PersistentEntity[Command, String, String]( - entityTypeKey = typeKey, - entityId = entityId, - emptyState = "", - commandHandler = (state, cmd) ⇒ cmd match { - case Add(s) ⇒ - Effect.persist(s) + val recoveryCompletedProbes = new ConcurrentHashMap[String, ActorRef[String]] - case cmd @ AddWithConfirmation(s) ⇒ - Effect.persist(s) - .thenReply(cmd)(newState ⇒ Done) + // Need this to be able to send the PoisonPill from the outside, simulating rebalance before recovery and such. + // Promise completed by the actor when it's started. + val entityActorRefs = new ConcurrentHashMap[String, Promise[ActorRef[Any]]] - case Get(replyTo) ⇒ - replyTo ! s"$entityId:$state" - Effect.none - case StopPlz ⇒ Effect.stop() - }, - eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) + def persistentEntity(entityId: String, shard: ActorRef[ShardCommand]): Behavior[Command] = { + Behaviors.setup { ctx ⇒ + + entityActorRefs.get(entityId) match { + case null ⇒ + case promise ⇒ promise.trySuccess(ctx.self.upcast) + } + + PersistentEntity[Command, String, String]( + entityTypeKey = typeKey, + entityId = entityId, + emptyState = "", + commandHandler = (state, cmd) ⇒ cmd match { + case Add(s) ⇒ + Effect.persist(s) + + case cmd @ AddWithConfirmation(s) ⇒ + Effect.persist(s) + .thenReply(cmd)(newState ⇒ Done) + + case Get(replyTo) ⇒ + replyTo ! s"$entityId:$state" + Effect.none + + case cmd @ PassivateAndPersist(s) ⇒ + shard ! Passivate(ctx.self) + Effect.persist(s) + .thenReply(cmd)(newState ⇒ Done) + + case Echo(msg, replyTo) ⇒ + Effect.none.thenRun(_ ⇒ replyTo ! msg) + + case Block(latch) ⇒ + latch.await(5, TimeUnit.SECONDS) + Effect.none + }, + eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) + .onRecoveryCompleted { state ⇒ + ctx.log.debug("onRecoveryCompleted: [{}]", state) + recoveryCompletedProbes.get(entityId) match { + case null ⇒ ctx.log.debug("no recoveryCompletedProbe for [{}]", entityId) + case p ⇒ p ! s"recoveryCompleted:$state" + } + } + } + } } class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterShardingPersistenceSpec.config) with WordSpecLike { import ClusterShardingPersistenceSpec._ + private var _entityId = 0 + def nextEntityId(): String = { + _entityId += 1 + _entityId.toString + } + + private def awaitEntityTerminatedAndRemoved(ref: ActorRef[_], entityId: String): Unit = { + val p = TestProbe[Any]() + p.expectTerminated(ref, p.remainingOrDefault) + + // also make sure that the entity is removed from the Shard before continuing + // FIXME rewrite this with Typed API when region queries are supported + import akka.actor.typed.scaladsl.adapter._ + val regionStateProbe = TestProbe[CurrentShardRegionState]() + val untypedRegion = UntypedClusterSharding(system.toUntyped) + regionStateProbe.awaitAssert { + untypedRegion.shardRegion(typeKey.name).tell(GetShardRegionState, regionStateProbe.ref.toUntyped) + regionStateProbe.expectMessageType[CurrentShardRegionState].shards.foreach { shardState ⇒ + shardState.entityIds should not contain entityId + } + } + } + "Typed cluster sharding with persistent actor" must { ClusterSharding(system).start(Entity( typeKey, - ctx ⇒ persistentEntity(ctx.entityId), - StopPlz - )) + ctx ⇒ persistentEntity(ctx.entityId, ctx.shard))) Cluster(system).manager ! Join(Cluster(system).selfMember.address) "start persistent actor" in { + val entityId = nextEntityId() val p = TestProbe[String]() - val ref = ClusterSharding(system).entityRefFor(typeKey, "123") + val ref = ClusterSharding(system).entityRefFor(typeKey, entityId) ref ! Add("a") ref ! Add("b") ref ! Add("c") ref ! Get(p.ref) - p.expectMessage("123:a|b|c") + p.expectMessage("1:a|b|c") } "support ask with thenReply" in { + val entityId = nextEntityId() val p = TestProbe[String]() - val ref = ClusterSharding(system).entityRefFor(typeKey, "456") + val ref = ClusterSharding(system).entityRefFor(typeKey, entityId) val done1 = ref ? AddWithConfirmation("a") done1.futureValue should ===(Done) @@ -95,7 +169,244 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh done2.futureValue should ===(Done) ref ! Get(p.ref) - p.expectMessage("456:a|b") + p.expectMessage("2:a|b") } + + "handle PoisonPill after persist effect" in { + val entityId = nextEntityId() + val recoveryProbe = TestProbe[String]() + recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + + val p1 = TestProbe[Done]() + val ref = ClusterSharding(system).entityRefFor(typeKey, entityId) + (1 to 10).foreach { n ⇒ + ref ! PassivateAndPersist(n.toString)(p1.ref) + // recoveryCompleted each time, verifies that it was actually stopped + recoveryProbe.expectMessage("recoveryCompleted:" + (1 until n).map(_.toString).mkString("|")) + p1.expectMessage(Done) + } + + val p2 = TestProbe[String]() + ref ! Get(p2.ref) + p2.expectMessage(entityId + ":" + (1 to 10).map(_.toString).mkString("|")) + } + + "handle PoisonPill after several stashed commands that persist" in { + val entityId = nextEntityId() + val actorRefPromise = Promise[ActorRef[Any]]() + entityActorRefs.put(entityId, actorRefPromise) + + val addProbe = TestProbe[Done]() + val recoveryProbe = TestProbe[String]() + recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + + val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) + // this will wakeup the entity, and complete the entityActorRefPromise + entityRef ! AddWithConfirmation("a")(addProbe.ref) + addProbe.expectMessage(Done) + recoveryProbe.expectMessage("recoveryCompleted:") + // now we know that it's in EventSourcedRunning with no stashed commands + + val actorRef = actorRefPromise.future.futureValue + + // not sending via the EntityRef because that would make the test racy + // these are stashed, since before the PoisonPill + val latch = new CountDownLatch(1) + actorRef ! Block(latch) + actorRef ! AddWithConfirmation("b")(addProbe.ref) + actorRef ! AddWithConfirmation("c")(addProbe.ref) + + actorRef ! PoisonPill + + // those messages should be ignored since they happen after the PoisonPill, + actorRef ! AddWithConfirmation("d")(addProbe.ref) + actorRef ! AddWithConfirmation("e")(addProbe.ref) + + // now we have enqueued the message sequence and start processing them + latch.countDown() + + addProbe.expectMessage(Done) + addProbe.expectMessage(Done) + + // wake up again + awaitEntityTerminatedAndRemoved(actorRef, entityId) + val p2 = TestProbe[String]() + entityRef ! AddWithConfirmation("f")(addProbe.ref) + entityRef ! Get(p2.ref) + recoveryProbe.expectMessage("recoveryCompleted:a|b|c") + p2.expectMessage(entityId + ":a|b|c|f") + } + + "handle PoisonPill after several stashed commands that DON'T persist" in { + val entityId = nextEntityId() + val actorRefPromise = Promise[ActorRef[Any]]() + entityActorRefs.put(entityId, actorRefPromise) + + val addProbe = TestProbe[Done]() + val echoProbe = TestProbe[String]() + val recoveryProbe = TestProbe[String]() + recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + + val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) + // this will wakeup the entity, and complete the entityActorRefPromise + entityRef ! AddWithConfirmation("a")(addProbe.ref) + addProbe.expectMessage(Done) + recoveryProbe.expectMessage("recoveryCompleted:") + // now we know that it's in EventSourcedRunning with no stashed commands + + val actorRef = actorRefPromise.future.futureValue + // not sending via the EntityRef because that would make the test racy + // these are stashed, since before the PoisonPill + val latch = new CountDownLatch(1) + actorRef ! Block(latch) + actorRef ! AddWithConfirmation("b")(addProbe.ref) + actorRef ! AddWithConfirmation("c")(addProbe.ref) + actorRef ! Echo("echo-1", echoProbe.ref) + + actorRef ! PoisonPill + + // those messages should be ignored since they happen after the PoisonPill, + actorRef ! Echo("echo-2", echoProbe.ref) + actorRef ! AddWithConfirmation("d")(addProbe.ref) + actorRef ! AddWithConfirmation("e")(addProbe.ref) + actorRef ! Echo("echo-3", echoProbe.ref) + + // now we have enqueued the message sequence and start processing them + latch.countDown() + + echoProbe.expectMessage("echo-1") + addProbe.expectMessage(Done) + addProbe.expectMessage(Done) + + // wake up again + awaitEntityTerminatedAndRemoved(actorRef, entityId) + val p2 = TestProbe[String]() + entityRef ! Echo("echo-4", echoProbe.ref) + echoProbe.expectMessage("echo-4") + entityRef ! AddWithConfirmation("f")(addProbe.ref) + entityRef ! Get(p2.ref) + recoveryProbe.expectMessage("recoveryCompleted:a|b|c") + p2.expectMessage(entityId + ":a|b|c|f") + } + + "handle PoisonPill when stash empty" in { + val entityId = nextEntityId() + val actorRefPromise = Promise[ActorRef[Any]]() + entityActorRefs.put(entityId, actorRefPromise) + + val addProbe = TestProbe[Done]() + val recoveryProbe = TestProbe[String]() + recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + + val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) + // this will wakeup the entity, and complete the entityActorRefPromise + entityRef ! AddWithConfirmation("a")(addProbe.ref) + addProbe.expectMessage(Done) + recoveryProbe.expectMessage("recoveryCompleted:") + // now we know that it's in EventSourcedRunning with no stashed commands + + val actorRef = actorRefPromise.future.futureValue + // not sending via the EntityRef because that would make the test racy + val latch = new CountDownLatch(1) + actorRef ! Block(latch) + actorRef ! PoisonPill + // those messages should be ignored since they happen after the PoisonPill, + actorRef ! AddWithConfirmation("b")(addProbe.ref) + + // now we have enqueued the message sequence and start processing them + latch.countDown() + + // wake up again + awaitEntityTerminatedAndRemoved(actorRef, entityId) + val p2 = TestProbe[String]() + entityRef ! AddWithConfirmation("c")(addProbe.ref) + entityRef ! Get(p2.ref) + recoveryProbe.expectMessage("recoveryCompleted:a") + p2.expectMessage(entityId + ":a|c") + } + + "handle PoisonPill before recovery completed without stashed commands" in { + val entityId = nextEntityId() + val actorRefPromise = Promise[ActorRef[Any]]() + entityActorRefs.put(entityId, actorRefPromise) + + val recoveryProbe = TestProbe[String]() + recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + + val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) + val ignoreFirstEchoProbe = TestProbe[String]() + val echoProbe = TestProbe[String]() + // first echo will wakeup the entity, and complete the entityActorRefPromise + // ignore the first echo reply since it may be racy with the PoisonPill + entityRef ! Echo("echo-1", ignoreFirstEchoProbe.ref) + + // not using actorRefPromise.future.futureValue because it's polling (slow) and want to run this before + // recovery completed, to exercise that scenario + implicit val ec: ExecutionContext = testKit.system.executionContext + val poisonSent = actorRefPromise.future.map { actorRef ⇒ + // not sending via the EntityRef because that would make the test racy + actorRef ! PoisonPill + actorRef + } + val actorRef = poisonSent.futureValue + + recoveryProbe.expectMessage("recoveryCompleted:") + + // wake up again + awaitEntityTerminatedAndRemoved(actorRef, entityId) + entityRef ! Echo("echo-2", echoProbe.ref) + echoProbe.expectMessage("echo-2") + recoveryProbe.expectMessage("recoveryCompleted:") + } + + "handle PoisonPill before recovery completed with stashed commands" in { + val entityId = nextEntityId() + val actorRefPromise = Promise[ActorRef[Any]]() + entityActorRefs.put(entityId, actorRefPromise) + + val recoveryProbe = TestProbe[String]() + recoveryCompletedProbes.put(entityId, recoveryProbe.ref) + + val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId) + val addProbe = TestProbe[Done]() + val ignoreFirstEchoProbe = TestProbe[String]() + val echoProbe = TestProbe[String]() + // first echo will wakeup the entity, and complete the entityActorRefPromise + // ignore the first echo reply since it may be racy with the PoisonPill + entityRef ! Echo("echo-1", ignoreFirstEchoProbe.ref) + + // not using actorRefPromise.future.futureValue because it's polling (slow) and want to run this before + // recovery completed, to exercise that scenario + implicit val ec: ExecutionContext = testKit.system.executionContext + val poisonSent = actorRefPromise.future.map { actorRef ⇒ + // not sending via the EntityRef because that would make the test racy + // these are stashed, since before the PoisonPill + actorRef ! Echo("echo-2", echoProbe.ref) + actorRef ! AddWithConfirmation("a")(addProbe.ref) + actorRef ! AddWithConfirmation("b")(addProbe.ref) + actorRef ! Echo("echo-3", echoProbe.ref) + + actorRef ! PoisonPill + + // those messages should be ignored since they happen after the PoisonPill, + actorRef ! Echo("echo-4", echoProbe.ref) + actorRef ! AddWithConfirmation("c")(addProbe.ref) + actorRef + } + val actorRef = poisonSent.futureValue + + recoveryProbe.expectMessage("recoveryCompleted:") + echoProbe.expectMessage("echo-2") + echoProbe.expectMessage("echo-3") + addProbe.expectMessage(Done) + addProbe.expectMessage(Done) + + // wake up again + awaitEntityTerminatedAndRemoved(actorRef, entityId) + entityRef ! Echo("echo-5", echoProbe.ref) + echoProbe.expectMessage("echo-5") + recoveryProbe.expectMessage("recoveryCompleted:a|b") + } + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index d5ac806777..358703e825 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -18,6 +18,7 @@ import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.ActorRef import akka.actor.typed.ActorRefResolver import akka.actor.typed.ActorSystem +import akka.actor.typed.PostStop import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.cluster.MemberStatus @@ -148,14 +149,14 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. } private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") - private def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[Done]] = None) = + private def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[String]] = None) = Behaviors.receive[TestProtocol] { case (ctx, PassivatePlz()) ⇒ shard ! ClusterSharding.Passivate(ctx.self) Behaviors.same case (_, StopPlz()) ⇒ - stopProbe.foreach(_ ! Done) + stopProbe.foreach(_ ! "StopPlz") Behaviors.stopped case (ctx, WhoAreYou(replyTo)) ⇒ @@ -166,6 +167,10 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. case (_, ReplyPlz(toMe)) ⇒ toMe ! "Hello!" Behaviors.same + }.receiveSignal { + case (_, PostStop) ⇒ + stopProbe.foreach(_ ! "PostStop") + Behaviors.same } private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") @@ -185,35 +190,35 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity( typeKey, - ctx ⇒ behavior(ctx.shard), - StopPlz())) + ctx ⇒ behavior(ctx.shard)) + .withStopMessage(StopPlz())) private val shardingRef2 = sharding2.start(Entity( typeKey, - ctx ⇒ behavior(ctx.shard), - StopPlz())) + ctx ⇒ behavior(ctx.shard)) + .withStopMessage(StopPlz())) private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(Entity( typeKey2, - ctx ⇒ behaviorWithId(ctx.shard), - IdStopPlz()) + ctx ⇒ behaviorWithId(ctx.shard)) .withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { case IdReplyPlz(id, _) ⇒ id case IdWhoAreYou(id, _) ⇒ id case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") }) + .withStopMessage(IdStopPlz()) ) private val shardingRef4 = sharding2.start(Entity( typeKey2, - ctx ⇒ behaviorWithId(ctx.shard), - IdStopPlz()) + ctx ⇒ behaviorWithId(ctx.shard)) .withMessageExtractor( ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { case IdReplyPlz(id, _) ⇒ id case IdWhoAreYou(id, _) ⇒ id case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") }) + .withStopMessage(IdStopPlz()) ) def totalEntityCount1(): Int = { @@ -258,33 +263,55 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. } } - "be able to passivate" in { - val stopProbe = TestProbe[Done]() + "be able to passivate with custom stop message" in { + val stopProbe = TestProbe[String]() val p = TestProbe[String]() val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test") val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity( typeKey3, - ctx ⇒ behavior(ctx.shard, Some(stopProbe.ref)), - StopPlz())) + ctx ⇒ behavior(ctx.shard, Some(stopProbe.ref))) + .withStopMessage(StopPlz())) shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref)) p.expectMessage("Hello!") shardingRef3 ! ShardingEnvelope(s"test1", PassivatePlz()) - stopProbe.expectMessage(Done) + stopProbe.expectMessage("StopPlz") + stopProbe.expectMessage("PostStop") shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref)) p.expectMessage("Hello!") } + "be able to passivate with PoisonPill" in { + val stopProbe = TestProbe[String]() + val p = TestProbe[String]() + val typeKey4 = EntityTypeKey[TestProtocol]("passivate-test-poison") + + val shardingRef4: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity( + typeKey4, + ctx ⇒ behavior(ctx.shard, Some(stopProbe.ref)))) + // no StopPlz stopMessage + + shardingRef4 ! ShardingEnvelope(s"test4", ReplyPlz(p.ref)) + p.expectMessage("Hello!") + + shardingRef4 ! ShardingEnvelope(s"test4", PassivatePlz()) + // no StopPlz + stopProbe.expectMessage("PostStop") + + shardingRef4 ! ShardingEnvelope(s"test4", ReplyPlz(p.ref)) + p.expectMessage("Hello!") + } + "fail if starting sharding for already used typeName, but with a different type" in { // sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard") val ex = intercept[Exception] { sharding.start(Entity( EntityTypeKey[IdTestProtocol]("envelope-shard"), - ctx ⇒ behaviorWithId(ctx.shard), - IdStopPlz())) + ctx ⇒ behaviorWithId(ctx.shard)) + .withStopMessage(IdStopPlz())) } ex.getMessage should include("already started") @@ -349,8 +376,8 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. sharding.start(Entity( ignorantKey, - _ ⇒ Behaviors.ignore[TestProtocol], - StopPlz())) + _ ⇒ Behaviors.ignore[TestProtocol]) + .withStopMessage(StopPlz())) val ref = sharding.entityRefFor(ignorantKey, "sloppy") diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala index a3ff7bd96f..8c8e78b7a3 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExample.scala @@ -25,8 +25,7 @@ object HelloWorldPersistentEntityExample { sharding.start(Entity( typeKey = HelloWorld.entityTypeKey, - createBehavior = entityContext ⇒ HelloWorld.persistentEntity(entityContext.entityId), - stopMessage = HelloWorld.Passivate)) + createBehavior = entityContext ⇒ HelloWorld.persistentEntity(entityContext.entityId))) private implicit val askTimeout: Timeout = Timeout(5.seconds) @@ -50,7 +49,6 @@ object HelloWorldPersistentEntityExample { // Command trait Command final case class Greet(whom: String)(val replyTo: ActorRef[Greeting]) extends Command - case object Passivate extends Command // Response final case class Greeting(whom: String, numberOfPeople: Int) @@ -68,7 +66,6 @@ object HelloWorldPersistentEntityExample { (_, cmd) ⇒ cmd match { case cmd: Greet ⇒ greet(cmd) - case Passivate ⇒ passivate() } } @@ -76,9 +73,6 @@ object HelloWorldPersistentEntityExample { Effect.persist(Greeted(cmd.whom)) .thenRun(state ⇒ cmd.replyTo ! Greeting(cmd.whom, state.numberOfPeople)) - private def passivate(): Effect[Greeted, KnownPeople] = - Effect.stop() - private val eventHandler: (KnownPeople, Greeted) ⇒ KnownPeople = { (state, evt) ⇒ state.add(evt.whom) } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala index 7c23b00c0b..b0da6f8e8d 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/HelloWorldPersistentEntityExampleSpec.scala @@ -37,9 +37,7 @@ class HelloWorldPersistentEntityExampleSpec extends ScalaTestWithActorTestKit(He sharding.start(Entity( HelloWorld.entityTypeKey, - ctx ⇒ HelloWorld.persistentEntity(ctx.entityId), - HelloWorld.Passivate - )) + ctx ⇒ HelloWorld.persistentEntity(ctx.entityId))) } "HelloWorld example" must { diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index 5d2d2a87ae..16408abaae 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -10,7 +10,7 @@ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } import akka.actor.typed.scaladsl.Behaviors import akka.cluster.sharding.typed.scaladsl.Entity import docs.akka.persistence.typed.BlogPostExample -import docs.akka.persistence.typed.BlogPostExample.{ BlogCommand, PassivatePost } +import docs.akka.persistence.typed.BlogPostExample.BlogCommand object ShardingCompileOnlySpec { @@ -29,7 +29,6 @@ object ShardingCompileOnlySpec { trait CounterCommand case object Increment extends CounterCommand final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand - case object GoodByeCounter extends CounterCommand //#counter-messages //#counter @@ -41,8 +40,6 @@ object ShardingCompileOnlySpec { case GetValue(replyTo) ⇒ replyTo ! value Behaviors.same - case GoodByeCounter ⇒ - Behaviors.stopped } //#counter @@ -51,8 +48,7 @@ object ShardingCompileOnlySpec { val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(Entity( typeKey = TypeKey, - createBehavior = ctx ⇒ counter(ctx.entityId, 0), - stopMessage = GoodByeCounter)) + createBehavior = ctx ⇒ counter(ctx.entityId, 0))) //#start //#send @@ -70,13 +66,13 @@ object ShardingCompileOnlySpec { ClusterSharding(system).start(Entity( typeKey = BlogTypeKey, - createBehavior = ctx ⇒ behavior(ctx.entityId), - stopMessage = PassivatePost)) + createBehavior = ctx ⇒ behavior(ctx.entityId))) //#persistence //#counter-passivate case object Idle extends CounterCommand + case object GoodByeCounter extends CounterCommand def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = { Behaviors.setup { ctx ⇒ @@ -104,8 +100,8 @@ object ShardingCompileOnlySpec { sharding.start(Entity( typeKey = TypeKey, - createBehavior = ctx ⇒ counter2(ctx.shard, ctx.entityId), - stopMessage = GoodByeCounter)) + createBehavior = ctx ⇒ counter2(ctx.shard, ctx.entityId)) + .withStopMessage(GoodByeCounter)) //#counter-passivate } diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 3e1e8834f1..b4fb7082b3 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -105,10 +105,11 @@ If a message is already enqueued to the entity when it stops itself the enqueued in the mailbox will be dropped. To support graceful passivation without losing such messages the entity actor can send `ClusterSharding.Passivate` to to the @scala[`ActorRef[ShardCommand]`]@java[`ActorRef`] that was passed in to -the factory method when creating the entity. The specified `handOffStopMessage` message -will be sent back to the entity, which is then supposed to stop itself. Incoming messages -will be buffered by the `Shard` between reception of `Passivate` and termination of the -entity. Such buffered messages are thereafter delivered to a new incarnation of the entity. +the factory method when creating the entity. The optional `stopMessage` message +will be sent back to the entity, which is then supposed to stop itself, otherwise it will +be stopped automatically. Incoming messages will be buffered by the `Shard` between reception +of `Passivate` and termination of the entity. Such buffered messages are thereafter delivered +to a new incarnation of the entity. Scala : @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate } @@ -116,6 +117,10 @@ Scala Java : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-start } +Note that in the above example the `stopMessage` is specified as `GoodByeCounter`. That message will be sent to +the entity when it's supposed to stop itself due to rebalance or passivation. If the `stopMessage` is not defined +it will be stopped automatically without receiving a specific message. It can be useful to define a custom stop +message if the entity needs to perform some asynchronous cleanup or interactions before stopping. ### Automatic Passivation @@ -123,5 +128,5 @@ The entities can be configured to be automatically passivated if they haven't re a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages -to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. -By default automatic passivation is disabled. \ No newline at end of file +to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. +By default automatic passivation is disabled. diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala index 30b1951c97..97fc784ae0 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala @@ -12,9 +12,10 @@ import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import akka.persistence.typed.internal.EventsourcedBehavior._ - import scala.util.control.NonFatal +import akka.actor.typed.internal.PoisonPill + /*** * INTERNAL API * @@ -37,7 +38,8 @@ private[persistence] object EventsourcedReplayingEvents { seqNr: Long, state: State, eventSeenInInterval: Boolean, - toSeqNr: Long + toSeqNr: Long, + receivedPoisonPill: Boolean ) def apply[C, E, S]( @@ -69,9 +71,11 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set case JournalResponse(r) ⇒ onJournalResponse(state, r) case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) case RecoveryTickEvent(snap) ⇒ onRecoveryTick(state, snap) - case cmd: IncomingCommand[C] ⇒ onCommand(cmd) + case cmd: IncomingCommand[C] ⇒ onCommand(cmd, state) case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit - }.receiveSignal(returnPermitOnStop) + }.receiveSignal(returnPermitOnStop.orElse { + case (_, PoisonPill) ⇒ stay(state.copy(receivedPoisonPill = true)) + }) private def onJournalResponse( state: ReplayingState[S], @@ -106,10 +110,16 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set } } - private def onCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = { + private def onCommand(cmd: InternalProtocol, state: ReplayingState[S]): Behavior[InternalProtocol] = { // during recovery, stash all incoming commands - stash(cmd) - Behaviors.same + if (state.receivedPoisonPill) { + if (setup.settings.logOnStashing) setup.log.debug( + "Discarding message [{}], because actor is to be stopped", cmd) + Behaviors.unhandled + } else { + stash(cmd) + Behaviors.same + } } protected def onRecoveryTick(state: ReplayingState[S], snapshot: Boolean): Behavior[InternalProtocol] = @@ -157,12 +167,16 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set tryReturnRecoveryPermit("replay completed successfully") setup.recoveryCompleted(state.state) - val running = EventsourcedRunning[C, E, S]( - setup, - EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state) - ) + if (state.receivedPoisonPill && isStashEmpty) + Behaviors.stopped + else { + val running = EventsourcedRunning[C, E, S]( + setup, + EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state, state.receivedPoisonPill) + ) - tryUnstash(running) + tryUnstash(running) + } } finally { setup.cancelRecoveryTimer() } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala index a60c46663b..7587572315 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingSnapshot.scala @@ -7,6 +7,7 @@ package akka.persistence.typed.internal import akka.actor.typed.scaladsl.Behaviors.same import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } import akka.actor.typed.Behavior +import akka.actor.typed.internal.PoisonPill import akka.annotation.InternalApi import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult } import akka.persistence._ @@ -31,8 +32,8 @@ import akka.persistence.typed.internal.EventsourcedBehavior._ @InternalApi private[akka] object EventsourcedReplayingSnapshot { - def apply[C, E, S](setup: EventsourcedSetup[C, E, S]): Behavior[InternalProtocol] = - new EventsourcedReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior() + def apply[C, E, S](setup: EventsourcedSetup[C, E, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] = + new EventsourcedReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior(receivedPoisonPill) } @@ -40,19 +41,26 @@ private[akka] object EventsourcedReplayingSnapshot { private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: EventsourcedSetup[C, E, S]) extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { - def createBehavior(): Behavior[InternalProtocol] = { + def createBehavior(receivedPoisonPillInPreviousPhase: Boolean): Behavior[InternalProtocol] = { // protect against snapshot stalling forever because of journal overloaded and such setup.startRecoveryTimer(snapshot = true) loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr) - Behaviors.receiveMessage[InternalProtocol] { - case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r) - case JournalResponse(r) ⇒ onJournalResponse(r) - case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot) - case cmd: IncomingCommand[C] ⇒ onCommand(cmd) - case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit - }.receiveSignal(returnPermitOnStop) + def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { + Behaviors.receiveMessage[InternalProtocol] { + case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r, receivedPoisonPill) + case JournalResponse(r) ⇒ onJournalResponse(r) + case RecoveryTickEvent(snapshot) ⇒ onRecoveryTick(snapshot) + case cmd: IncomingCommand[C] ⇒ + if (receivedPoisonPill) Behaviors.unhandled + else onCommand(cmd) + case RecoveryPermitGranted ⇒ Behaviors.unhandled // should not happen, we already have the permit + }.receiveSignal(returnPermitOnStop.orElse { + case (_, PoisonPill) ⇒ stay(receivedPoisonPill = true) + }) + } + stay(receivedPoisonPillInPreviousPhase) } /** @@ -94,7 +102,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E Behaviors.unhandled } - def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = { + def onSnapshotterResponse(response: SnapshotProtocol.Response, receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { response match { case LoadSnapshotResult(sso, toSnr) ⇒ var state: S = setup.emptyState @@ -106,7 +114,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E case None ⇒ 0 // from the beginning please } - becomeReplayingEvents(state, seqNr, toSnr) + becomeReplayingEvents(state, seqNr, toSnr, receivedPoisonPill) case LoadSnapshotFailed(cause) ⇒ onRecoveryFailure(cause, event = None) @@ -116,12 +124,12 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E } } - private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = { + private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long, receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { setup.cancelRecoveryTimer() EventsourcedReplayingEvents[C, E, S]( setup, - EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr) + EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr, receivedPoisonPill) ) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala index 520007f067..a9743292fa 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRequestingRecoveryPermit.scala @@ -5,6 +5,7 @@ package akka.persistence.typed.internal import akka.actor.typed.Behavior +import akka.actor.typed.internal.PoisonPill import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol @@ -36,21 +37,36 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val s // request a permit, as only once we obtain one we can start replaying requestRecoveryPermit() - Behaviors.receiveMessage[InternalProtocol] { - case InternalProtocol.RecoveryPermitGranted ⇒ - becomeReplaying() + def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { + Behaviors.receiveMessage[InternalProtocol] { + case InternalProtocol.RecoveryPermitGranted ⇒ + becomeReplaying(receivedPoisonPill) - case other ⇒ - stash(other) - Behaviors.same + case _ if receivedPoisonPill ⇒ + Behaviors.unhandled + + case other ⇒ + if (receivedPoisonPill) { + if (setup.settings.logOnStashing) setup.log.debug( + "Discarding message [{}], because actor is to be stopped", other) + Behaviors.unhandled + } else { + stash(other) + Behaviors.same + } + + }.receiveSignal { + case (_, PoisonPill) ⇒ stay(receivedPoisonPill = true) + } } + stay(receivedPoisonPill = false) } - private def becomeReplaying(): Behavior[InternalProtocol] = { + private def becomeReplaying(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery) setup.holdingRecoveryPermit = true - EventsourcedReplayingSnapshot(setup) + EventsourcedReplayingSnapshot(setup, receivedPoisonPill) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index 1ec9cc588c..64019f7f67 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -16,11 +16,13 @@ import akka.persistence.typed.{ Callback, EventRejectedException, SideEffect, St import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC } import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import akka.persistence.typed.scaladsl.Effect - import scala.annotation.tailrec import scala.collection.immutable import scala.util.{ Failure, Success } +import akka.actor.typed.Signal +import akka.actor.typed.internal.PoisonPill + /** * INTERNAL API * @@ -42,8 +44,9 @@ import scala.util.{ Failure, Success } private[akka] object EventsourcedRunning { final case class EventsourcedState[State]( - seqNr: Long, - state: State + seqNr: Long, + state: State, + receivedPoisonPill: Boolean ) { def nextSequenceNr(): EventsourcedState[State] = @@ -158,6 +161,10 @@ private[akka] object EventsourcedRunning { case IncomingCommand(c: C @unchecked) ⇒ onCommand(state, c) case SnapshotterResponse(r) ⇒ onSnapshotterResponse(r, Behaviors.same) case _ ⇒ Behaviors.unhandled + }.receiveSignal { + case (_, PoisonPill) ⇒ + if (isStashEmpty) Behaviors.stopped + else handlingCommands(state.copy(receivedPoisonPill = true)) } } @@ -194,8 +201,14 @@ private[akka] object EventsourcedRunning { } def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { - stash(cmd) - this + if (state.receivedPoisonPill) { + if (setup.settings.logOnStashing) setup.log.debug( + "Discarding message [{}], because actor is to be stopped", cmd) + Behaviors.unhandled + } else { + stash(cmd) + this + } } final def onJournalResponse( @@ -246,6 +259,13 @@ private[akka] object EventsourcedRunning { } } + override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { + case PoisonPill ⇒ + // wait for journal responses before stopping + state = state.copy(receivedPoisonPill = true) + this + } + } private def onSnapshotterResponse( @@ -285,7 +305,10 @@ private[akka] object EventsourcedRunning { if (stopped) res = Behaviors.stopped } - res + if (state.receivedPoisonPill && isStashEmpty) + Behaviors.stopped + else + res } def applySideEffect(effect: SideEffect[S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala index 0e2cc0654c..c6d80a9d5d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashManagement.scala @@ -24,6 +24,8 @@ private[akka] trait EventsourcedStashManagement[C, E, S] { private def stashBuffer: StashBuffer[InternalProtocol] = setup.internalStash + protected def isStashEmpty: Boolean = stashBuffer.isEmpty + protected def stash(msg: InternalProtocol): Unit = { if (setup.settings.logOnStashing) setup.log.debug("Stashing message: [{}]", msg) diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java index 50e35d7497..e499cf742e 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java @@ -133,8 +133,6 @@ public class BlogPostExample { this.replyTo = replyTo; } } - public static class PassivatePost implements BlogCommand { - } public static class PostContent implements BlogCommand { final String postId; final String title; @@ -204,8 +202,7 @@ public class BlogPostExample { private CommandHandlerBuilder commonCommandHandler() { return commandHandlerBuilder(BlogState.class) - .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()) - .matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop()); + .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()); } //#post-added-command-handler diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java index de37fa0a86..03860b84b8 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java @@ -104,9 +104,6 @@ public class NullBlogState { public Publish(ActorRef replyTo) { this.replyTo = replyTo; } - } - public static class PassivatePost implements BlogCommand { - } public static class PostContent implements BlogCommand { final String postId; @@ -128,8 +125,7 @@ public class NullBlogState { PostAdded event = new PostAdded(cmd.content.postId, cmd.content); return Effect().persist(event) .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); - }) - .matchCommand(PassivatePost.class, cmd -> Effect().stop()); + }); } private CommandHandlerBuilder postCommandHandler() { @@ -147,8 +143,7 @@ public class NullBlogState { cmd.replyTo.tell(state.postContent); return Effect().none(); }) - .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()) - .matchCommand(PassivatePost.class, cmd -> Effect().stop()); + .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()); } public BlogBehavior(PersistenceId persistenceId) { diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java index a669d043d1..339342985c 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java @@ -104,9 +104,6 @@ public class OptionalBlogState { public Publish(ActorRef replyTo) { this.replyTo = replyTo; } - } - public static class PassivatePost implements BlogCommand { - } public static class PostContent implements BlogCommand { final String postId; @@ -128,8 +125,7 @@ public class OptionalBlogState { PostAdded event = new PostAdded(cmd.content.postId, cmd.content); return Effect().persist(event) .thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); - }) - .matchCommand(PassivatePost.class, cmd -> Effect().stop()); + }); } private CommandHandlerBuilder, Optional> postCommandHandler() { @@ -147,8 +143,7 @@ public class OptionalBlogState { cmd.replyTo.tell(state.get().postContent); return Effect().none(); }) - .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()) - .matchCommand(PassivatePost.class, cmd -> Effect().stop()); + .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()); } public BlogBehavior(PersistenceId persistenceId) { diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala index 232261213b..3949997aaf 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala @@ -51,7 +51,6 @@ object BlogPostExample { final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand - final case object PassivatePost extends BlogCommand final case class PostContent(postId: String, title: String, body: String) //#commands @@ -69,9 +68,8 @@ object BlogPostExample { state match { case BlankState ⇒ command match { - case cmd: AddPost ⇒ addPost(cmd) - case PassivatePost ⇒ Effect.stop() - case _ ⇒ Effect.unhandled + case cmd: AddPost ⇒ addPost(cmd) + case _ ⇒ Effect.unhandled } case draftState: DraftState ⇒ command match { @@ -79,12 +77,10 @@ object BlogPostExample { case Publish(replyTo) ⇒ publish(draftState, replyTo) case GetPost(replyTo) ⇒ getPost(draftState, replyTo) case _: AddPost ⇒ Effect.unhandled - case PassivatePost ⇒ Effect.stop() } case publishedState: PublishedState ⇒ command match { case GetPost(replyTo) ⇒ getPost(publishedState, replyTo) - case PassivatePost ⇒ Effect.stop() case _ ⇒ Effect.unhandled } }