27729 consistent hashing routing (#28141)

This commit is contained in:
Piotr Lewandowski 2019-11-21 09:23:59 +01:00 committed by Patrik Nordwall
parent 4bf94fee2d
commit d90bfc6a0e
10 changed files with 357 additions and 70 deletions

View file

@ -3,14 +3,18 @@
*/ */
package akka.actor.typed.internal.routing package akka.actor.typed.internal.routing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.{ Address, ExtendedActorSystem }
import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, TestProbe }
import org.scalatest.Matchers import akka.actor.typed.internal.routing.RoutingLogics.ConsistentHashingLogic
import org.scalatest.WordSpecLike import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorSystem, Behavior }
import org.scalatest.{ Matchers, WordSpecLike }
class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with Matchers with LogCapturing { class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with Matchers with LogCapturing {
val emptyMessage: Any = ""
"The round robin routing logic" must { "The round robin routing logic" must {
"round robin" in { "round robin" in {
@ -22,10 +26,10 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with
val logic = new RoutingLogics.RoundRobinLogic[Any] val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees) logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee() should ===(refB) logic.selectRoutee(emptyMessage) should ===(refB)
logic.selectRoutee() should ===(refC) logic.selectRoutee(emptyMessage) should ===(refC)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
} }
"not skip one on removal" in { "not skip one on removal" in {
@ -36,12 +40,12 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with
val logic = new RoutingLogics.RoundRobinLogic[Any] val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees) logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee() should ===(refB) logic.selectRoutee(emptyMessage) should ===(refB)
val bRemoved = Set(refA, refC) val bRemoved = Set(refA, refC)
logic.routeesUpdated(bRemoved) logic.routeesUpdated(bRemoved)
logic.selectRoutee() should ===(refC) logic.selectRoutee(emptyMessage) should ===(refC)
} }
"handle last one removed" in { "handle last one removed" in {
@ -51,11 +55,11 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with
val logic = new RoutingLogics.RoundRobinLogic[Any] val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees) logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
val bRemoved = Set(refA) val bRemoved = Set(refA)
logic.routeesUpdated(bRemoved) logic.routeesUpdated(bRemoved)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
} }
"move on to next when several removed" in { "move on to next when several removed" in {
@ -68,12 +72,12 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with
val logic = new RoutingLogics.RoundRobinLogic[Any] val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees) logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee() should ===(refB) logic.selectRoutee(emptyMessage) should ===(refB)
val severalRemoved = Set(refA, refC) val severalRemoved = Set(refA, refC)
logic.routeesUpdated(severalRemoved) logic.routeesUpdated(severalRemoved)
logic.selectRoutee() should ===(refC) logic.selectRoutee(emptyMessage) should ===(refC)
} }
"wrap around when several removed" in { "wrap around when several removed" in {
@ -86,13 +90,13 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with
val logic = new RoutingLogics.RoundRobinLogic[Any] val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(allRoutees) logic.routeesUpdated(allRoutees)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee() should ===(refB) logic.selectRoutee(emptyMessage) should ===(refB)
logic.selectRoutee() should ===(refC) logic.selectRoutee(emptyMessage) should ===(refC)
val severalRemoved = Set(refA, refC) val severalRemoved = Set(refA, refC)
logic.routeesUpdated(severalRemoved) logic.routeesUpdated(severalRemoved)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
} }
"pick first in with a completely new set of routees" in { "pick first in with a completely new set of routees" in {
@ -105,13 +109,13 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with
val logic = new RoutingLogics.RoundRobinLogic[Any] val logic = new RoutingLogics.RoundRobinLogic[Any]
logic.routeesUpdated(initialRoutees) logic.routeesUpdated(initialRoutees)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
logic.selectRoutee() should ===(refB) logic.selectRoutee(emptyMessage) should ===(refB)
logic.selectRoutee() should ===(refA) logic.selectRoutee(emptyMessage) should ===(refA)
val severalRemoved = Set(refC, refD) val severalRemoved = Set(refC, refD)
logic.routeesUpdated(severalRemoved) logic.routeesUpdated(severalRemoved)
logic.selectRoutee() should ===(refC) logic.selectRoutee(emptyMessage) should ===(refC)
} }
} }
@ -129,11 +133,91 @@ class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with
(0 to 10).foreach { _ => (0 to 10).foreach { _ =>
// not much to verify here, but let's exercise it at least // not much to verify here, but let's exercise it at least
val routee = logic.selectRoutee() val routee = logic.selectRoutee(emptyMessage)
routees should contain(routee) routees should contain(routee)
} }
} }
} }
"The consistent hashing logic" must {
import akka.actor.typed.scaladsl.adapter._
val behavior: Behavior[Int] = Behaviors.empty[Int]
val typedSystem: ActorSystem[Int] = ActorSystem(behavior, "testSystem")
val selfAddress: Address = typedSystem.toClassic.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val modulo10Mapping: Int => String = (in: Int) => (in % 10).toString
val messages: Map[Any, Seq[Int]] = (1 to 1000).groupBy(modulo10Mapping.apply)
"not accept virtualization factor lesser than 1" in {
val caught = intercept[IllegalArgumentException] {
new RoutingLogics.ConsistentHashingLogic[Int](0, modulo10Mapping, selfAddress)
}
caught.getMessage shouldEqual "requirement failed: virtualNodesFactor has to be a positive integer"
}
"throw an error when there are no routees" in {
val logic =
new RoutingLogics.ConsistentHashingLogic[Int](1, modulo10Mapping, selfAddress)
val caught = intercept[IllegalStateException] {
logic.selectRoutee(0) shouldBe typedSystem.deadLetters
}
(caught.getMessage should fullyMatch).regex("""Can't get node for \[.+\] from an empty node ring""")
}
"hash consistently" in {
consitentHashingTestWithVirtualizationFactor(1)
}
"hash consistently with virtualization factor" in {
consitentHashingTestWithVirtualizationFactor(13)
}
"hash consistently when several new added" in {
val logic =
new RoutingLogics.ConsistentHashingLogic[Int](2, modulo10Mapping, selfAddress)
val refA = TestProbe("a").ref
val refB = TestProbe("b").ref
val refC = TestProbe("c").ref
val refD = TestProbe("d").ref
logic.routeesUpdated(Set(refA, refB, refC, refD))
// every group should have the same actor ref
verifyConsistentHashing(logic)
logic.routeesUpdated(Set(refA, refB))
verifyConsistentHashing(logic)
}
"hash consistently when several new removed" in {
val logic =
new RoutingLogics.ConsistentHashingLogic[Int](2, modulo10Mapping, selfAddress)
val refA = TestProbe("a").ref
val refB = TestProbe("b").ref
val refC = TestProbe("c").ref
val refD = TestProbe("d").ref
logic.routeesUpdated(Set(refA, refB))
// every group should have the same actor ref
verifyConsistentHashing(logic)
logic.routeesUpdated(Set(refA, refB, refC, refD))
verifyConsistentHashing(logic)
}
def consitentHashingTestWithVirtualizationFactor(virtualizationFactor: Int): Boolean = {
val logic =
new RoutingLogics.ConsistentHashingLogic[Int](virtualizationFactor, modulo10Mapping, selfAddress)
val refA = TestProbe("a").ref
val refB = TestProbe("b").ref
val refC = TestProbe("c").ref
val refD = TestProbe("d").ref
logic.routeesUpdated(Set(refA, refB, refC, refD))
verifyConsistentHashing(logic)
}
def verifyConsistentHashing(logic: ConsistentHashingLogic[Int]): Boolean = {
messages.view.map(_._2.map(logic.selectRoutee)).forall { refs =>
refs.headOption.forall(head => refs.forall(_ == head))
}
}
}
} }

View file

@ -5,34 +5,28 @@
package akka.actor.typed.scaladsl package akka.actor.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Dropped import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit, TestProbe }
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.internal.routing.GroupRouterImpl import akka.actor.typed.internal.routing.{ GroupRouterImpl, RoutingLogics }
import akka.actor.typed.internal.routing.RoutingLogics import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
import org.scalatest.Matchers import akka.actor.typed.{ ActorRef, Behavior }
import org.scalatest.WordSpecLike import akka.actor.{ ActorSystem, Dropped }
import org.scalatest.{ Matchers, WordSpecLike }
class RoutersSpec extends ScalaTestWithActorTestKit(""" class RoutersSpec extends ScalaTestWithActorTestKit("""
akka.loglevel=debug akka.loglevel=debug
""") with WordSpecLike with Matchers with LogCapturing { """) with WordSpecLike with Matchers with LogCapturing {
// needed for the event filter // needed for the event filter
implicit val classicSystem = system.toClassic implicit val classicSystem: ActorSystem = system.toClassic
def compileOnlyApiCoverage(): Unit = { def compileOnlyApiCoverage(): Unit = {
Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting() Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting()
Routers.pool(10)(Behaviors.empty[Any]).withRandomRouting() Routers.pool(10)(Behaviors.empty[Any]).withRandomRouting()
Routers.pool(10)(Behaviors.empty[Any]).withRoundRobinRouting() Routers.pool(10)(Behaviors.empty[Any]).withRoundRobinRouting()
Routers.pool(10)(Behaviors.empty[Any]).withConsistentHashingRouting(1, (msg: Any) => msg.toString)
} }
"The router pool" must { "The router pool" must {

View file

@ -5,14 +5,10 @@
package docs.akka.typed package docs.akka.typed
// #pool // #pool
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.Behavior import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.{ Behaviors, Routers }
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import org.scalatest.WordSpecLike import org.scalatest.WordSpecLike
// #pool // #pool

View file

@ -0,0 +1,34 @@
# Those are new methods required for consistent hashing
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.PoolRouter.withConsistentHashingRouting")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.GroupRouter.withConsistentHashingRouting")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.PoolRouter.withConsistentHashingRouting")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.GroupRouter.withConsistentHashingRouting")
# Routee method has been updated to accept the message, but it's still an internal API.
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.RoutingLogic.selectRoutee")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.internal.routing.RoutingLogic.selectRoutee")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.RoutingLogics#RoundRobinLogic.selectRoutee")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.RoutingLogics#RandomLogic.selectRoutee")
# Internal changes due to of ActorSystem introduction
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.apply$default$2")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.logicFactory")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.copy$default$2")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.<init>$default$3")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply$default$3")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.<init>$default$2")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.apply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.apply$default$2")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.GroupRouterBuilder.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.<init>$default$3")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply$default$3")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.logicFactory")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.copy$default$3")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.this")

View file

@ -4,7 +4,9 @@
package akka.actor.typed.internal.routing package akka.actor.typed.internal.routing
import akka.actor.Dropped import java.util.function
import akka.actor.{ Dropped, ExtendedActorSystem }
import akka.actor.typed._ import akka.actor.typed._
import akka.actor.typed.eventstream.EventStream import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.Receptionist
@ -20,17 +22,32 @@ import akka.annotation.InternalApi
@InternalApi @InternalApi
private[akka] final case class GroupRouterBuilder[T] private[akka] ( private[akka] final case class GroupRouterBuilder[T] private[akka] (
key: ServiceKey[T], key: ServiceKey[T],
logicFactory: () => RoutingLogic[T] = () => new RoutingLogics.RandomLogic[T]()) logicFactory: ActorSystem[_] => RoutingLogic[T] = (_: ActorSystem[_]) => new RoutingLogics.RandomLogic[T]())
extends javadsl.GroupRouter[T] extends javadsl.GroupRouter[T]
with scaladsl.GroupRouter[T] { with scaladsl.GroupRouter[T] {
// deferred creation of the actual router // deferred creation of the actual router
def apply(ctx: TypedActorContext[T]): Behavior[T] = new InitialGroupRouterImpl[T](ctx.asScala, key, logicFactory()) def apply(ctx: TypedActorContext[T]): Behavior[T] =
new InitialGroupRouterImpl[T](ctx.asScala, key, logicFactory(ctx.asScala.system))
def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]()) def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RandomLogic[T]())
def withRoundRobinRouting(): GroupRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RoundRobinLogic[T]) def withRoundRobinRouting(): GroupRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RoundRobinLogic[T])
def withConsistentHashingRouting(
virtualNodesFactor: Int,
mapping: function.Function[T, String]): GroupRouterBuilder[T] =
withConsistentHashingRouting(virtualNodesFactor, mapping.apply(_))
def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): GroupRouterBuilder[T] = {
import akka.actor.typed.scaladsl.adapter._
copy(
logicFactory = system =>
new RoutingLogics.ConsistentHashingLogic[T](
virtualNodesFactor,
mapping,
system.toClassic.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress))
}
} }
/** /**
@ -98,7 +115,7 @@ private[akka] final class GroupRouterImpl[T](
this this
case msg: T @unchecked => case msg: T @unchecked =>
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
if (!routeesEmpty) routingLogic.selectRoutee() ! msg if (!routeesEmpty) routingLogic.selectRoutee(msg) ! msg
else else
context.system.eventStream ! EventStream.Publish( context.system.eventStream ! EventStream.Publish(
Dropped(msg, s"No routees in group router for [$serviceKey]", context.self.toClassic)) Dropped(msg, s"No routees in group router for [$serviceKey]", context.self.toClassic))

View file

@ -4,10 +4,12 @@
package akka.actor.typed.internal.routing package akka.actor.typed.internal.routing
import java.util.function
import akka.actor.ExtendedActorSystem
import akka.actor.typed._ import akka.actor.typed._
import akka.actor.typed.scaladsl.AbstractBehavior import akka.actor.typed.javadsl.PoolRouter
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi import akka.annotation.InternalApi
/** /**
@ -17,18 +19,31 @@ import akka.annotation.InternalApi
private[akka] final case class PoolRouterBuilder[T]( private[akka] final case class PoolRouterBuilder[T](
poolSize: Int, poolSize: Int,
behavior: Behavior[T], behavior: Behavior[T],
logicFactory: () => RoutingLogic[T] = () => new RoutingLogics.RoundRobinLogic[T]) logicFactory: ActorSystem[_] => RoutingLogic[T] = (_: ActorSystem[_]) => new RoutingLogics.RoundRobinLogic[T])
extends javadsl.PoolRouter[T] extends javadsl.PoolRouter[T]
with scaladsl.PoolRouter[T] { with scaladsl.PoolRouter[T] {
if (poolSize < 1) throw new IllegalArgumentException(s"pool size must be positive, was $poolSize") if (poolSize < 1) throw new IllegalArgumentException(s"pool size must be positive, was $poolSize")
// deferred creation of the actual router // deferred creation of the actual router
def apply(ctx: TypedActorContext[T]): Behavior[T] = def apply(ctx: TypedActorContext[T]): Behavior[T] =
new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory()) new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory(ctx.asScala.system))
def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]()) def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RandomLogic[T]())
def withRoundRobinRouting(): PoolRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RoundRobinLogic[T]) def withRoundRobinRouting(): PoolRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RoundRobinLogic[T])
def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: function.Function[T, String]): PoolRouter[T] =
withConsistentHashingRouting(virtualNodesFactor, mapping.apply(_))
def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): PoolRouterBuilder[T] = {
import akka.actor.typed.scaladsl.adapter._
copy(
logicFactory = system =>
new RoutingLogics.ConsistentHashingLogic[T](
virtualNodesFactor,
mapping,
system.toClassic.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress))
}
def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize) def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize)
} }
@ -57,7 +72,7 @@ private final class PoolRouterImpl[T](
} }
def onMessage(msg: T): Behavior[T] = { def onMessage(msg: T): Behavior[T] = {
logic.selectRoutee() ! msg logic.selectRoutee(msg) ! msg
this this
} }

View file

@ -6,8 +6,10 @@ package akka.actor.typed.internal.routing
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import akka.actor.Address
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.routing.ConsistentHash
/** /**
* Kept in the behavior, not shared between instances, meant to be stateful. * Kept in the behavior, not shared between instances, meant to be stateful.
@ -17,7 +19,7 @@ import akka.annotation.InternalApi
@InternalApi @InternalApi
sealed private[akka] trait RoutingLogic[T] { sealed private[akka] trait RoutingLogic[T] {
def selectRoutee(): ActorRef[T] def selectRoutee(msg: T): ActorRef[T]
/** /**
* Invoked an initial time before `selectRoutee` is ever called and then every time the set of available * Invoked an initial time before `selectRoutee` is ever called and then every time the set of available
@ -43,7 +45,7 @@ private[akka] object RoutingLogics {
private var nextIdx = 0 private var nextIdx = 0
def selectRoutee(): ActorRef[T] = { def selectRoutee(msg: T): ActorRef[T] = {
if (nextIdx >= currentRoutees.length) nextIdx = 0 if (nextIdx >= currentRoutees.length) nextIdx = 0
val selected = currentRoutees(nextIdx) val selected = currentRoutees(nextIdx)
nextIdx += 1 nextIdx += 1
@ -76,14 +78,37 @@ private[akka] object RoutingLogics {
private var currentRoutees: Array[ActorRef[T]] = _ private var currentRoutees: Array[ActorRef[T]] = _
override def selectRoutee(): ActorRef[T] = { override def selectRoutee(msg: T): ActorRef[T] = {
val selectedIdx = ThreadLocalRandom.current().nextInt(currentRoutees.length) val selectedIdx = ThreadLocalRandom.current().nextInt(currentRoutees.length)
currentRoutees(selectedIdx) currentRoutees(selectedIdx)
} }
override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = { override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = {
currentRoutees = newRoutees.toArray currentRoutees = newRoutees.toArray
} }
}
final class ConsistentHashingLogic[T](virtualNodesFactor: Int, mapping: T => String, baseAddress: Address)
extends RoutingLogic[T] {
require(virtualNodesFactor > 0, "virtualNodesFactor has to be a positive integer")
private var pathToRefs: Map[String, ActorRef[T]] = Map.empty
private var consistentHash: ConsistentHash[String] = ConsistentHash(Set.empty, virtualNodesFactor)
override def selectRoutee(msg: T): ActorRef[T] = pathToRefs(consistentHash.nodeFor(mapping(msg)))
override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = {
val updatedPathToRefs = newRoutees.map(routee => toFullAddressString(routee) -> routee).toMap
val withoutOld = pathToRefs.keySet.diff(updatedPathToRefs.keySet).foldLeft(consistentHash)(_ :- _)
consistentHash = updatedPathToRefs.keySet.diff(pathToRefs.keySet).foldLeft(withoutOld)(_ :+ _)
pathToRefs = updatedPathToRefs
}
private def toFullAddressString(routee: ActorRef[T]): String = routee.path.address match {
case Address(_, _, None, None) => routee.path.toStringWithAddress(baseAddress)
case _ => routee.path.toString
}
} }
} }

View file

@ -6,8 +6,7 @@ package akka.actor.typed.javadsl
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
import akka.actor.typed.internal.routing.GroupRouterBuilder import akka.actor.typed.internal.routing.{ GroupRouterBuilder, PoolRouterBuilder }
import akka.actor.typed.internal.routing.PoolRouterBuilder
import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.receptionist.ServiceKey
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
@ -64,6 +63,37 @@ abstract class GroupRouter[T] extends DeferredBehavior[T] {
*/ */
def withRoundRobinRouting(): GroupRouter[T] def withRoundRobinRouting(): GroupRouter[T]
/**
* Route messages by using consistent hashing.
*
* From wikipedia: Consistent hashing is based on mapping each object to a point on a circle
* (or equivalently, mapping each object to a real angle). The system maps each available machine
* (or other storage bucket) to many pseudo-randomly distributed points on the same circle.
*
* @param virtualNodesFactor This factor has to be greater or equal to 1. Assuming that the reader
* knows what consistent hashing is
* (if not, please refer: http://www.tom-e-white.com/2007/11/consistent-hashing.html or wiki).
* This number is responsible for creating additional,
* virtual addresses for a provided set of routees,
* so that in the total number of points on hashing ring
* will be equal to numberOfRoutees * virtualNodesFactor
* (if virtualNodesFactor is equal to 1, then no additional points will be created).
*
* Those virtual nodes are being created by additionally rehashing routees
* to evenly distribute them across hashing ring.
* Consider increasing this number when you have a small number of routees.
* For bigger loads one can aim in having around 100-200 total addresses.
*
* Please also note that setting this number to a too big value will cause
* reasonable overhead when new routees will be added or old one removed.
*
* @param mapping Hash key extractor. This function will be used in consistent hashing process.
* Result of this operation should possibly uniquely distinguish messages.
*/
def withConsistentHashingRouting(
virtualNodesFactor: Int,
mapping: java.util.function.Function[T, String]): GroupRouter[T]
} }
/** /**
@ -91,6 +121,34 @@ abstract class PoolRouter[T] extends DeferredBehavior[T] {
*/ */
def withRoundRobinRouting(): PoolRouter[T] def withRoundRobinRouting(): PoolRouter[T]
/**
* Route messages by using consistent hashing.
*
* From wikipedia: Consistent hashing is based on mapping each object to a point on a circle
* (or equivalently, mapping each object to a real angle). The system maps each available machine
* (or other storage bucket) to many pseudo-randomly distributed points on the same circle.
*
* @param virtualNodesFactor This factor has to be greater or equal to 1. Assuming that the reader
* knows what consistent hashing is
* (if not, please refer: http://www.tom-e-white.com/2007/11/consistent-hashing.html or wiki).
* This number is responsible for creating additional,
* virtual addresses for a provided set of routees,
* so that in the total number of points on hashing ring
* will be equal to numberOfRoutees * virtualNodesFactor
* (if virtualNodesFactor is equal to 1, then no additional points will be created).
*
* Those virtual nodes are being created by additionally rehashing routees
* to evenly distribute them across hashing ring.
* Consider increasing this number when you have a small number of routees.
* For bigger loads one can aim in having around 100-200 total addresses.
*
* @param mapping Hash key extractor. This function will be used in consistent hashing process.
* Result of this operation should possibly uniquely distinguish messages.
*/
def withConsistentHashingRouting(
virtualNodesFactor: Int,
mapping: java.util.function.Function[T, String]): PoolRouter[T]
/** /**
* Set a new pool size from the one set at construction * Set a new pool size from the one set at construction
*/ */

View file

@ -4,8 +4,7 @@
package akka.actor.typed.scaladsl package akka.actor.typed.scaladsl
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.internal.routing.GroupRouterBuilder import akka.actor.typed.internal.routing.{ GroupRouterBuilder, PoolRouterBuilder }
import akka.actor.typed.internal.routing.PoolRouterBuilder
import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.receptionist.ServiceKey
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
@ -60,6 +59,35 @@ trait GroupRouter[T] extends Behavior[T] {
*/ */
def withRoundRobinRouting(): GroupRouter[T] def withRoundRobinRouting(): GroupRouter[T]
/**
* Route messages by using consistent hashing.
*
* From wikipedia: Consistent hashing is based on mapping each object to a point on a circle
* (or equivalently, mapping each object to a real angle). The system maps each available machine
* (or other storage bucket) to many pseudo-randomly distributed points on the same circle.
*
* @param virtualNodesFactor This factor has to be greater or equal to 1. Assuming that the reader
* knows what consistent hashing is
* (if not, please refer: http://www.tom-e-white.com/2007/11/consistent-hashing.html or wiki).
* This number is responsible for creating additional,
* virtual addresses for a provided set of routees,
* so that in the total number of points on hashing ring
* will be equal to numberOfRoutees * virtualNodesFactor
* (if virtualNodesFactor is equal to 1, then no additional points will be created).
*
* Those virtual nodes are being created by additionally rehashing routees
* to evenly distribute them across hashing ring.
* Consider increasing this number when you have a small number of routees.
* For bigger loads one can aim in having around 100-200 total addresses.
*
* Please also note that setting this number to a too big value will cause
* reasonable overhead when new routees will be added or old one removed.
*
* @param mapping Hash key extractor. This function will be used in consistent hashing process.
* Result of this operation should possibly uniquely distinguish messages.
*/
def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): GroupRouter[T]
} }
/** /**
@ -87,6 +115,32 @@ trait PoolRouter[T] extends Behavior[T] {
*/ */
def withRoundRobinRouting(): PoolRouter[T] def withRoundRobinRouting(): PoolRouter[T]
/**
* Route messages by using consistent hashing.
*
* From wikipedia: Consistent hashing is based on mapping each object to a point on a circle
* (or equivalently, mapping each object to a real angle). The system maps each available machine
* (or other storage bucket) to many pseudo-randomly distributed points on the same circle.
*
* @param virtualNodesFactor This factor has to be greater or equal to 1. Assuming that the reader
* knows what consistent hashing is
* (if not, please refer: http://www.tom-e-white.com/2007/11/consistent-hashing.html or wiki).
* This number is responsible for creating additional,
* virtual addresses for a provided set of routees,
* so that in the total number of points on hashing ring
* will be equal to numberOfRoutees * virtualNodesFactor
* (if virtualNodesFactor is equal to 1, then no additional points will be created).
*
* Those virtual nodes are being created by additionally rehashing routees
* to evenly distribute them across hashing ring.
* Consider increasing this number when you have a small number of routees.
* For bigger loads one can aim in having around 100-200 total addresses.
*
* @param mapping Hash key extractor. This function will be used in consistent hashing process.
* Result of this operation should possibly uniquely distinguish messages.
*/
def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): PoolRouter[T]
/** /**
* Set a new pool size from the one set at construction * Set a new pool size from the one set at construction
*/ */

View file

@ -79,13 +79,23 @@ sent through the router, each actor is forwarded one message.
Round robin gives fair routing where every available routee gets the same amount of messages as long as the set Round robin gives fair routing where every available routee gets the same amount of messages as long as the set
of routees stays relatively stable, but may be unfair if the set of routees changes a lot. of routees stays relatively stable, but may be unfair if the set of routees changes a lot.
This is the default for pool routers as the pool of routees is expected to remain the same. This is the default for pool routers as the pool of routees is expected to remain the same.
### Random ### Random
Randomly selects a routee when a message is sent through the router. Randomly selects a routee when a message is sent through the router.
This is the default for group routers as the group of routees is expected to change as nodes join and leave the cluster. This is the default for group routers as the group of routees is expected to change as nodes join and leave the cluster.
### Consistent Hashing
Uses [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing) to select a routee based
on the sent message. This [article](http://www.tom-e-white.com/2007/11/consistent-hashing.html)
gives good insight into how consistent hashing is implemented.
Currently you have to define hashMapping of the router to map incoming messages to their consistent
hash key. This makes the decision transparent for the sender.
## Routers and performance ## Routers and performance
Note that if the routees are sharing a resource, the resource will determine if increasing the number of Note that if the routees are sharing a resource, the resource will determine if increasing the number of