diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java index b60e235c53..206507c194 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java @@ -21,9 +21,18 @@ import akka.actor.typed.javadsl.Routers; import akka.actor.typed.receptionist.Receptionist; import akka.actor.typed.receptionist.ServiceKey; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; + +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatestplus.junit.JUnitSuite; + // #pool -public class RouterTest { +public class RouterTest extends JUnitSuite { + + @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(); static // #routee class Worker { @@ -54,6 +63,46 @@ public class RouterTest { } } + static class Proxy { + + public final ServiceKey registeringKey = + ServiceKey.create(Message.class, "aggregator-key"); + + public String mapping(Message message) { + return message.getId(); + } + + static class Message { + + public Message(String id, String content) { + this.id = id; + this.content = content; + } + + private String content; + private String id; + + public final String getContent() { + return content; + } + + public final String getId() { + return id; + } + } + + static Behavior create(ActorRef monitor) { + return Behaviors.receive(Message.class) + .onMessage(Message.class, in -> onMyMessage(monitor, in)) + .build(); + } + + private static Behavior onMyMessage(ActorRef monitor, Message message) { + monitor.tell(message.getId()); + return Behaviors.same(); + } + } + // #routee // intentionally outside the routee scope @@ -138,6 +187,52 @@ public class RouterTest { // #group } + @Test + public void showGroupRoutingWithConsistentHashing() throws Exception { + + TestProbe probe1 = testKit.createTestProbe(); + TestProbe probe2 = testKit.createTestProbe(); + + Proxy proxy = new Proxy(); + + ActorRef proxy1 = testKit.spawn(proxy.create(probe1.ref())); + ActorRef proxy2 = testKit.spawn(proxy.create(probe2.ref())); + + TestProbe waiterProbe = testKit.createTestProbe(); + // registering proxies + + testKit + .system() + .receptionist() + .tell(Receptionist.register(proxy.registeringKey, proxy1, waiterProbe.ref())); + testKit + .system() + .receptionist() + .tell(Receptionist.register(proxy.registeringKey, proxy2, waiterProbe.ref())); + // wait until both registrations get Receptionist.Registered + + waiterProbe.receiveSeveralMessages(2); + // messages sent to a router with consistent hashing + // #consistent-hashing + ActorRef router = + testKit.spawn( + Routers.group(proxy.registeringKey) + .withConsistentHashingRouting(10, command -> proxy.mapping(command))); + + router.tell(new Proxy.Message("123", "Text1")); + router.tell(new Proxy.Message("123", "Text2")); + + router.tell(new Proxy.Message("zh3", "Text3")); + router.tell(new Proxy.Message("zh3", "Text4")); + // the hash is calculated over the Proxy.Message first parameter obtained through the + // Proxy.mapping function + // #consistent-hashing + // Then messages with equal Message.id reach the same actor + // so the first message in each probe queue is equal to its second + probe1.expectMessage(probe1.receiveMessage()); + probe2.expectMessage(probe2.receiveMessage()); + } + public static void main(String[] args) { ActorSystem system = ActorSystem.create( diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala index 32d8950f1e..c52b8a247d 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala @@ -7,7 +7,7 @@ package docs.akka.typed import akka.actor.typed.DispatcherSelector // #pool import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } -import akka.actor.typed.{ Behavior, SupervisorStrategy } +import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy } import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.actor.typed.scaladsl.{ Behaviors, Routers } @@ -143,5 +143,55 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with probe.receiveMessages(10) } + "show group routing with consistent hashing" in { + + val probe1 = createTestProbe[String]() + val probe2 = createTestProbe[String]() + + object Proxy { + + val RegisteringKey = ServiceKey[Message]("aggregator-key") + + def mapping(message: Message) = message.id + + case class Message(id: String, content: String) + + def apply(monitor: ActorRef[String]): Behavior[Message] = + Behaviors.receiveMessage { + case Message(id, _) => + monitor ! id + Behaviors.same + } + } + + //registering proxies + val proxy1 = spawn(Proxy(probe1.ref)) + val proxy2 = spawn(Proxy(probe2.ref)) + val waiterProbe = createTestProbe[Receptionist.Registered]() + + system.receptionist ! Receptionist.Register(Proxy.RegisteringKey, proxy1, waiterProbe.ref) + system.receptionist ! Receptionist.Register(Proxy.RegisteringKey, proxy2, waiterProbe.ref) + //wait until both registrations get Receptionist.Registered + waiterProbe.receiveMessages(2) + + //messages sent to a router with consistent hashing + // #consistent-hashing + val router = spawn(Routers.group(Proxy.RegisteringKey).withConsistentHashingRouting(10, Proxy.mapping)) + + router ! Proxy.Message("123", "Text1") + router ! Proxy.Message("123", "Text2") + + router ! Proxy.Message("zh3", "Text3") + router ! Proxy.Message("zh3", "Text4") + // the hash is calculated over the Proxy.Message first parameter obtained through the Proxy.mapping function + // #consistent-hashing + //Then messages with equal Message.id reach the same actor + //so the first message in each probe queue is equal to its second + probe1.receiveMessage() shouldBe probe1.receiveMessage() + probe2.receiveMessage() shouldBe probe2.receiveMessage() + + } + } + } diff --git a/akka-docs/src/main/paradox/typed/routers.md b/akka-docs/src/main/paradox/typed/routers.md index f1375044b0..512f60593f 100644 --- a/akka-docs/src/main/paradox/typed/routers.md +++ b/akka-docs/src/main/paradox/typed/routers.md @@ -139,6 +139,13 @@ hash key. This makes the decision transparent for the sender. Consistent hashing delivers messages with the same hash to the same routee as long as the set of routees stays the same. When the set of routees changes, consistent hashing tries to make sure, but does not guarantee, that messages with the same hash are routed to the same routee. + +Scala +: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #consistent-hashing } + +Java +: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #consistent-hashing } + See also @ref[Akka Cluster Sharding](cluster-sharding.md) which provides stable routing and rebalancing of the routee actors. ## Routers and performance