Make the stop message in sharding optional, #25642

* It's mostly technical concern that is blurring the business logic in the entity
* Async interactions before stopping is often not needed
* Implemented with an internal PoisonPill signal that is added by sharding,
* Persistent actors handle PoisonPill and run side effects after persist
  and process stashed messages before stopping.

* remove unecessary stop messages

* reference docs
This commit is contained in:
Patrik Nordwall 2018-10-22 08:58:39 -04:00
parent 133c41375f
commit 9c1153b1a6
25 changed files with 796 additions and 186 deletions

View file

@ -0,0 +1,50 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal
import akka.actor.typed.ActorContext
import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.Signal
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[akka] sealed abstract class PoisonPill extends Signal
/**
* INTERNAL API
*/
@InternalApi private[akka] case object PoisonPill extends PoisonPill {
def instance: PoisonPill = this
}
/**
* INTERNAL API
*
* Returns `Behaviors.stopped` for [[PoisonPill]] signals unless it has been handled by the target `Behavior`.
* Used by Cluster Sharding to automatically stop entities without defining a stop message in the
* application protocol. Persistent actors handle `PoisonPill` and run side effects after persist
* and process stashed messages before stopping.
*/
@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorInterceptor[M, M] {
override def aroundReceive(ctx: ActorContext[M], msg: M, target: BehaviorInterceptor.ReceiveTarget[M]): Behavior[M] =
target(ctx, msg)
override def aroundSignal(ctx: ActorContext[M], signal: Signal, target: BehaviorInterceptor.SignalTarget[M]): Behavior[M] = {
signal match {
case p: PoisonPill
val next = target(ctx, p)
if (Behavior.isUnhandled(next)) Behavior.stopped
else next
case _ target(ctx, signal)
}
}
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean =
// only one interceptor per behavior stack is needed
other.isInstanceOf[PoisonPillInterceptor[_]]
}

View file

@ -22,6 +22,8 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.internal.PoisonPillInterceptor
import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.Behaviors
@ -155,7 +157,7 @@ import akka.util.Timeout
new javadsl.EntityContext[M](ctx.entityId, ctx.shard, actorContext.asJava))
},
typeKey = entity.typeKey.asScala,
stopMessage = entity.stopMessage,
stopMessage = entity.stopMessage.asScala,
entityProps = entity.entityProps,
settings = entity.settings.asScala,
messageExtractor = entity.messageExtractor.asScala,
@ -167,7 +169,7 @@ import akka.util.Timeout
behavior: EntityContext Behavior[M],
entityProps: Props,
typeKey: scaladsl.EntityTypeKey[M],
stopMessage: M,
stopMessage: Option[M],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, M],
allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = {
@ -196,13 +198,21 @@ import akka.util.Timeout
override def apply(t: String): ActorRef[scaladsl.ClusterSharding.ShardCommand] = {
// using untyped.systemActorOf to avoid the Future[ActorRef]
system.toUntyped.asInstanceOf[ExtendedActorSystem].systemActorOf(
PropsAdapter(ShardCommandActor.behavior(stopMessage)),
PropsAdapter(ShardCommandActor.behavior(stopMessage.getOrElse(PoisonPill))),
URLEncoder.encode(typeKey.name, ByteString.UTF_8) + "ShardCommandDelegator")
}
})
def poisonPillInterceptor(behv: Behavior[M]): Behavior[M] = {
stopMessage match {
case Some(_) behv
case None Behaviors.intercept(new PoisonPillInterceptor[M])(behv)
}
}
val untypedEntityPropsFactory: String akka.actor.Props = { entityId
PropsAdapter(behavior(new EntityContext(entityId, shardCommandDelegator)), entityProps)
val behv = behavior(new EntityContext(entityId, shardCommandDelegator))
PropsAdapter(poisonPillInterceptor(behv), entityProps)
}
untypedSharding.internalStart(
@ -212,7 +222,7 @@ import akka.util.Timeout
extractEntityId,
extractShardId,
allocationStrategy.getOrElse(defaultShardAllocationStrategy(settings)),
stopMessage)
stopMessage.getOrElse(PoisonPill))
} else {
log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node) " +
"for role [{}] and dataCenter [{}] ...", typeKey.name, settings.role, settings.dataCenter)

View file

@ -212,9 +212,8 @@ object Entity {
*/
def of[M](
typeKey: EntityTypeKey[M],
createBehavior: JFunction[EntityContext[M], Behavior[M]],
stopMessage: M): Entity[M, ShardingEnvelope[M]] = {
new Entity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty())
createBehavior: JFunction[EntityContext[M], Behavior[M]]): Entity[M, ShardingEnvelope[M]] = {
new Entity(createBehavior, typeKey, Optional.empty(), Props.empty, Optional.empty(), Optional.empty(), Optional.empty())
}
/**
@ -231,8 +230,7 @@ object Entity {
*/
def ofPersistentEntity[Command, Event, State >: Null](
typeKey: EntityTypeKey[Command],
createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]],
stopMessage: Command): Entity[Command, ShardingEnvelope[Command]] = {
createPersistentEntity: JFunction[EntityContext[Command], PersistentEntity[Command, Event, State]]): Entity[Command, ShardingEnvelope[Command]] = {
of(typeKey, new JFunction[EntityContext[Command], Behavior[Command]] {
override def apply(ctx: EntityContext[Command]): Behavior[Command] = {
@ -242,7 +240,7 @@ object Entity {
s" [${persistentEntity.getClass.getName}] doesn't match expected $typeKey.")
persistentEntity
}
}, stopMessage)
})
}
}
@ -253,7 +251,7 @@ object Entity {
final class Entity[M, E] private[akka] (
val createBehavior: JFunction[EntityContext[M], Behavior[M]],
val typeKey: EntityTypeKey[M],
val stopMessage: M,
val stopMessage: Optional[M],
val entityProps: Props,
val settings: Optional[ClusterShardingSettings],
val messageExtractor: Optional[ShardingMessageExtractor[E, M]],
@ -271,6 +269,15 @@ final class Entity[M, E] private[akka] (
def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] =
copy(settings = Optional.ofNullable(newSettings))
/**
* Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
* If this is not defined it will be stopped automatically.
* It can be useful to define a custom stop message if the entity needs to perform
* some asynchronous cleanup or interactions before stopping.
*/
def withStopMessage(newStopMessage: M): Entity[M, E] =
copy(stopMessage = Optional.ofNullable(newStopMessage))
/**
*
* If a `messageExtractor` is not specified the messages are sent to the entities by wrapping
@ -292,7 +299,7 @@ final class Entity[M, E] private[akka] (
private def copy(
createBehavior: JFunction[EntityContext[M], Behavior[M]] = createBehavior,
typeKey: EntityTypeKey[M] = typeKey,
stopMessage: M = stopMessage,
stopMessage: Optional[M] = stopMessage,
entityProps: Props = entityProps,
settings: Optional[ClusterShardingSettings] = settings,
allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy

View file

@ -23,15 +23,18 @@ import akka.persistence.typed.javadsl.PersistentBehavior
*/
abstract class PersistentEntity[Command, Event, State >: Null] private (
val entityTypeKey: EntityTypeKey[Command],
val entityId: String,
persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy])
extends PersistentBehavior[Command, Event, State](persistenceId, supervisorStrategy) {
def this(entityTypeKey: EntityTypeKey[Command], entityId: String) = {
this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.empty[BackoffSupervisorStrategy])
this(entityTypeKey, entityId,
persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.empty[BackoffSupervisorStrategy])
}
def this(entityTypeKey: EntityTypeKey[Command], entityId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(entityTypeKey, persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.ofNullable(backoffSupervisorStrategy))
this(entityTypeKey, entityId,
persistenceId = entityTypeKey.persistenceIdFrom(entityId), Optional.ofNullable(backoffSupervisorStrategy))
}
}

View file

@ -215,14 +215,12 @@ object Entity {
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
* @tparam M The type of message the entity accepts
*/
def apply[M](
typeKey: EntityTypeKey[M],
createBehavior: EntityContext Behavior[M],
stopMessage: M): Entity[M, ShardingEnvelope[M]] =
new Entity(createBehavior, typeKey, stopMessage, Props.empty, None, None, None)
createBehavior: EntityContext Behavior[M]): Entity[M, ShardingEnvelope[M]] =
new Entity(createBehavior, typeKey, None, Props.empty, None, None, None)
}
/**
@ -231,7 +229,7 @@ object Entity {
final class Entity[M, E] private[akka] (
val createBehavior: EntityContext Behavior[M],
val typeKey: EntityTypeKey[M],
val stopMessage: M,
val stopMessage: Option[M],
val entityProps: Props,
val settings: Option[ClusterShardingSettings],
val messageExtractor: Option[ShardingMessageExtractor[E, M]],
@ -249,6 +247,15 @@ final class Entity[M, E] private[akka] (
def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] =
copy(settings = Option(newSettings))
/**
* Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
* If this is not defined it will be stopped automatically.
* It can be useful to define a custom stop message if the entity needs to perform
* some asynchronous cleanup or interactions before stopping.
*/
def withStopMessage(newStopMessage: M): Entity[M, E] =
copy(stopMessage = Option(newStopMessage))
/**
*
* If a `messageExtractor` is not specified the messages are sent to the entities by wrapping
@ -270,7 +277,7 @@ final class Entity[M, E] private[akka] (
private def copy(
createBehavior: EntityContext Behavior[M] = createBehavior,
typeKey: EntityTypeKey[M] = typeKey,
stopMessage: M = stopMessage,
stopMessage: Option[M] = stopMessage,
entityProps: Props = entityProps,
settings: Option[ClusterShardingSettings] = settings,
allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy

View file

@ -70,10 +70,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
"start sharding" in {
val sharding = ClusterSharding(typedSystem)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.start(
Entity(
typeKey,
_ multiDcPinger,
NoMore))
Entity(typeKey, _ multiDcPinger))
val probe = TestProbe[Pong]
shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(max = 10.seconds, Pong(cluster.selfMember.dataCenter))
@ -102,8 +99,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).start(
Entity(
typeKey,
_ multiDcPinger,
NoMore)
_ multiDcPinger)
.withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2")))
val probe = TestProbe[Pong]
proxy ! ShardingEnvelope(entityId, Ping(probe.ref))

View file

@ -0,0 +1,176 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.javadsl;
import akka.Done;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import static org.junit.Assert.assertEquals;
public class ClusterShardingPersistenceTest extends JUnitSuite {
public static final Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" +
"akka.remote.netty.tcp.port = 0 \n" +
"akka.remote.artery.canonical.port = 0 \n" +
"akka.remote.artery.canonical.hostname = 127.0.0.1 \n" +
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n");
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
interface Command {}
static class Add implements Command {
public final String s;
Add(String s) {
this.s = s;
}
}
static class AddWithConfirmation implements Command, ExpectingReply<Done> {
final String s;
private final ActorRef<Done> replyTo;
AddWithConfirmation(String s, ActorRef<Done> replyTo) {
this.s = s;
this.replyTo = replyTo;
}
@Override
public ActorRef<Done> replyTo() {
return replyTo;
}
}
static class Get implements Command {
final ActorRef<String> replyTo;
Get(ActorRef<String> replyTo) {
this.replyTo = replyTo;
}
}
static enum StopPlz implements Command {
INSTANCE
}
static class TestPersistentEntity extends PersistentEntity<Command, String, String> {
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "HelloWorld");
public TestPersistentEntity(String entityId) {
super(ENTITY_TYPE_KEY, entityId);
}
@Override
public String emptyState() {
return "";
}
@Override
public CommandHandler<Command, String, String> commandHandler() {
return commandHandlerBuilder(String.class)
.matchCommand(Add.class, this::add)
.matchCommand(AddWithConfirmation.class, this::addWithConfirmation)
.matchCommand(Get.class, this::getState)
.build();
}
private Effect<String, String> add(String state, Add cmd) {
return Effect().persist(cmd.s);
}
private Effect<String, String> addWithConfirmation(String state, AddWithConfirmation cmd) {
return Effect().persist(cmd.s)
.thenReply(cmd, newState -> Done.getInstance());
}
private Effect<String, String> getState(String state, Get cmd) {
cmd.replyTo.tell(entityId() + ":" + state);
return Effect().none();
}
@Override
public EventHandler<String, String> eventHandler() {
return eventHandlerBuilder()
.matchEvent(String.class, this::applyEvent)
.build();
}
private String applyEvent(String state, String evt) {
if (state.equals(""))
return evt;
else
return state + "|" + evt;
}
}
private ClusterSharding _sharding = null;
private ClusterSharding sharding() {
if (_sharding == null) {
// initialize first time only
Cluster cluster = Cluster.get(testKit.system());
cluster.manager().tell(new Join(cluster.selfMember().address()));
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.start(Entity.ofPersistentEntity(TestPersistentEntity.ENTITY_TYPE_KEY,
entityContext -> new TestPersistentEntity(entityContext.getEntityId()))
.withStopMessage(StopPlz.INSTANCE));
_sharding = sharding;
}
return _sharding;
}
@Test
public void startPersistentActor() {
TestProbe<String> p = testKit.createTestProbe();
EntityRef<Command> ref = sharding().entityRefFor(TestPersistentEntity.ENTITY_TYPE_KEY, "123");
ref.tell(new Add("a"));
ref.tell(new Add("b"));
ref.tell(new Add("c"));
ref.tell(new Get(p.getRef()));
p.expectMessage("123:a|b|c");
}
@Test
public void askWithThenReply() {
TestProbe<Done> p1 = testKit.createTestProbe();
EntityRef<Command> ref = sharding().entityRefFor(TestPersistentEntity.ENTITY_TYPE_KEY, "456");
Timeout askTimeout = Timeout.create(p1.getRemainingOrDefault());
CompletionStage<Done> done1 =ref.ask(replyTo -> new AddWithConfirmation("a", replyTo), askTimeout);
done1.thenAccept(d -> p1.getRef().tell(d));
p1.expectMessage(Done.getInstance());
CompletionStage<Done> done2 =ref.ask(replyTo -> new AddWithConfirmation("b", replyTo), askTimeout);
done1.thenAccept(d -> p1.getRef().tell(d));
p1.expectMessage(Done.getInstance());
TestProbe<String> p2 = testKit.createTestProbe();
ref.tell(new Get(p2.getRef()));
p2.expectMessage("456:a|b");
}
}

View file

@ -45,8 +45,7 @@ public class HelloWorldPersistentEntityExample {
sharding.start(
Entity.ofPersistentEntity(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()),
HelloWorld.Passivate.INSTANCE));
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));
}
// usage example
@ -78,10 +77,6 @@ public class HelloWorldPersistentEntityExample {
}
}
enum Passivate implements Command {
INSTANCE
}
// Response
public static final class Greeting {
public final String whom;
@ -140,14 +135,9 @@ public class HelloWorldPersistentEntityExample {
public CommandHandler<Command, Greeted, KnownPeople> commandHandler() {
return commandHandlerBuilder(KnownPeople.class)
.matchCommand(Greet.class, this::greet)
.matchCommand(Greet.class, this::passivate)
.build();
}
private Effect<Greeted, KnownPeople> passivate(KnownPeople state, Command cmd) {
return Effect().stop();
}
private Effect<Greeted, KnownPeople> greet(KnownPeople state, Greet cmd) {
return Effect().persist(new Greeted(cmd.whom))
.thenRun(newState -> cmd.replyTo.tell(new Greeting(cmd.whom, newState.numberOfPeople())));

View file

@ -44,8 +44,7 @@ public class HelloWorldPersistentEntityExampleTest extends JUnitSuite {
sharding.start(
Entity.ofPersistentEntity(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId()),
HelloWorld.Passivate.INSTANCE));
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));
_sharding = sharding;
}
return _sharding;

View file

@ -22,14 +22,12 @@ import akka.cluster.sharding.typed.javadsl.Entity;
import jdocs.akka.persistence.typed.BlogPostExample.BlogCommand;
import jdocs.akka.persistence.typed.BlogPostExample.BlogBehavior;
import jdocs.akka.persistence.typed.BlogPostExample.PassivatePost;
public class ShardingCompileOnlyTest {
//#counter-messages
interface CounterCommand {}
public static class Increment implements CounterCommand { }
public static class GoodByeCounter implements CounterCommand { }
public static class GetValue implements CounterCommand {
private final ActorRef<Integer> replyTo;
@ -50,9 +48,6 @@ public class ShardingCompileOnlyTest {
msg.replyTo.tell(value);
return Behaviors.same();
})
.onMessage(GoodByeCounter.class, (ctx, msg) -> {
return Behaviors.stopped();
})
.build();
}
//#counter
@ -60,6 +55,8 @@ public class ShardingCompileOnlyTest {
//#counter-passivate
public static class Idle implements CounterCommand { }
public static class GoodByeCounter implements CounterCommand { }
public static Behavior<CounterCommand> counter2(ActorRef<ClusterSharding.ShardCommand> shard, String entityId) {
return Behaviors.setup(ctx -> {
ctx.setReceiveTimeout(Duration.ofSeconds(30), new Idle());
@ -105,8 +102,8 @@ public class ShardingCompileOnlyTest {
sharding.start(
Entity.of(
typeKey,
ctx -> counter2(ctx.getShard(), ctx.getEntityId()),
new GoodByeCounter()));
ctx -> counter2(ctx.getShard(), ctx.getEntityId()))
.withStopMessage(new GoodByeCounter()));
//#counter-passivate-start
}
@ -126,8 +123,7 @@ public class ShardingCompileOnlyTest {
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.start(
Entity.of(
typeKey,
ctx -> counter(ctx.getEntityId(),0),
new GoodByeCounter()));
ctx -> counter(ctx.getEntityId(),0)));
//#start
//#send
@ -150,8 +146,7 @@ public class ShardingCompileOnlyTest {
sharding.start(
Entity.of(
blogTypeKey,
ctx -> BlogBehavior.behavior(ctx.getEntityId()),
new PassivatePost()));
ctx -> BlogBehavior.behavior(ctx.getEntityId())));
//#persistence
}
}

View file

@ -4,13 +4,27 @@
package akka.cluster.sharding.typed.scaladsl
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import akka.Done
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
import akka.cluster.sharding.ShardRegion.GetShardRegionState
import akka.cluster.sharding.typed.scaladsl.ClusterSharding.Passivate
import akka.cluster.sharding.typed.scaladsl.ClusterSharding.ShardCommand
import akka.cluster.sharding.{ ClusterSharding UntypedClusterSharding }
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.ExpectingReply
@ -21,8 +35,10 @@ import org.scalatest.WordSpecLike
object ClusterShardingPersistenceSpec {
val config = ConfigFactory.parseString(
"""
akka.actor.provider = cluster
akka.loglevel = DEBUG
#akka.persistence.typed.log-stashing = on
akka.actor.provider = cluster
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
@ -33,61 +49,119 @@ object ClusterShardingPersistenceSpec {
sealed trait Command
final case class Add(s: String) extends Command
final case class AddWithConfirmation(s: String)(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done]
final case class PassivateAndPersist(s: String)(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done]
final case class Get(replyTo: ActorRef[String]) extends Command
final case object StopPlz extends Command
final case class Echo(msg: String, replyTo: ActorRef[String]) extends Command
final case class Block(latch: CountDownLatch) extends Command
val typeKey = EntityTypeKey[Command]("test")
def persistentEntity(entityId: String): Behavior[Command] =
PersistentEntity[Command, String, String](
entityTypeKey = typeKey,
entityId = entityId,
emptyState = "",
commandHandler = (state, cmd) cmd match {
case Add(s)
Effect.persist(s)
val recoveryCompletedProbes = new ConcurrentHashMap[String, ActorRef[String]]
case cmd @ AddWithConfirmation(s)
Effect.persist(s)
.thenReply(cmd)(newState Done)
// Need this to be able to send the PoisonPill from the outside, simulating rebalance before recovery and such.
// Promise completed by the actor when it's started.
val entityActorRefs = new ConcurrentHashMap[String, Promise[ActorRef[Any]]]
case Get(replyTo)
replyTo ! s"$entityId:$state"
Effect.none
case StopPlz Effect.stop()
},
eventHandler = (state, evt) if (state.isEmpty) evt else state + "|" + evt)
def persistentEntity(entityId: String, shard: ActorRef[ShardCommand]): Behavior[Command] = {
Behaviors.setup { ctx
entityActorRefs.get(entityId) match {
case null
case promise promise.trySuccess(ctx.self.upcast)
}
PersistentEntity[Command, String, String](
entityTypeKey = typeKey,
entityId = entityId,
emptyState = "",
commandHandler = (state, cmd) cmd match {
case Add(s)
Effect.persist(s)
case cmd @ AddWithConfirmation(s)
Effect.persist(s)
.thenReply(cmd)(newState Done)
case Get(replyTo)
replyTo ! s"$entityId:$state"
Effect.none
case cmd @ PassivateAndPersist(s)
shard ! Passivate(ctx.self)
Effect.persist(s)
.thenReply(cmd)(newState Done)
case Echo(msg, replyTo)
Effect.none.thenRun(_ replyTo ! msg)
case Block(latch)
latch.await(5, TimeUnit.SECONDS)
Effect.none
},
eventHandler = (state, evt) if (state.isEmpty) evt else state + "|" + evt)
.onRecoveryCompleted { state
ctx.log.debug("onRecoveryCompleted: [{}]", state)
recoveryCompletedProbes.get(entityId) match {
case null ctx.log.debug("no recoveryCompletedProbe for [{}]", entityId)
case p p ! s"recoveryCompleted:$state"
}
}
}
}
}
class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterShardingPersistenceSpec.config) with WordSpecLike {
import ClusterShardingPersistenceSpec._
private var _entityId = 0
def nextEntityId(): String = {
_entityId += 1
_entityId.toString
}
private def awaitEntityTerminatedAndRemoved(ref: ActorRef[_], entityId: String): Unit = {
val p = TestProbe[Any]()
p.expectTerminated(ref, p.remainingOrDefault)
// also make sure that the entity is removed from the Shard before continuing
// FIXME rewrite this with Typed API when region queries are supported
import akka.actor.typed.scaladsl.adapter._
val regionStateProbe = TestProbe[CurrentShardRegionState]()
val untypedRegion = UntypedClusterSharding(system.toUntyped)
regionStateProbe.awaitAssert {
untypedRegion.shardRegion(typeKey.name).tell(GetShardRegionState, regionStateProbe.ref.toUntyped)
regionStateProbe.expectMessageType[CurrentShardRegionState].shards.foreach { shardState
shardState.entityIds should not contain entityId
}
}
}
"Typed cluster sharding with persistent actor" must {
ClusterSharding(system).start(Entity(
typeKey,
ctx persistentEntity(ctx.entityId),
StopPlz
))
ctx persistentEntity(ctx.entityId, ctx.shard)))
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
"start persistent actor" in {
val entityId = nextEntityId()
val p = TestProbe[String]()
val ref = ClusterSharding(system).entityRefFor(typeKey, "123")
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
ref ! Add("a")
ref ! Add("b")
ref ! Add("c")
ref ! Get(p.ref)
p.expectMessage("123:a|b|c")
p.expectMessage("1:a|b|c")
}
"support ask with thenReply" in {
val entityId = nextEntityId()
val p = TestProbe[String]()
val ref = ClusterSharding(system).entityRefFor(typeKey, "456")
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
val done1 = ref ? AddWithConfirmation("a")
done1.futureValue should ===(Done)
@ -95,7 +169,244 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
done2.futureValue should ===(Done)
ref ! Get(p.ref)
p.expectMessage("456:a|b")
p.expectMessage("2:a|b")
}
"handle PoisonPill after persist effect" in {
val entityId = nextEntityId()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val p1 = TestProbe[Done]()
val ref = ClusterSharding(system).entityRefFor(typeKey, entityId)
(1 to 10).foreach { n
ref ! PassivateAndPersist(n.toString)(p1.ref)
// recoveryCompleted each time, verifies that it was actually stopped
recoveryProbe.expectMessage("recoveryCompleted:" + (1 until n).map(_.toString).mkString("|"))
p1.expectMessage(Done)
}
val p2 = TestProbe[String]()
ref ! Get(p2.ref)
p2.expectMessage(entityId + ":" + (1 to 10).map(_.toString).mkString("|"))
}
"handle PoisonPill after several stashed commands that persist" in {
val entityId = nextEntityId()
val actorRefPromise = Promise[ActorRef[Any]]()
entityActorRefs.put(entityId, actorRefPromise)
val addProbe = TestProbe[Done]()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
recoveryProbe.expectMessage("recoveryCompleted:")
// now we know that it's in EventSourcedRunning with no stashed commands
val actorRef = actorRefPromise.future.futureValue
// not sending via the EntityRef because that would make the test racy
// these are stashed, since before the PoisonPill
val latch = new CountDownLatch(1)
actorRef ! Block(latch)
actorRef ! AddWithConfirmation("b")(addProbe.ref)
actorRef ! AddWithConfirmation("c")(addProbe.ref)
actorRef ! PoisonPill
// those messages should be ignored since they happen after the PoisonPill,
actorRef ! AddWithConfirmation("d")(addProbe.ref)
actorRef ! AddWithConfirmation("e")(addProbe.ref)
// now we have enqueued the message sequence and start processing them
latch.countDown()
addProbe.expectMessage(Done)
addProbe.expectMessage(Done)
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
val p2 = TestProbe[String]()
entityRef ! AddWithConfirmation("f")(addProbe.ref)
entityRef ! Get(p2.ref)
recoveryProbe.expectMessage("recoveryCompleted:a|b|c")
p2.expectMessage(entityId + ":a|b|c|f")
}
"handle PoisonPill after several stashed commands that DON'T persist" in {
val entityId = nextEntityId()
val actorRefPromise = Promise[ActorRef[Any]]()
entityActorRefs.put(entityId, actorRefPromise)
val addProbe = TestProbe[Done]()
val echoProbe = TestProbe[String]()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
recoveryProbe.expectMessage("recoveryCompleted:")
// now we know that it's in EventSourcedRunning with no stashed commands
val actorRef = actorRefPromise.future.futureValue
// not sending via the EntityRef because that would make the test racy
// these are stashed, since before the PoisonPill
val latch = new CountDownLatch(1)
actorRef ! Block(latch)
actorRef ! AddWithConfirmation("b")(addProbe.ref)
actorRef ! AddWithConfirmation("c")(addProbe.ref)
actorRef ! Echo("echo-1", echoProbe.ref)
actorRef ! PoisonPill
// those messages should be ignored since they happen after the PoisonPill,
actorRef ! Echo("echo-2", echoProbe.ref)
actorRef ! AddWithConfirmation("d")(addProbe.ref)
actorRef ! AddWithConfirmation("e")(addProbe.ref)
actorRef ! Echo("echo-3", echoProbe.ref)
// now we have enqueued the message sequence and start processing them
latch.countDown()
echoProbe.expectMessage("echo-1")
addProbe.expectMessage(Done)
addProbe.expectMessage(Done)
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
val p2 = TestProbe[String]()
entityRef ! Echo("echo-4", echoProbe.ref)
echoProbe.expectMessage("echo-4")
entityRef ! AddWithConfirmation("f")(addProbe.ref)
entityRef ! Get(p2.ref)
recoveryProbe.expectMessage("recoveryCompleted:a|b|c")
p2.expectMessage(entityId + ":a|b|c|f")
}
"handle PoisonPill when stash empty" in {
val entityId = nextEntityId()
val actorRefPromise = Promise[ActorRef[Any]]()
entityActorRefs.put(entityId, actorRefPromise)
val addProbe = TestProbe[Done]()
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
// this will wakeup the entity, and complete the entityActorRefPromise
entityRef ! AddWithConfirmation("a")(addProbe.ref)
addProbe.expectMessage(Done)
recoveryProbe.expectMessage("recoveryCompleted:")
// now we know that it's in EventSourcedRunning with no stashed commands
val actorRef = actorRefPromise.future.futureValue
// not sending via the EntityRef because that would make the test racy
val latch = new CountDownLatch(1)
actorRef ! Block(latch)
actorRef ! PoisonPill
// those messages should be ignored since they happen after the PoisonPill,
actorRef ! AddWithConfirmation("b")(addProbe.ref)
// now we have enqueued the message sequence and start processing them
latch.countDown()
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
val p2 = TestProbe[String]()
entityRef ! AddWithConfirmation("c")(addProbe.ref)
entityRef ! Get(p2.ref)
recoveryProbe.expectMessage("recoveryCompleted:a")
p2.expectMessage(entityId + ":a|c")
}
"handle PoisonPill before recovery completed without stashed commands" in {
val entityId = nextEntityId()
val actorRefPromise = Promise[ActorRef[Any]]()
entityActorRefs.put(entityId, actorRefPromise)
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val ignoreFirstEchoProbe = TestProbe[String]()
val echoProbe = TestProbe[String]()
// first echo will wakeup the entity, and complete the entityActorRefPromise
// ignore the first echo reply since it may be racy with the PoisonPill
entityRef ! Echo("echo-1", ignoreFirstEchoProbe.ref)
// not using actorRefPromise.future.futureValue because it's polling (slow) and want to run this before
// recovery completed, to exercise that scenario
implicit val ec: ExecutionContext = testKit.system.executionContext
val poisonSent = actorRefPromise.future.map { actorRef
// not sending via the EntityRef because that would make the test racy
actorRef ! PoisonPill
actorRef
}
val actorRef = poisonSent.futureValue
recoveryProbe.expectMessage("recoveryCompleted:")
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
entityRef ! Echo("echo-2", echoProbe.ref)
echoProbe.expectMessage("echo-2")
recoveryProbe.expectMessage("recoveryCompleted:")
}
"handle PoisonPill before recovery completed with stashed commands" in {
val entityId = nextEntityId()
val actorRefPromise = Promise[ActorRef[Any]]()
entityActorRefs.put(entityId, actorRefPromise)
val recoveryProbe = TestProbe[String]()
recoveryCompletedProbes.put(entityId, recoveryProbe.ref)
val entityRef = ClusterSharding(system).entityRefFor(typeKey, entityId)
val addProbe = TestProbe[Done]()
val ignoreFirstEchoProbe = TestProbe[String]()
val echoProbe = TestProbe[String]()
// first echo will wakeup the entity, and complete the entityActorRefPromise
// ignore the first echo reply since it may be racy with the PoisonPill
entityRef ! Echo("echo-1", ignoreFirstEchoProbe.ref)
// not using actorRefPromise.future.futureValue because it's polling (slow) and want to run this before
// recovery completed, to exercise that scenario
implicit val ec: ExecutionContext = testKit.system.executionContext
val poisonSent = actorRefPromise.future.map { actorRef
// not sending via the EntityRef because that would make the test racy
// these are stashed, since before the PoisonPill
actorRef ! Echo("echo-2", echoProbe.ref)
actorRef ! AddWithConfirmation("a")(addProbe.ref)
actorRef ! AddWithConfirmation("b")(addProbe.ref)
actorRef ! Echo("echo-3", echoProbe.ref)
actorRef ! PoisonPill
// those messages should be ignored since they happen after the PoisonPill,
actorRef ! Echo("echo-4", echoProbe.ref)
actorRef ! AddWithConfirmation("c")(addProbe.ref)
actorRef
}
val actorRef = poisonSent.futureValue
recoveryProbe.expectMessage("recoveryCompleted:")
echoProbe.expectMessage("echo-2")
echoProbe.expectMessage("echo-3")
addProbe.expectMessage(Done)
addProbe.expectMessage(Done)
// wake up again
awaitEntityTerminatedAndRemoved(actorRef, entityId)
entityRef ! Echo("echo-5", echoProbe.ref)
echoProbe.expectMessage("echo-5")
recoveryProbe.expectMessage("recoveryCompleted:a|b")
}
}
}

View file

@ -18,6 +18,7 @@ import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.ActorSystem
import akka.actor.typed.PostStop
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.MemberStatus
@ -148,14 +149,14 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
}
private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard")
private def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[Done]] = None) =
private def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[String]] = None) =
Behaviors.receive[TestProtocol] {
case (ctx, PassivatePlz())
shard ! ClusterSharding.Passivate(ctx.self)
Behaviors.same
case (_, StopPlz())
stopProbe.foreach(_ ! Done)
stopProbe.foreach(_ ! "StopPlz")
Behaviors.stopped
case (ctx, WhoAreYou(replyTo))
@ -166,6 +167,10 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
case (_, ReplyPlz(toMe))
toMe ! "Hello!"
Behaviors.same
}.receiveSignal {
case (_, PostStop)
stopProbe.foreach(_ ! "PostStop")
Behaviors.same
}
private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard")
@ -185,35 +190,35 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity(
typeKey,
ctx behavior(ctx.shard),
StopPlz()))
ctx behavior(ctx.shard))
.withStopMessage(StopPlz()))
private val shardingRef2 = sharding2.start(Entity(
typeKey,
ctx behavior(ctx.shard),
StopPlz()))
ctx behavior(ctx.shard))
.withStopMessage(StopPlz()))
private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(Entity(
typeKey2,
ctx behaviorWithId(ctx.shard),
IdStopPlz())
ctx behaviorWithId(ctx.shard))
.withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
})
.withStopMessage(IdStopPlz())
)
private val shardingRef4 = sharding2.start(Entity(
typeKey2,
ctx behaviorWithId(ctx.shard),
IdStopPlz())
ctx behaviorWithId(ctx.shard))
.withMessageExtractor(
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
})
.withStopMessage(IdStopPlz())
)
def totalEntityCount1(): Int = {
@ -258,33 +263,55 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
}
}
"be able to passivate" in {
val stopProbe = TestProbe[Done]()
"be able to passivate with custom stop message" in {
val stopProbe = TestProbe[String]()
val p = TestProbe[String]()
val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test")
val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity(
typeKey3,
ctx behavior(ctx.shard, Some(stopProbe.ref)),
StopPlz()))
ctx behavior(ctx.shard, Some(stopProbe.ref)))
.withStopMessage(StopPlz()))
shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref))
p.expectMessage("Hello!")
shardingRef3 ! ShardingEnvelope(s"test1", PassivatePlz())
stopProbe.expectMessage(Done)
stopProbe.expectMessage("StopPlz")
stopProbe.expectMessage("PostStop")
shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref))
p.expectMessage("Hello!")
}
"be able to passivate with PoisonPill" in {
val stopProbe = TestProbe[String]()
val p = TestProbe[String]()
val typeKey4 = EntityTypeKey[TestProtocol]("passivate-test-poison")
val shardingRef4: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity(
typeKey4,
ctx behavior(ctx.shard, Some(stopProbe.ref))))
// no StopPlz stopMessage
shardingRef4 ! ShardingEnvelope(s"test4", ReplyPlz(p.ref))
p.expectMessage("Hello!")
shardingRef4 ! ShardingEnvelope(s"test4", PassivatePlz())
// no StopPlz
stopProbe.expectMessage("PostStop")
shardingRef4 ! ShardingEnvelope(s"test4", ReplyPlz(p.ref))
p.expectMessage("Hello!")
}
"fail if starting sharding for already used typeName, but with a different type" in {
// sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard")
val ex = intercept[Exception] {
sharding.start(Entity(
EntityTypeKey[IdTestProtocol]("envelope-shard"),
ctx behaviorWithId(ctx.shard),
IdStopPlz()))
ctx behaviorWithId(ctx.shard))
.withStopMessage(IdStopPlz()))
}
ex.getMessage should include("already started")
@ -349,8 +376,8 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
sharding.start(Entity(
ignorantKey,
_ Behaviors.ignore[TestProtocol],
StopPlz()))
_ Behaviors.ignore[TestProtocol])
.withStopMessage(StopPlz()))
val ref = sharding.entityRefFor(ignorantKey, "sloppy")

View file

@ -25,8 +25,7 @@ object HelloWorldPersistentEntityExample {
sharding.start(Entity(
typeKey = HelloWorld.entityTypeKey,
createBehavior = entityContext HelloWorld.persistentEntity(entityContext.entityId),
stopMessage = HelloWorld.Passivate))
createBehavior = entityContext HelloWorld.persistentEntity(entityContext.entityId)))
private implicit val askTimeout: Timeout = Timeout(5.seconds)
@ -50,7 +49,6 @@ object HelloWorldPersistentEntityExample {
// Command
trait Command
final case class Greet(whom: String)(val replyTo: ActorRef[Greeting]) extends Command
case object Passivate extends Command
// Response
final case class Greeting(whom: String, numberOfPeople: Int)
@ -68,7 +66,6 @@ object HelloWorldPersistentEntityExample {
(_, cmd)
cmd match {
case cmd: Greet greet(cmd)
case Passivate passivate()
}
}
@ -76,9 +73,6 @@ object HelloWorldPersistentEntityExample {
Effect.persist(Greeted(cmd.whom))
.thenRun(state cmd.replyTo ! Greeting(cmd.whom, state.numberOfPeople))
private def passivate(): Effect[Greeted, KnownPeople] =
Effect.stop()
private val eventHandler: (KnownPeople, Greeted) KnownPeople = {
(state, evt) state.add(evt.whom)
}

View file

@ -37,9 +37,7 @@ class HelloWorldPersistentEntityExampleSpec extends ScalaTestWithActorTestKit(He
sharding.start(Entity(
HelloWorld.entityTypeKey,
ctx HelloWorld.persistentEntity(ctx.entityId),
HelloWorld.Passivate
))
ctx HelloWorld.persistentEntity(ctx.entityId)))
}
"HelloWorld example" must {

View file

@ -10,7 +10,7 @@ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.Entity
import docs.akka.persistence.typed.BlogPostExample
import docs.akka.persistence.typed.BlogPostExample.{ BlogCommand, PassivatePost }
import docs.akka.persistence.typed.BlogPostExample.BlogCommand
object ShardingCompileOnlySpec {
@ -29,7 +29,6 @@ object ShardingCompileOnlySpec {
trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
case object GoodByeCounter extends CounterCommand
//#counter-messages
//#counter
@ -41,8 +40,6 @@ object ShardingCompileOnlySpec {
case GetValue(replyTo)
replyTo ! value
Behaviors.same
case GoodByeCounter
Behaviors.stopped
}
//#counter
@ -51,8 +48,7 @@ object ShardingCompileOnlySpec {
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(Entity(
typeKey = TypeKey,
createBehavior = ctx counter(ctx.entityId, 0),
stopMessage = GoodByeCounter))
createBehavior = ctx counter(ctx.entityId, 0)))
//#start
//#send
@ -70,13 +66,13 @@ object ShardingCompileOnlySpec {
ClusterSharding(system).start(Entity(
typeKey = BlogTypeKey,
createBehavior = ctx behavior(ctx.entityId),
stopMessage = PassivatePost))
createBehavior = ctx behavior(ctx.entityId)))
//#persistence
//#counter-passivate
case object Idle extends CounterCommand
case object GoodByeCounter extends CounterCommand
def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
Behaviors.setup { ctx
@ -104,8 +100,8 @@ object ShardingCompileOnlySpec {
sharding.start(Entity(
typeKey = TypeKey,
createBehavior = ctx counter2(ctx.shard, ctx.entityId),
stopMessage = GoodByeCounter))
createBehavior = ctx counter2(ctx.shard, ctx.entityId))
.withStopMessage(GoodByeCounter))
//#counter-passivate
}

View file

@ -105,10 +105,11 @@ If a message is already enqueued to the entity when it stops itself the enqueued
in the mailbox will be dropped. To support graceful passivation without losing such
messages the entity actor can send `ClusterSharding.Passivate` to to the
@scala[`ActorRef[ShardCommand]`]@java[`ActorRef<ShardCommand>`] that was passed in to
the factory method when creating the entity. The specified `handOffStopMessage` message
will be sent back to the entity, which is then supposed to stop itself. Incoming messages
will be buffered by the `Shard` between reception of `Passivate` and termination of the
entity. Such buffered messages are thereafter delivered to a new incarnation of the entity.
the factory method when creating the entity. The optional `stopMessage` message
will be sent back to the entity, which is then supposed to stop itself, otherwise it will
be stopped automatically. Incoming messages will be buffered by the `Shard` between reception
of `Passivate` and termination of the entity. Such buffered messages are thereafter delivered
to a new incarnation of the entity.
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate }
@ -116,6 +117,10 @@ Scala
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-start }
Note that in the above example the `stopMessage` is specified as `GoodByeCounter`. That message will be sent to
the entity when it's supposed to stop itself due to rebalance or passivation. If the `stopMessage` is not defined
it will be stopped automatically without receiving a specific message. It can be useful to define a custom stop
message if the entity needs to perform some asynchronous cleanup or interactions before stopping.
### Automatic Passivation
@ -123,5 +128,5 @@ The entities can be configured to be automatically passivated if they haven't re
a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting,
or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable
time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages
to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity.
By default automatic passivation is disabled.
to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity.
By default automatic passivation is disabled.

View file

@ -12,9 +12,10 @@ import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.persistence.typed.internal.EventsourcedBehavior._
import scala.util.control.NonFatal
import akka.actor.typed.internal.PoisonPill
/***
* INTERNAL API
*
@ -37,7 +38,8 @@ private[persistence] object EventsourcedReplayingEvents {
seqNr: Long,
state: State,
eventSeenInInterval: Boolean,
toSeqNr: Long
toSeqNr: Long,
receivedPoisonPill: Boolean
)
def apply[C, E, S](
@ -69,9 +71,11 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
case JournalResponse(r) onJournalResponse(state, r)
case SnapshotterResponse(r) onSnapshotterResponse(r)
case RecoveryTickEvent(snap) onRecoveryTick(state, snap)
case cmd: IncomingCommand[C] onCommand(cmd)
case cmd: IncomingCommand[C] onCommand(cmd, state)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}.receiveSignal(returnPermitOnStop)
}.receiveSignal(returnPermitOnStop.orElse {
case (_, PoisonPill) stay(state.copy(receivedPoisonPill = true))
})
private def onJournalResponse(
state: ReplayingState[S],
@ -106,10 +110,16 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
}
}
private def onCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = {
private def onCommand(cmd: InternalProtocol, state: ReplayingState[S]): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
stash(cmd)
Behaviors.same
if (state.receivedPoisonPill) {
if (setup.settings.logOnStashing) setup.log.debug(
"Discarding message [{}], because actor is to be stopped", cmd)
Behaviors.unhandled
} else {
stash(cmd)
Behaviors.same
}
}
protected def onRecoveryTick(state: ReplayingState[S], snapshot: Boolean): Behavior[InternalProtocol] =
@ -157,12 +167,16 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
tryReturnRecoveryPermit("replay completed successfully")
setup.recoveryCompleted(state.state)
val running = EventsourcedRunning[C, E, S](
setup,
EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state)
)
if (state.receivedPoisonPill && isStashEmpty)
Behaviors.stopped
else {
val running = EventsourcedRunning[C, E, S](
setup,
EventsourcedRunning.EventsourcedState[S](state.seqNr, state.state, state.receivedPoisonPill)
)
tryUnstash(running)
tryUnstash(running)
}
} finally {
setup.cancelRecoveryTimer()
}

View file

@ -7,6 +7,7 @@ package akka.persistence.typed.internal
import akka.actor.typed.scaladsl.Behaviors.same
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler }
import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
import akka.annotation.InternalApi
import akka.persistence.SnapshotProtocol.{ LoadSnapshotFailed, LoadSnapshotResult }
import akka.persistence._
@ -31,8 +32,8 @@ import akka.persistence.typed.internal.EventsourcedBehavior._
@InternalApi
private[akka] object EventsourcedReplayingSnapshot {
def apply[C, E, S](setup: EventsourcedSetup[C, E, S]): Behavior[InternalProtocol] =
new EventsourcedReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior()
def apply[C, E, S](setup: EventsourcedSetup[C, E, S], receivedPoisonPill: Boolean): Behavior[InternalProtocol] =
new EventsourcedReplayingSnapshot(setup.setMdc(MDC.ReplayingSnapshot)).createBehavior(receivedPoisonPill)
}
@ -40,19 +41,26 @@ private[akka] object EventsourcedReplayingSnapshot {
private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: EventsourcedSetup[C, E, S])
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
def createBehavior(): Behavior[InternalProtocol] = {
def createBehavior(receivedPoisonPillInPreviousPhase: Boolean): Behavior[InternalProtocol] = {
// protect against snapshot stalling forever because of journal overloaded and such
setup.startRecoveryTimer(snapshot = true)
loadSnapshot(setup.recovery.fromSnapshot, setup.recovery.toSequenceNr)
Behaviors.receiveMessage[InternalProtocol] {
case SnapshotterResponse(r) onSnapshotterResponse(r)
case JournalResponse(r) onJournalResponse(r)
case RecoveryTickEvent(snapshot) onRecoveryTick(snapshot)
case cmd: IncomingCommand[C] onCommand(cmd)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}.receiveSignal(returnPermitOnStop)
def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
Behaviors.receiveMessage[InternalProtocol] {
case SnapshotterResponse(r) onSnapshotterResponse(r, receivedPoisonPill)
case JournalResponse(r) onJournalResponse(r)
case RecoveryTickEvent(snapshot) onRecoveryTick(snapshot)
case cmd: IncomingCommand[C]
if (receivedPoisonPill) Behaviors.unhandled
else onCommand(cmd)
case RecoveryPermitGranted Behaviors.unhandled // should not happen, we already have the permit
}.receiveSignal(returnPermitOnStop.orElse {
case (_, PoisonPill) stay(receivedPoisonPill = true)
})
}
stay(receivedPoisonPillInPreviousPhase)
}
/**
@ -94,7 +102,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
Behaviors.unhandled
}
def onSnapshotterResponse(response: SnapshotProtocol.Response): Behavior[InternalProtocol] = {
def onSnapshotterResponse(response: SnapshotProtocol.Response, receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
response match {
case LoadSnapshotResult(sso, toSnr)
var state: S = setup.emptyState
@ -106,7 +114,7 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
case None 0 // from the beginning please
}
becomeReplayingEvents(state, seqNr, toSnr)
becomeReplayingEvents(state, seqNr, toSnr, receivedPoisonPill)
case LoadSnapshotFailed(cause)
onRecoveryFailure(cause, event = None)
@ -116,12 +124,12 @@ private[akka] class EventsourcedReplayingSnapshot[C, E, S](override val setup: E
}
}
private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long): Behavior[InternalProtocol] = {
private def becomeReplayingEvents(state: S, lastSequenceNr: Long, toSnr: Long, receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
setup.cancelRecoveryTimer()
EventsourcedReplayingEvents[C, E, S](
setup,
EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr)
EventsourcedReplayingEvents.ReplayingState(lastSequenceNr, state, eventSeenInInterval = false, toSnr, receivedPoisonPill)
)
}

View file

@ -5,6 +5,7 @@
package akka.persistence.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
@ -36,21 +37,36 @@ private[akka] class EventsourcedRequestingRecoveryPermit[C, E, S](override val s
// request a permit, as only once we obtain one we can start replaying
requestRecoveryPermit()
Behaviors.receiveMessage[InternalProtocol] {
case InternalProtocol.RecoveryPermitGranted
becomeReplaying()
def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
Behaviors.receiveMessage[InternalProtocol] {
case InternalProtocol.RecoveryPermitGranted
becomeReplaying(receivedPoisonPill)
case other
stash(other)
Behaviors.same
case _ if receivedPoisonPill
Behaviors.unhandled
case other
if (receivedPoisonPill) {
if (setup.settings.logOnStashing) setup.log.debug(
"Discarding message [{}], because actor is to be stopped", other)
Behaviors.unhandled
} else {
stash(other)
Behaviors.same
}
}.receiveSignal {
case (_, PoisonPill) stay(receivedPoisonPill = true)
}
}
stay(receivedPoisonPill = false)
}
private def becomeReplaying(): Behavior[InternalProtocol] = {
private def becomeReplaying(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
setup.log.debug(s"Initializing snapshot recovery: {}", setup.recovery)
setup.holdingRecoveryPermit = true
EventsourcedReplayingSnapshot(setup)
EventsourcedReplayingSnapshot(setup, receivedPoisonPill)
}
}

View file

@ -16,11 +16,13 @@ import akka.persistence.typed.{ Callback, EventRejectedException, SideEffect, St
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.persistence.typed.scaladsl.Effect
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.{ Failure, Success }
import akka.actor.typed.Signal
import akka.actor.typed.internal.PoisonPill
/**
* INTERNAL API
*
@ -42,8 +44,9 @@ import scala.util.{ Failure, Success }
private[akka] object EventsourcedRunning {
final case class EventsourcedState[State](
seqNr: Long,
state: State
seqNr: Long,
state: State,
receivedPoisonPill: Boolean
) {
def nextSequenceNr(): EventsourcedState[State] =
@ -158,6 +161,10 @@ private[akka] object EventsourcedRunning {
case IncomingCommand(c: C @unchecked) onCommand(state, c)
case SnapshotterResponse(r) onSnapshotterResponse(r, Behaviors.same)
case _ Behaviors.unhandled
}.receiveSignal {
case (_, PoisonPill)
if (isStashEmpty) Behaviors.stopped
else handlingCommands(state.copy(receivedPoisonPill = true))
}
}
@ -194,8 +201,14 @@ private[akka] object EventsourcedRunning {
}
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
stash(cmd)
this
if (state.receivedPoisonPill) {
if (setup.settings.logOnStashing) setup.log.debug(
"Discarding message [{}], because actor is to be stopped", cmd)
Behaviors.unhandled
} else {
stash(cmd)
this
}
}
final def onJournalResponse(
@ -246,6 +259,13 @@ private[akka] object EventsourcedRunning {
}
}
override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = {
case PoisonPill
// wait for journal responses before stopping
state = state.copy(receivedPoisonPill = true)
this
}
}
private def onSnapshotterResponse(
@ -285,7 +305,10 @@ private[akka] object EventsourcedRunning {
if (stopped) res = Behaviors.stopped
}
res
if (state.receivedPoisonPill && isStashEmpty)
Behaviors.stopped
else
res
}
def applySideEffect(effect: SideEffect[S], state: EventsourcedState[S]): Behavior[InternalProtocol] = effect match {

View file

@ -24,6 +24,8 @@ private[akka] trait EventsourcedStashManagement[C, E, S] {
private def stashBuffer: StashBuffer[InternalProtocol] = setup.internalStash
protected def isStashEmpty: Boolean = stashBuffer.isEmpty
protected def stash(msg: InternalProtocol): Unit = {
if (setup.settings.logOnStashing) setup.log.debug("Stashing message: [{}]", msg)

View file

@ -133,8 +133,6 @@ public class BlogPostExample {
this.replyTo = replyTo;
}
}
public static class PassivatePost implements BlogCommand {
}
public static class PostContent implements BlogCommand {
final String postId;
final String title;
@ -204,8 +202,7 @@ public class BlogPostExample {
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState, BlogState> commonCommandHandler() {
return commandHandlerBuilder(BlogState.class)
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled())
.matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop());
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled());
}
//#post-added-command-handler

View file

@ -104,9 +104,6 @@ public class NullBlogState {
public Publish(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
}
public static class PassivatePost implements BlogCommand {
}
public static class PostContent implements BlogCommand {
final String postId;
@ -128,8 +125,7 @@ public class NullBlogState {
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect().persist(event)
.thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
})
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
});
}
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState, BlogState> postCommandHandler() {
@ -147,8 +143,7 @@ public class NullBlogState {
cmd.replyTo.tell(state.postContent);
return Effect().none();
})
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled())
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled());
}
public BlogBehavior(PersistenceId persistenceId) {

View file

@ -104,9 +104,6 @@ public class OptionalBlogState {
public Publish(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
}
public static class PassivatePost implements BlogCommand {
}
public static class PostContent implements BlogCommand {
final String postId;
@ -128,8 +125,7 @@ public class OptionalBlogState {
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect().persist(event)
.thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
})
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
});
}
private CommandHandlerBuilder<BlogCommand, BlogEvent, Optional<BlogState>, Optional<BlogState>> postCommandHandler() {
@ -147,8 +143,7 @@ public class OptionalBlogState {
cmd.replyTo.tell(state.get().postContent);
return Effect().none();
})
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled())
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled());
}
public BlogBehavior(PersistenceId persistenceId) {

View file

@ -51,7 +51,6 @@ object BlogPostExample {
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand
final case object PassivatePost extends BlogCommand
final case class PostContent(postId: String, title: String, body: String)
//#commands
@ -69,9 +68,8 @@ object BlogPostExample {
state match {
case BlankState command match {
case cmd: AddPost addPost(cmd)
case PassivatePost Effect.stop()
case _ Effect.unhandled
case cmd: AddPost addPost(cmd)
case _ Effect.unhandled
}
case draftState: DraftState command match {
@ -79,12 +77,10 @@ object BlogPostExample {
case Publish(replyTo) publish(draftState, replyTo)
case GetPost(replyTo) getPost(draftState, replyTo)
case _: AddPost Effect.unhandled
case PassivatePost Effect.stop()
}
case publishedState: PublishedState command match {
case GetPost(replyTo) getPost(publishedState, replyTo)
case PassivatePost Effect.stop()
case _ Effect.unhandled
}
}