diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/routing/RoutingLogicSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/routing/RoutingLogicSpec.scala index cc5d7e9de7..74aa8ef352 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/routing/RoutingLogicSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/routing/RoutingLogicSpec.scala @@ -3,14 +3,18 @@ */ package akka.actor.typed.internal.routing -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.actor.testkit.typed.scaladsl.TestProbe -import akka.actor.testkit.typed.scaladsl.LogCapturing -import org.scalatest.Matchers -import org.scalatest.WordSpecLike + +import akka.actor.{ Address, ExtendedActorSystem } +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.internal.routing.RoutingLogics.ConsistentHashingLogic +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorSystem, Behavior } +import org.scalatest.{ Matchers, WordSpecLike } class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with Matchers with LogCapturing { + val emptyMessage: Any = "" + "The round robin routing logic" must { "round robin" in { @@ -22,10 +26,10 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with val logic = new RoutingLogics.RoundRobinLogic[Any] logic.routeesUpdated(allRoutees) - logic.selectRoutee() should ===(refA) - logic.selectRoutee() should ===(refB) - logic.selectRoutee() should ===(refC) - logic.selectRoutee() should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refB) + logic.selectRoutee(emptyMessage) should ===(refC) + logic.selectRoutee(emptyMessage) should ===(refA) } "not skip one on removal" in { @@ -36,12 +40,12 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with val logic = new RoutingLogics.RoundRobinLogic[Any] logic.routeesUpdated(allRoutees) - logic.selectRoutee() should ===(refA) - logic.selectRoutee() should ===(refB) + logic.selectRoutee(emptyMessage) should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refB) val bRemoved = Set(refA, refC) logic.routeesUpdated(bRemoved) - logic.selectRoutee() should ===(refC) + logic.selectRoutee(emptyMessage) should ===(refC) } "handle last one removed" in { @@ -51,11 +55,11 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with val logic = new RoutingLogics.RoundRobinLogic[Any] logic.routeesUpdated(allRoutees) - logic.selectRoutee() should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refA) val bRemoved = Set(refA) logic.routeesUpdated(bRemoved) - logic.selectRoutee() should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refA) } "move on to next when several removed" in { @@ -68,12 +72,12 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with val logic = new RoutingLogics.RoundRobinLogic[Any] logic.routeesUpdated(allRoutees) - logic.selectRoutee() should ===(refA) - logic.selectRoutee() should ===(refB) + logic.selectRoutee(emptyMessage) should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refB) val severalRemoved = Set(refA, refC) logic.routeesUpdated(severalRemoved) - logic.selectRoutee() should ===(refC) + logic.selectRoutee(emptyMessage) should ===(refC) } "wrap around when several removed" in { @@ -86,13 +90,13 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with val logic = new RoutingLogics.RoundRobinLogic[Any] logic.routeesUpdated(allRoutees) - logic.selectRoutee() should ===(refA) - logic.selectRoutee() should ===(refB) - logic.selectRoutee() should ===(refC) + logic.selectRoutee(emptyMessage) should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refB) + logic.selectRoutee(emptyMessage) should ===(refC) val severalRemoved = Set(refA, refC) logic.routeesUpdated(severalRemoved) - logic.selectRoutee() should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refA) } "pick first in with a completely new set of routees" in { @@ -105,13 +109,13 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with val logic = new RoutingLogics.RoundRobinLogic[Any] logic.routeesUpdated(initialRoutees) - logic.selectRoutee() should ===(refA) - logic.selectRoutee() should ===(refB) - logic.selectRoutee() should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refA) + logic.selectRoutee(emptyMessage) should ===(refB) + logic.selectRoutee(emptyMessage) should ===(refA) val severalRemoved = Set(refC, refD) logic.routeesUpdated(severalRemoved) - logic.selectRoutee() should ===(refC) + logic.selectRoutee(emptyMessage) should ===(refC) } } @@ -129,11 +133,91 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with (0 to 10).foreach { _ => // not much to verify here, but let's exercise it at least - val routee = logic.selectRoutee() + val routee = logic.selectRoutee(emptyMessage) routees should contain(routee) } } } + + "The consistent hashing logic" must { + import akka.actor.typed.scaladsl.adapter._ + val behavior: Behavior[Int] = Behaviors.empty[Int] + val typedSystem: ActorSystem[Int] = ActorSystem(behavior, "testSystem") + val selfAddress: Address = typedSystem.toClassic.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val modulo10Mapping: Int => String = (in: Int) => (in % 10).toString + val messages: Map[Any, Seq[Int]] = (1 to 1000).groupBy(modulo10Mapping.apply) + + "not accept virtualization factor lesser than 1" in { + val caught = intercept[IllegalArgumentException] { + new RoutingLogics.ConsistentHashingLogic[Int](0, modulo10Mapping, selfAddress) + } + caught.getMessage shouldEqual "requirement failed: virtualNodesFactor has to be a positive integer" + } + + "throw an error when there are no routees" in { + val logic = + new RoutingLogics.ConsistentHashingLogic[Int](1, modulo10Mapping, selfAddress) + val caught = intercept[IllegalStateException] { + logic.selectRoutee(0) shouldBe typedSystem.deadLetters + } + (caught.getMessage should fullyMatch).regex("""Can't get node for \[.+\] from an empty node ring""") + + } + + "hash consistently" in { + consitentHashingTestWithVirtualizationFactor(1) + } + + "hash consistently with virtualization factor" in { + consitentHashingTestWithVirtualizationFactor(13) + } + + "hash consistently when several new added" in { + val logic = + new RoutingLogics.ConsistentHashingLogic[Int](2, modulo10Mapping, selfAddress) + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val refD = TestProbe("d").ref + logic.routeesUpdated(Set(refA, refB, refC, refD)) + // every group should have the same actor ref + verifyConsistentHashing(logic) + logic.routeesUpdated(Set(refA, refB)) + verifyConsistentHashing(logic) + } + + "hash consistently when several new removed" in { + val logic = + new RoutingLogics.ConsistentHashingLogic[Int](2, modulo10Mapping, selfAddress) + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val refD = TestProbe("d").ref + logic.routeesUpdated(Set(refA, refB)) + // every group should have the same actor ref + verifyConsistentHashing(logic) + logic.routeesUpdated(Set(refA, refB, refC, refD)) + verifyConsistentHashing(logic) + } + + def consitentHashingTestWithVirtualizationFactor(virtualizationFactor: Int): Boolean = { + val logic = + new RoutingLogics.ConsistentHashingLogic[Int](virtualizationFactor, modulo10Mapping, selfAddress) + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val refD = TestProbe("d").ref + logic.routeesUpdated(Set(refA, refB, refC, refD)) + verifyConsistentHashing(logic) + } + + def verifyConsistentHashing(logic: ConsistentHashingLogic[Int]): Boolean = { + messages.view.map(_._2.map(logic.selectRoutee)).forall { refs => + refs.headOption.forall(head => refs.forall(_ == head)) + } + } + + } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala index 145cf1fe55..968a8a3232 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala @@ -5,34 +5,28 @@ package akka.actor.typed.scaladsl import java.util.concurrent.atomic.AtomicInteger -import akka.actor.Dropped -import akka.actor.testkit.typed.scaladsl.LoggingTestKit -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.actor.testkit.typed.scaladsl.TestProbe -import akka.actor.testkit.typed.scaladsl.LogCapturing -import akka.actor.typed.ActorRef -import akka.actor.typed.Behavior +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit, TestProbe } import akka.actor.typed.eventstream.EventStream -import akka.actor.typed.internal.routing.GroupRouterImpl -import akka.actor.typed.internal.routing.RoutingLogics -import akka.actor.typed.receptionist.Receptionist -import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.internal.routing.{ GroupRouterImpl, RoutingLogics } +import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.actor.typed.scaladsl.adapter._ -import org.scalatest.Matchers -import org.scalatest.WordSpecLike +import akka.actor.typed.{ ActorRef, Behavior } +import akka.actor.{ ActorSystem, Dropped } +import org.scalatest.{ Matchers, WordSpecLike } class RoutersSpec extends ScalaTestWithActorTestKit(""" akka.loglevel=debug """) with WordSpecLike with Matchers with LogCapturing { // needed for the event filter - implicit val classicSystem = system.toClassic + implicit val classicSystem: ActorSystem = system.toClassic def compileOnlyApiCoverage(): Unit = { Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting() Routers.pool(10)(Behaviors.empty[Any]).withRandomRouting() Routers.pool(10)(Behaviors.empty[Any]).withRoundRobinRouting() + Routers.pool(10)(Behaviors.empty[Any]).withConsistentHashingRouting(1, (msg: Any) => msg.toString) } "The router pool" must { diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala index f2a2a27980..13e6191a17 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala @@ -5,14 +5,10 @@ package docs.akka.typed // #pool -import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.actor.testkit.typed.scaladsl.LogCapturing -import akka.actor.typed.Behavior -import akka.actor.typed.SupervisorStrategy -import akka.actor.typed.receptionist.Receptionist -import akka.actor.typed.receptionist.ServiceKey -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.Routers +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import akka.actor.typed.{ Behavior, SupervisorStrategy } +import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } +import akka.actor.typed.scaladsl.{ Behaviors, Routers } import org.scalatest.WordSpecLike // #pool diff --git a/akka-actor-typed/src/main/mima-filters/2.6.x.backwards.excludes/issue-27729-consistent-hashing.excludes b/akka-actor-typed/src/main/mima-filters/2.6.x.backwards.excludes/issue-27729-consistent-hashing.excludes new file mode 100644 index 0000000000..c9a4247145 --- /dev/null +++ b/akka-actor-typed/src/main/mima-filters/2.6.x.backwards.excludes/issue-27729-consistent-hashing.excludes @@ -0,0 +1,34 @@ +# Those are new methods required for consistent hashing +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.PoolRouter.withConsistentHashingRouting") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.GroupRouter.withConsistentHashingRouting") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.PoolRouter.withConsistentHashingRouting") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.GroupRouter.withConsistentHashingRouting") + +# Routee method has been updated to accept the message, but it's still an internal API. +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.RoutingLogic.selectRoutee") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.internal.routing.RoutingLogic.selectRoutee") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.RoutingLogics#RoundRobinLogic.selectRoutee") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.RoutingLogics#RandomLogic.selectRoutee") + +# Internal changes due to of ActorSystem introduction +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.unapply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.apply$default$2") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.logicFactory") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.copy$default$2") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.$default$3") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply$default$3") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.unapply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.$default$2") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.apply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.apply$default$2") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.unapply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.$default$3") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.unapply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply$default$3") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.logicFactory") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.copy$default$3") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.this") diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala index f5d19a11f6..0ec4eeaa9e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala @@ -4,7 +4,9 @@ package akka.actor.typed.internal.routing -import akka.actor.Dropped +import java.util.function + +import akka.actor.{ Dropped, ExtendedActorSystem } import akka.actor.typed._ import akka.actor.typed.eventstream.EventStream import akka.actor.typed.receptionist.Receptionist @@ -20,17 +22,32 @@ import akka.annotation.InternalApi @InternalApi private[akka] final case class GroupRouterBuilder[T] private[akka] ( key: ServiceKey[T], - logicFactory: () => RoutingLogic[T] = () => new RoutingLogics.RandomLogic[T]()) + logicFactory: ActorSystem[_] => RoutingLogic[T] = (_: ActorSystem[_]) => new RoutingLogics.RandomLogic[T]()) extends javadsl.GroupRouter[T] with scaladsl.GroupRouter[T] { // deferred creation of the actual router - def apply(ctx: TypedActorContext[T]): Behavior[T] = new InitialGroupRouterImpl[T](ctx.asScala, key, logicFactory()) + def apply(ctx: TypedActorContext[T]): Behavior[T] = + new InitialGroupRouterImpl[T](ctx.asScala, key, logicFactory(ctx.asScala.system)) - def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]()) + def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RandomLogic[T]()) - def withRoundRobinRouting(): GroupRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RoundRobinLogic[T]) + def withRoundRobinRouting(): GroupRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RoundRobinLogic[T]) + def withConsistentHashingRouting( + virtualNodesFactor: Int, + mapping: function.Function[T, String]): GroupRouterBuilder[T] = + withConsistentHashingRouting(virtualNodesFactor, mapping.apply(_)) + + def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): GroupRouterBuilder[T] = { + import akka.actor.typed.scaladsl.adapter._ + copy( + logicFactory = system => + new RoutingLogics.ConsistentHashingLogic[T]( + virtualNodesFactor, + mapping, + system.toClassic.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress)) + } } /** @@ -98,7 +115,7 @@ private[akka] final class GroupRouterImpl[T]( this case msg: T @unchecked => import akka.actor.typed.scaladsl.adapter._ - if (!routeesEmpty) routingLogic.selectRoutee() ! msg + if (!routeesEmpty) routingLogic.selectRoutee(msg) ! msg else context.system.eventStream ! EventStream.Publish( Dropped(msg, s"No routees in group router for [$serviceKey]", context.self.toClassic)) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala index f8904a4dfc..2c3f154dc6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala @@ -4,10 +4,12 @@ package akka.actor.typed.internal.routing +import java.util.function + +import akka.actor.ExtendedActorSystem import akka.actor.typed._ -import akka.actor.typed.scaladsl.AbstractBehavior -import akka.actor.typed.scaladsl.ActorContext -import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.javadsl.PoolRouter +import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors } import akka.annotation.InternalApi /** @@ -17,18 +19,31 @@ import akka.annotation.InternalApi private[akka] final case class PoolRouterBuilder[T]( poolSize: Int, behavior: Behavior[T], - logicFactory: () => RoutingLogic[T] = () => new RoutingLogics.RoundRobinLogic[T]) + logicFactory: ActorSystem[_] => RoutingLogic[T] = (_: ActorSystem[_]) => new RoutingLogics.RoundRobinLogic[T]) extends javadsl.PoolRouter[T] with scaladsl.PoolRouter[T] { if (poolSize < 1) throw new IllegalArgumentException(s"pool size must be positive, was $poolSize") // deferred creation of the actual router def apply(ctx: TypedActorContext[T]): Behavior[T] = - new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory()) + new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory(ctx.asScala.system)) - def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]()) + def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RandomLogic[T]()) - def withRoundRobinRouting(): PoolRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RoundRobinLogic[T]) + def withRoundRobinRouting(): PoolRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RoundRobinLogic[T]) + + def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: function.Function[T, String]): PoolRouter[T] = + withConsistentHashingRouting(virtualNodesFactor, mapping.apply(_)) + + def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): PoolRouterBuilder[T] = { + import akka.actor.typed.scaladsl.adapter._ + copy( + logicFactory = system => + new RoutingLogics.ConsistentHashingLogic[T]( + virtualNodesFactor, + mapping, + system.toClassic.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress)) + } def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize) } @@ -57,7 +72,7 @@ private final class PoolRouterImpl[T]( } def onMessage(msg: T): Behavior[T] = { - logic.selectRoutee() ! msg + logic.selectRoutee(msg) ! msg this } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/RoutingLogic.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/RoutingLogic.scala index 176408b901..e0a0df80ef 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/RoutingLogic.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/RoutingLogic.scala @@ -6,8 +6,10 @@ package akka.actor.typed.internal.routing import java.util.concurrent.ThreadLocalRandom +import akka.actor.Address import akka.actor.typed.ActorRef import akka.annotation.InternalApi +import akka.routing.ConsistentHash /** * Kept in the behavior, not shared between instances, meant to be stateful. @@ -17,7 +19,7 @@ import akka.annotation.InternalApi @InternalApi sealed private[akka] trait RoutingLogic[T] { - def selectRoutee(): ActorRef[T] + def selectRoutee(msg: T): ActorRef[T] /** * Invoked an initial time before `selectRoutee` is ever called and then every time the set of available @@ -43,7 +45,7 @@ private[akka] object RoutingLogics { private var nextIdx = 0 - def selectRoutee(): ActorRef[T] = { + def selectRoutee(msg: T): ActorRef[T] = { if (nextIdx >= currentRoutees.length) nextIdx = 0 val selected = currentRoutees(nextIdx) nextIdx += 1 @@ -76,14 +78,37 @@ private[akka] object RoutingLogics { private var currentRoutees: Array[ActorRef[T]] = _ - override def selectRoutee(): ActorRef[T] = { + override def selectRoutee(msg: T): ActorRef[T] = { val selectedIdx = ThreadLocalRandom.current().nextInt(currentRoutees.length) currentRoutees(selectedIdx) } + override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = { currentRoutees = newRoutees.toArray } + } + final class ConsistentHashingLogic[T](virtualNodesFactor: Int, mapping: T => String, baseAddress: Address) + extends RoutingLogic[T] { + require(virtualNodesFactor > 0, "virtualNodesFactor has to be a positive integer") + + private var pathToRefs: Map[String, ActorRef[T]] = Map.empty + + private var consistentHash: ConsistentHash[String] = ConsistentHash(Set.empty, virtualNodesFactor) + + override def selectRoutee(msg: T): ActorRef[T] = pathToRefs(consistentHash.nodeFor(mapping(msg))) + + override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = { + val updatedPathToRefs = newRoutees.map(routee => toFullAddressString(routee) -> routee).toMap + val withoutOld = pathToRefs.keySet.diff(updatedPathToRefs.keySet).foldLeft(consistentHash)(_ :- _) + consistentHash = updatedPathToRefs.keySet.diff(pathToRefs.keySet).foldLeft(withoutOld)(_ :+ _) + pathToRefs = updatedPathToRefs + } + + private def toFullAddressString(routee: ActorRef[T]): String = routee.path.address match { + case Address(_, _, None, None) => routee.path.toStringWithAddress(baseAddress) + case _ => routee.path.toString + } } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala index 65e953b91d..152be43c73 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala @@ -6,8 +6,7 @@ package akka.actor.typed.javadsl import akka.actor.typed.Behavior import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior -import akka.actor.typed.internal.routing.GroupRouterBuilder -import akka.actor.typed.internal.routing.PoolRouterBuilder +import akka.actor.typed.internal.routing.{ GroupRouterBuilder, PoolRouterBuilder } import akka.actor.typed.receptionist.ServiceKey import akka.annotation.DoNotInherit @@ -64,6 +63,37 @@ abstract class GroupRouter[T] extends DeferredBehavior[T] { */ def withRoundRobinRouting(): GroupRouter[T] + /** + * Route messages by using consistent hashing. + * + * From wikipedia: Consistent hashing is based on mapping each object to a point on a circle + * (or equivalently, mapping each object to a real angle). The system maps each available machine + * (or other storage bucket) to many pseudo-randomly distributed points on the same circle. + * + * @param virtualNodesFactor This factor has to be greater or equal to 1. Assuming that the reader + * knows what consistent hashing is + * (if not, please refer: http://www.tom-e-white.com/2007/11/consistent-hashing.html or wiki). + * This number is responsible for creating additional, + * virtual addresses for a provided set of routees, + * so that in the total number of points on hashing ring + * will be equal to numberOfRoutees * virtualNodesFactor + * (if virtualNodesFactor is equal to 1, then no additional points will be created). + * + * Those virtual nodes are being created by additionally rehashing routees + * to evenly distribute them across hashing ring. + * Consider increasing this number when you have a small number of routees. + * For bigger loads one can aim in having around 100-200 total addresses. + * + * Please also note that setting this number to a too big value will cause + * reasonable overhead when new routees will be added or old one removed. + * + * @param mapping Hash key extractor. This function will be used in consistent hashing process. + * Result of this operation should possibly uniquely distinguish messages. + */ + def withConsistentHashingRouting( + virtualNodesFactor: Int, + mapping: java.util.function.Function[T, String]): GroupRouter[T] + } /** @@ -91,6 +121,34 @@ abstract class PoolRouter[T] extends DeferredBehavior[T] { */ def withRoundRobinRouting(): PoolRouter[T] + /** + * Route messages by using consistent hashing. + * + * From wikipedia: Consistent hashing is based on mapping each object to a point on a circle + * (or equivalently, mapping each object to a real angle). The system maps each available machine + * (or other storage bucket) to many pseudo-randomly distributed points on the same circle. + * + * @param virtualNodesFactor This factor has to be greater or equal to 1. Assuming that the reader + * knows what consistent hashing is + * (if not, please refer: http://www.tom-e-white.com/2007/11/consistent-hashing.html or wiki). + * This number is responsible for creating additional, + * virtual addresses for a provided set of routees, + * so that in the total number of points on hashing ring + * will be equal to numberOfRoutees * virtualNodesFactor + * (if virtualNodesFactor is equal to 1, then no additional points will be created). + * + * Those virtual nodes are being created by additionally rehashing routees + * to evenly distribute them across hashing ring. + * Consider increasing this number when you have a small number of routees. + * For bigger loads one can aim in having around 100-200 total addresses. + * + * @param mapping Hash key extractor. This function will be used in consistent hashing process. + * Result of this operation should possibly uniquely distinguish messages. + */ + def withConsistentHashingRouting( + virtualNodesFactor: Int, + mapping: java.util.function.Function[T, String]): PoolRouter[T] + /** * Set a new pool size from the one set at construction */ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala index 7a7665009c..c6f0574368 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala @@ -4,8 +4,7 @@ package akka.actor.typed.scaladsl import akka.actor.typed.Behavior -import akka.actor.typed.internal.routing.GroupRouterBuilder -import akka.actor.typed.internal.routing.PoolRouterBuilder +import akka.actor.typed.internal.routing.{ GroupRouterBuilder, PoolRouterBuilder } import akka.actor.typed.receptionist.ServiceKey import akka.annotation.DoNotInherit @@ -60,6 +59,35 @@ trait GroupRouter[T] extends Behavior[T] { */ def withRoundRobinRouting(): GroupRouter[T] + /** + * Route messages by using consistent hashing. + * + * From wikipedia: Consistent hashing is based on mapping each object to a point on a circle + * (or equivalently, mapping each object to a real angle). The system maps each available machine + * (or other storage bucket) to many pseudo-randomly distributed points on the same circle. + * + * @param virtualNodesFactor This factor has to be greater or equal to 1. Assuming that the reader + * knows what consistent hashing is + * (if not, please refer: http://www.tom-e-white.com/2007/11/consistent-hashing.html or wiki). + * This number is responsible for creating additional, + * virtual addresses for a provided set of routees, + * so that in the total number of points on hashing ring + * will be equal to numberOfRoutees * virtualNodesFactor + * (if virtualNodesFactor is equal to 1, then no additional points will be created). + * + * Those virtual nodes are being created by additionally rehashing routees + * to evenly distribute them across hashing ring. + * Consider increasing this number when you have a small number of routees. + * For bigger loads one can aim in having around 100-200 total addresses. + * + * Please also note that setting this number to a too big value will cause + * reasonable overhead when new routees will be added or old one removed. + * + * @param mapping Hash key extractor. This function will be used in consistent hashing process. + * Result of this operation should possibly uniquely distinguish messages. + */ + def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): GroupRouter[T] + } /** @@ -87,6 +115,32 @@ trait PoolRouter[T] extends Behavior[T] { */ def withRoundRobinRouting(): PoolRouter[T] + /** + * Route messages by using consistent hashing. + * + * From wikipedia: Consistent hashing is based on mapping each object to a point on a circle + * (or equivalently, mapping each object to a real angle). The system maps each available machine + * (or other storage bucket) to many pseudo-randomly distributed points on the same circle. + * + * @param virtualNodesFactor This factor has to be greater or equal to 1. Assuming that the reader + * knows what consistent hashing is + * (if not, please refer: http://www.tom-e-white.com/2007/11/consistent-hashing.html or wiki). + * This number is responsible for creating additional, + * virtual addresses for a provided set of routees, + * so that in the total number of points on hashing ring + * will be equal to numberOfRoutees * virtualNodesFactor + * (if virtualNodesFactor is equal to 1, then no additional points will be created). + * + * Those virtual nodes are being created by additionally rehashing routees + * to evenly distribute them across hashing ring. + * Consider increasing this number when you have a small number of routees. + * For bigger loads one can aim in having around 100-200 total addresses. + * + * @param mapping Hash key extractor. This function will be used in consistent hashing process. + * Result of this operation should possibly uniquely distinguish messages. + */ + def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): PoolRouter[T] + /** * Set a new pool size from the one set at construction */ diff --git a/akka-docs/src/main/paradox/typed/routers.md b/akka-docs/src/main/paradox/typed/routers.md index de189e5cd7..961863983a 100644 --- a/akka-docs/src/main/paradox/typed/routers.md +++ b/akka-docs/src/main/paradox/typed/routers.md @@ -79,13 +79,23 @@ sent through the router, each actor is forwarded one message. Round robin gives fair routing where every available routee gets the same amount of messages as long as the set of routees stays relatively stable, but may be unfair if the set of routees changes a lot. -This is the default for pool routers as the pool of routees is expected to remain the same. +This is the default for pool routers as the pool of routees is expected to remain the same. ### Random + Randomly selects a routee when a message is sent through the router. This is the default for group routers as the group of routees is expected to change as nodes join and leave the cluster. +### Consistent Hashing + +Uses [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing) to select a routee based +on the sent message. This [article](http://www.tom-e-white.com/2007/11/consistent-hashing.html) +gives good insight into how consistent hashing is implemented. + +Currently you have to define hashMapping of the router to map incoming messages to their consistent +hash key. This makes the decision transparent for the sender. + ## Routers and performance Note that if the routees are sharing a resource, the resource will determine if increasing the number of