Pattern for responding to a sharded actor (#27077)
This commit is contained in:
parent
1dfe55fcc3
commit
7c151a4279
4 changed files with 106 additions and 1 deletions
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.akka.cluster.sharding.typed;
|
||||
|
||||
import akka.actor.typed.ActorSystem;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
|
||||
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
|
||||
|
||||
public class ShardingReplyCompileOnlyTest {
|
||||
|
||||
ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample");
|
||||
ClusterSharding sharding = ClusterSharding.get(system);
|
||||
|
||||
// #sharded-response
|
||||
// a sharded actor that needs counter updates
|
||||
EntityTypeKey<Command> typeKey = EntityTypeKey.create(Command.class, "example-sharded-response");
|
||||
|
||||
public interface Command {}
|
||||
|
||||
class NewCount implements Command {
|
||||
public final long value;
|
||||
|
||||
NewCount(long value) {
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
// a sharded counter that sends responses to another sharded actor
|
||||
interface CounterCommand {}
|
||||
|
||||
enum Increment implements CounterCommand {
|
||||
INSTANCE
|
||||
}
|
||||
|
||||
class GetValue implements CounterCommand {
|
||||
public final String replyToEntityId;
|
||||
|
||||
GetValue(String replyToEntityId) {
|
||||
this.replyToEntityId = replyToEntityId;
|
||||
}
|
||||
}
|
||||
|
||||
public Behavior<CounterCommand> counter(int value) {
|
||||
return Behaviors.receive(CounterCommand.class)
|
||||
.onMessage(Increment.class, msg -> counter(value + 1))
|
||||
.onMessage(
|
||||
GetValue.class,
|
||||
msg -> {
|
||||
sharding.entityRefFor(typeKey, msg.replyToEntityId).tell(new NewCount(value));
|
||||
return Behaviors.same();
|
||||
})
|
||||
.build();
|
||||
}
|
||||
// #sharded-response
|
||||
}
|
||||
|
|
@ -12,6 +12,7 @@ import com.github.ghik.silencer.silent
|
|||
import docs.akka.persistence.typed.BlogPostExample
|
||||
import docs.akka.persistence.typed.BlogPostExample.BlogCommand
|
||||
|
||||
@silent
|
||||
object ShardingCompileOnlySpec {
|
||||
|
||||
val system = ActorSystem(Behaviors.empty, "Sharding")
|
||||
|
|
@ -71,7 +72,6 @@ object ShardingCompileOnlySpec {
|
|||
case object Idle extends CounterCommand
|
||||
case object GoodByeCounter extends CounterCommand
|
||||
|
||||
@silent
|
||||
def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
|
||||
Behaviors.setup { ctx =>
|
||||
def become(value: Int): Behavior[CounterCommand] =
|
||||
|
|
@ -100,4 +100,30 @@ object ShardingCompileOnlySpec {
|
|||
.withStopMessage(GoodByeCounter))
|
||||
//#counter-passivate
|
||||
|
||||
def counterWithResponseToShardedActor(): Unit = {
|
||||
|
||||
//#sharded-response
|
||||
// a sharded actor that needs counter updates
|
||||
trait Command
|
||||
final case class NewCount(count: Long) extends Command
|
||||
val entityTypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("example-sharded-response")
|
||||
|
||||
// a sharded counter that sends responses to another sharded actor
|
||||
trait CounterCommand
|
||||
case object Increment extends CounterCommand
|
||||
final case class GetValue(replyToEntityId: String) extends CounterCommand
|
||||
|
||||
def counter(value: Long): Behavior[CounterCommand] =
|
||||
Behaviors.receiveMessage[CounterCommand] {
|
||||
case Increment =>
|
||||
counter(value + 1)
|
||||
case GetValue(replyToEntityId) =>
|
||||
sharding.entityRefFor(entityTypeKey, replyToEntityId) ! NewCount(value)
|
||||
Behaviors.same
|
||||
}
|
||||
//#sharded-response
|
||||
|
||||
counter(1)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -122,3 +122,5 @@ time to keep the actor alive. Note that only messages sent through sharding are
|
|||
to the `ActorRef` or messages that the actor sends to itself are not counted in this activity.
|
||||
Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`.
|
||||
It is always disabled if @ref:[Remembering Entities](../cluster-sharding.md#remembering-entities) is enabled.
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -245,6 +245,24 @@ This can be used with any type of `Behavior`, including `receive`, `receiveMessa
|
|||
* The `TimerScheduler` is bound to the lifecycle of the actor that owns it and it's cancelled automatically when the actor is stopped.
|
||||
* `Behaviors.withTimers` can also be used inside `Behaviors.supervise` and it will automatically cancel the started timers correctly when the actor is restarted, so that the new incarnation will not receive scheduled messages from previous incarnation.
|
||||
|
||||
## Responding to a sharded actor
|
||||
|
||||
The normal pattern for expecting a reply is to include an @apidoc[akka.actor.typed.ActorRef] in the message, typically a message adapter. This can be used
|
||||
for a sharded actor but if @scala[`ctx.self`]@java[`ctx.getSelf()`] is sent and the sharded actor is moved or passivated then the reply
|
||||
will sent to dead letters.
|
||||
|
||||
An alternative is to send the `entityId` in the message and have the reply sent via sharding:
|
||||
|
||||
Scala
|
||||
: @@snip [sharded.response](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharded-response }
|
||||
|
||||
Java
|
||||
: @@snip [sharded.response](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingReplyCompileOnlyTest.java) { #sharded-response }
|
||||
|
||||
A disadvantage is that a message adapter can't be used so the response has to be in the protocol of the actor being responded to. Additionally the `EntityTypeKey`
|
||||
could be included in the message if it is not known statically.
|
||||
|
||||
|
||||
### Schedule periodically
|
||||
|
||||
Scheduling of recurring messages can have two different characteristics:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue