diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java new file mode 100644 index 0000000000..eb5c3a6136 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package jdocs.akka.cluster.sharding.typed; + +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; + +public class ShardingReplyCompileOnlyTest { + + ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); + ClusterSharding sharding = ClusterSharding.get(system); + + // #sharded-response + // a sharded actor that needs counter updates + EntityTypeKey typeKey = EntityTypeKey.create(Command.class, "example-sharded-response"); + + public interface Command {} + + class NewCount implements Command { + public final long value; + + NewCount(long value) { + this.value = value; + } + } + + // a sharded counter that sends responses to another sharded actor + interface CounterCommand {} + + enum Increment implements CounterCommand { + INSTANCE + } + + class GetValue implements CounterCommand { + public final String replyToEntityId; + + GetValue(String replyToEntityId) { + this.replyToEntityId = replyToEntityId; + } + } + + public Behavior counter(int value) { + return Behaviors.receive(CounterCommand.class) + .onMessage(Increment.class, msg -> counter(value + 1)) + .onMessage( + GetValue.class, + msg -> { + sharding.entityRefFor(typeKey, msg.replyToEntityId).tell(new NewCount(value)); + return Behaviors.same(); + }) + .build(); + } + // #sharded-response +} diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index 09f58785b6..6eba97e940 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -12,6 +12,7 @@ import com.github.ghik.silencer.silent import docs.akka.persistence.typed.BlogPostExample import docs.akka.persistence.typed.BlogPostExample.BlogCommand +@silent object ShardingCompileOnlySpec { val system = ActorSystem(Behaviors.empty, "Sharding") @@ -71,7 +72,6 @@ object ShardingCompileOnlySpec { case object Idle extends CounterCommand case object GoodByeCounter extends CounterCommand - @silent def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = { Behaviors.setup { ctx => def become(value: Int): Behavior[CounterCommand] = @@ -100,4 +100,30 @@ object ShardingCompileOnlySpec { .withStopMessage(GoodByeCounter)) //#counter-passivate + def counterWithResponseToShardedActor(): Unit = { + + //#sharded-response + // a sharded actor that needs counter updates + trait Command + final case class NewCount(count: Long) extends Command + val entityTypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response") + + // a sharded counter that sends responses to another sharded actor + trait CounterCommand + case object Increment extends CounterCommand + final case class GetValue(replyToEntityId: String) extends CounterCommand + + def counter(value: Long): Behavior[CounterCommand] = + Behaviors.receiveMessage[CounterCommand] { + case Increment => + counter(value + 1) + case GetValue(replyToEntityId) => + sharding.entityRefFor(entityTypeKey, replyToEntityId) ! NewCount(value) + Behaviors.same + } + //#sharded-response + + counter(1) + } + } diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index bb2bf17058..8382ac9ad4 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -122,3 +122,5 @@ time to keep the actor alive. Note that only messages sent through sharding are to the `ActorRef` or messages that the actor sends to itself are not counted in this activity. Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. It is always disabled if @ref:[Remembering Entities](../cluster-sharding.md#remembering-entities) is enabled. + + diff --git a/akka-docs/src/main/paradox/typed/interaction-patterns.md b/akka-docs/src/main/paradox/typed/interaction-patterns.md index c3852cfdbe..dd77101edd 100644 --- a/akka-docs/src/main/paradox/typed/interaction-patterns.md +++ b/akka-docs/src/main/paradox/typed/interaction-patterns.md @@ -245,6 +245,24 @@ This can be used with any type of `Behavior`, including `receive`, `receiveMessa * The `TimerScheduler` is bound to the lifecycle of the actor that owns it and it's cancelled automatically when the actor is stopped. * `Behaviors.withTimers` can also be used inside `Behaviors.supervise` and it will automatically cancel the started timers correctly when the actor is restarted, so that the new incarnation will not receive scheduled messages from previous incarnation. +## Responding to a sharded actor + +The normal pattern for expecting a reply is to include an @apidoc[akka.actor.typed.ActorRef] in the message, typically a message adapter. This can be used +for a sharded actor but if @scala[`ctx.self`]@java[`ctx.getSelf()`] is sent and the sharded actor is moved or passivated then the reply +will sent to dead letters. + +An alternative is to send the `entityId` in the message and have the reply sent via sharding: + +Scala +: @@snip [sharded.response](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharded-response } + +Java +: @@snip [sharded.response](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java) { #sharded-response } + +A disadvantage is that a message adapter can't be used so the response has to be in the protocol of the actor being responded to. Additionally the `EntityTypeKey` +could be included in the message if it is not known statically. + + ### Schedule periodically Scheduling of recurring messages can have two different characteristics: