Typed consistent hashing example (#30036)

Co-authored-by: Renato Cavalcanti <renato@cavalcanti.be>
This commit is contained in:
franciscolopezsancho 2021-04-01 08:31:42 +01:00 committed by GitHub
parent 55ec18962c
commit fb432b3d8a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 154 additions and 2 deletions

View file

@ -21,9 +21,18 @@ import akka.actor.typed.javadsl.Routers;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.ServiceKey;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
// #pool
public class RouterTest {
public class RouterTest extends JUnitSuite {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
static // #routee
class Worker {
@ -54,6 +63,46 @@ public class RouterTest {
}
}
static class Proxy {
public final ServiceKey<Message> registeringKey =
ServiceKey.create(Message.class, "aggregator-key");
public String mapping(Message message) {
return message.getId();
}
static class Message {
public Message(String id, String content) {
this.id = id;
this.content = content;
}
private String content;
private String id;
public final String getContent() {
return content;
}
public final String getId() {
return id;
}
}
static Behavior<Message> create(ActorRef<String> monitor) {
return Behaviors.receive(Message.class)
.onMessage(Message.class, in -> onMyMessage(monitor, in))
.build();
}
private static Behavior<Message> onMyMessage(ActorRef<String> monitor, Message message) {
monitor.tell(message.getId());
return Behaviors.same();
}
}
// #routee
// intentionally outside the routee scope
@ -138,6 +187,52 @@ public class RouterTest {
// #group
}
@Test
public void showGroupRoutingWithConsistentHashing() throws Exception {
TestProbe<String> probe1 = testKit.createTestProbe();
TestProbe<String> probe2 = testKit.createTestProbe();
Proxy proxy = new Proxy();
ActorRef<Proxy.Message> proxy1 = testKit.spawn(proxy.create(probe1.ref()));
ActorRef<Proxy.Message> proxy2 = testKit.spawn(proxy.create(probe2.ref()));
TestProbe<Receptionist.Registered> waiterProbe = testKit.createTestProbe();
// registering proxies
testKit
.system()
.receptionist()
.tell(Receptionist.register(proxy.registeringKey, proxy1, waiterProbe.ref()));
testKit
.system()
.receptionist()
.tell(Receptionist.register(proxy.registeringKey, proxy2, waiterProbe.ref()));
// wait until both registrations get Receptionist.Registered
waiterProbe.receiveSeveralMessages(2);
// messages sent to a router with consistent hashing
// #consistent-hashing
ActorRef<Proxy.Message> router =
testKit.spawn(
Routers.group(proxy.registeringKey)
.withConsistentHashingRouting(10, command -> proxy.mapping(command)));
router.tell(new Proxy.Message("123", "Text1"));
router.tell(new Proxy.Message("123", "Text2"));
router.tell(new Proxy.Message("zh3", "Text3"));
router.tell(new Proxy.Message("zh3", "Text4"));
// the hash is calculated over the Proxy.Message first parameter obtained through the
// Proxy.mapping function
// #consistent-hashing
// Then messages with equal Message.id reach the same actor
// so the first message in each probe queue is equal to its second
probe1.expectMessage(probe1.receiveMessage());
probe2.expectMessage(probe2.receiveMessage());
}
public static void main(String[] args) {
ActorSystem<Void> system =
ActorSystem.create(

View file

@ -7,7 +7,7 @@ package docs.akka.typed
import akka.actor.typed.DispatcherSelector
// #pool
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy }
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.{ Behaviors, Routers }
@ -143,5 +143,55 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with
probe.receiveMessages(10)
}
"show group routing with consistent hashing" in {
val probe1 = createTestProbe[String]()
val probe2 = createTestProbe[String]()
object Proxy {
val RegisteringKey = ServiceKey[Message]("aggregator-key")
def mapping(message: Message) = message.id
case class Message(id: String, content: String)
def apply(monitor: ActorRef[String]): Behavior[Message] =
Behaviors.receiveMessage {
case Message(id, _) =>
monitor ! id
Behaviors.same
}
}
//registering proxies
val proxy1 = spawn(Proxy(probe1.ref))
val proxy2 = spawn(Proxy(probe2.ref))
val waiterProbe = createTestProbe[Receptionist.Registered]()
system.receptionist ! Receptionist.Register(Proxy.RegisteringKey, proxy1, waiterProbe.ref)
system.receptionist ! Receptionist.Register(Proxy.RegisteringKey, proxy2, waiterProbe.ref)
//wait until both registrations get Receptionist.Registered
waiterProbe.receiveMessages(2)
//messages sent to a router with consistent hashing
// #consistent-hashing
val router = spawn(Routers.group(Proxy.RegisteringKey).withConsistentHashingRouting(10, Proxy.mapping))
router ! Proxy.Message("123", "Text1")
router ! Proxy.Message("123", "Text2")
router ! Proxy.Message("zh3", "Text3")
router ! Proxy.Message("zh3", "Text4")
// the hash is calculated over the Proxy.Message first parameter obtained through the Proxy.mapping function
// #consistent-hashing
//Then messages with equal Message.id reach the same actor
//so the first message in each probe queue is equal to its second
probe1.receiveMessage() shouldBe probe1.receiveMessage()
probe2.receiveMessage() shouldBe probe2.receiveMessage()
}
}
}

View file

@ -139,6 +139,13 @@ hash key. This makes the decision transparent for the sender.
Consistent hashing delivers messages with the same hash to the same routee as long as the set of routees stays the same.
When the set of routees changes, consistent hashing tries to make sure, but does not guarantee, that messages with the same hash are routed to the same routee.
Scala
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #consistent-hashing }
Java
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #consistent-hashing }
See also @ref[Akka Cluster Sharding](cluster-sharding.md) which provides stable routing and rebalancing of the routee actors.
## Routers and performance