add local affinity support for group router #28174

This commit is contained in:
Xusheng Peng 2019-12-03 23:34:27 +08:00 committed by Johan Andrén
parent e338a5aecd
commit 7b943873fc
7 changed files with 273 additions and 8 deletions

View file

@ -204,7 +204,7 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
"not route to unreachable when there are reachable" in {
val serviceKey = ServiceKey[String]("group-routing-4")
val router = spawn(Behaviors.setup[String](context =>
new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true)))
new GroupRouterImpl(context, serviceKey, false, new RoutingLogics.RoundRobinLogic[String], true)))
val reachableProbe = createTestProbe[String]
val unreachableProbe = createTestProbe[String]
@ -219,7 +219,7 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
"route to unreachable when there are no reachable" in {
val serviceKey = ServiceKey[String]("group-routing-4")
val router = spawn(Behaviors.setup[String](context =>
new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true)))
new GroupRouterImpl(context, serviceKey, false, new RoutingLogics.RoundRobinLogic[String], true)))
val unreachableProbe = createTestProbe[String]
router.unsafeUpcast[Any] ! Receptionist.Listing(

View file

@ -0,0 +1,7 @@
# Those are new methods required for prefer local routees.
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.GroupRouter.withRandomRouting")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.GroupRouter.withRoundRobinRouting")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.GroupRouter.withRandomRouting")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.GroupRouter.withRoundRobinRouting")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.GroupRouterImpl.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.InitialGroupRouterImpl.this")

View file

@ -22,17 +22,24 @@ import akka.annotation.InternalApi
@InternalApi
private[akka] final case class GroupRouterBuilder[T] private[akka] (
key: ServiceKey[T],
preferLocalRoutees: Boolean = false,
logicFactory: ActorSystem[_] => RoutingLogic[T] = (_: ActorSystem[_]) => new RoutingLogics.RandomLogic[T]())
extends javadsl.GroupRouter[T]
with scaladsl.GroupRouter[T] {
// deferred creation of the actual router
def apply(ctx: TypedActorContext[T]): Behavior[T] =
new InitialGroupRouterImpl[T](ctx.asScala, key, logicFactory(ctx.asScala.system))
new InitialGroupRouterImpl[T](ctx.asScala, key, preferLocalRoutees, logicFactory(ctx.asScala.system))
def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RandomLogic[T]())
def withRandomRouting(): GroupRouterBuilder[T] = withRandomRouting(false)
def withRoundRobinRouting(): GroupRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RoundRobinLogic[T])
def withRandomRouting(preferLocalRoutees: Boolean): GroupRouterBuilder[T] =
copy(preferLocalRoutees = preferLocalRoutees, logicFactory = _ => new RoutingLogics.RandomLogic[T]())
def withRoundRobinRouting(): GroupRouterBuilder[T] = withRoundRobinRouting(false)
def withRoundRobinRouting(preferLocalRoutees: Boolean): GroupRouterBuilder[T] =
copy(preferLocalRoutees = preferLocalRoutees, logicFactory = _ => new RoutingLogics.RoundRobinLogic[T])
def withConsistentHashingRouting(
virtualNodesFactor: Int,
@ -41,6 +48,7 @@ private[akka] final case class GroupRouterBuilder[T] private[akka] (
def withConsistentHashingRouting(virtualNodesFactor: Int, mapping: T => String): GroupRouterBuilder[T] = {
copy(
preferLocalRoutees = false,
logicFactory = system => new RoutingLogics.ConsistentHashingLogic[T](virtualNodesFactor, mapping, system.address))
}
}
@ -54,6 +62,7 @@ private[akka] final case class GroupRouterBuilder[T] private[akka] (
private final class InitialGroupRouterImpl[T](
ctx: ActorContext[T],
serviceKey: ServiceKey[T],
preferLocalRoutees: Boolean,
routingLogic: RoutingLogic[T])
extends AbstractBehavior[T](ctx) {
@ -66,10 +75,12 @@ private final class InitialGroupRouterImpl[T](
private val stash = StashBuffer[T](context, capacity = 10000)
def onMessage(msg: T): Behavior[T] = msg match {
case serviceKey.Listing(update) =>
case serviceKey.Listing(allRoutees) =>
val update = GroupRouterHelper.routeesToUpdate(allRoutees, preferLocalRoutees)
// we don't need to watch, because receptionist already does that
routingLogic.routeesUpdated(update)
val activeGroupRouter = new GroupRouterImpl[T](context, serviceKey, routingLogic, update.isEmpty)
val activeGroupRouter =
new GroupRouterImpl[T](context, serviceKey, preferLocalRoutees, routingLogic, update.isEmpty)
stash.unstashAll(activeGroupRouter)
case msg: T @unchecked =>
import akka.actor.typed.scaladsl.adapter._
@ -83,6 +94,19 @@ private final class InitialGroupRouterImpl[T](
}
}
/**
* INTERNAL API
*/
@InternalApi
private[routing] object GroupRouterHelper {
def routeesToUpdate[T](allRoutees: Set[ActorRef[T]], preferLocalRoutees: Boolean): Set[ActorRef[T]] = {
if (preferLocalRoutees) {
val localRoutees = allRoutees.filter(_.path.address.hasLocalScope)
if (localRoutees.nonEmpty) localRoutees else allRoutees
} else allRoutees
}
}
/**
* INTERNAL API
*/
@ -90,6 +114,7 @@ private final class InitialGroupRouterImpl[T](
private[akka] final class GroupRouterImpl[T](
ctx: ActorContext[T],
serviceKey: ServiceKey[T],
preferLocalRoutees: Boolean,
routingLogic: RoutingLogic[T],
routeesInitiallyEmpty: Boolean)
extends AbstractBehavior[T](ctx) {
@ -97,8 +122,10 @@ private[akka] final class GroupRouterImpl[T](
private var routeesEmpty = routeesInitiallyEmpty
def onMessage(msg: T): Behavior[T] = msg match {
case l @ serviceKey.Listing(update) =>
case l @ serviceKey.Listing(allRoutees) =>
context.log.debug("Update from receptionist: [{}]", l)
val update = GroupRouterHelper.routeesToUpdate(allRoutees, preferLocalRoutees)
val routees =
if (update.nonEmpty) update
else

View file

@ -55,6 +55,17 @@ abstract class GroupRouter[T] extends DeferredBehavior[T] {
*/
def withRandomRouting(): GroupRouter[T]
/**
* Route messages by randomly selecting the routee from the available routees.
*
* This is the default for group routers.
*
* @param preferLocalRoutees if the value is false, all reachable routees will be used;
* if the value is true and there are local routees, only local routees will be used.
* if the value is true and there is no local routees, remote routees will be used.
*/
def withRandomRouting(preferLocalRoutees: Boolean): GroupRouter[T]
/**
* Route messages by using round robin.
*
@ -63,6 +74,18 @@ abstract class GroupRouter[T] extends DeferredBehavior[T] {
*/
def withRoundRobinRouting(): GroupRouter[T]
/**
* Route messages by using round robin.
*
* Round robin gives fair routing where every available routee gets the same amount of messages as long as the set
* of routees stays relatively stable, but may be unfair if the set of routees changes a lot.
*
* @param preferLocalRoutees if the value is false, all reachable routees will be used;
* if the value is true and there are local routees, only local routees will be used.
* if the value is true and there is no local routees, remote routees will be used.
*/
def withRoundRobinRouting(preferLocalRoutees: Boolean): GroupRouter[T]
/**
* Route messages by using consistent hashing.
*

View file

@ -51,6 +51,15 @@ trait GroupRouter[T] extends Behavior[T] {
*/
def withRandomRouting(): GroupRouter[T]
/**
* Route messages by randomly selecting the routee from the available routees. This is the default for group routers.
*
* @param preferLocalRoutees if the value is false, all reachable routees will be used;
* if the value is true and there are local routees, only local routees will be used.
* if the value is true and there is no local routees, remote routees will be used.
*/
def withRandomRouting(preferLocalRoutees: Boolean): GroupRouter[T]
/**
* Route messages by using round robin.
*
@ -59,6 +68,18 @@ trait GroupRouter[T] extends Behavior[T] {
*/
def withRoundRobinRouting(): GroupRouter[T]
/**
* Route messages by using round robin.
*
* Round robin gives fair routing where every available routee gets the same amount of messages as long as the set
* of routees stays relatively stable, but may be unfair if the set of routees changes a lot.
*
* @param preferLocalRoutees if the value is false, all reachable routees will be used;
* if the value is true and there are local routees, only local routees will be used.
* if the value true and there is no local routees, remote routees will be used.
*/
def withRoundRobinRouting(preferLocalRoutees: Boolean): GroupRouter[T]
/**
* Route messages by using consistent hashing.
*

View file

@ -0,0 +1,182 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.{ Behaviors, GroupRouter, Routers }
import akka.serialization.jackson.CborSerializable
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
import scala.concurrent.Promise
object GroupRouterSpec {
def config = ConfigFactory.parseString(s"""
akka {
loglevel = debug
actor.provider = cluster
remote.classic.netty.tcp.port = 0
remote.classic.netty.tcp.host = 127.0.0.1
remote.artery {
canonical {
hostname = 127.0.0.1
port = 0
}
}
}
""")
def createSystem[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name, config)
case object Ping extends CborSerializable
trait Command
case class UpdateWorker(actorRef: ActorRef[Ping.type]) extends Command
case class GetWorkers(replyTo: ActorRef[Seq[ActorRef[Ping.type]]]) extends Command
// receive Ping and send self to statsActorRef, so the statsActorRef will know who handle the message
def pingPong(statsActorRef: ActorRef[Command]) = Behaviors.receive[Ping.type] { (ctx, msg) =>
msg match {
case Ping =>
statsActorRef ! UpdateWorker(ctx.self)
Behaviors.same
}
}
// stats the actors who handle the messages and gives the result if someone ask
def workerStatsBehavior(list: List[ActorRef[Ping.type]]): Behavior[Command] = Behaviors.receiveMessage {
case UpdateWorker(actorRef) =>
workerStatsBehavior(actorRef :: list)
case GetWorkers(replyTo) =>
replyTo ! list
Behaviors.same
}
case class GroupRouterSpecSettings(node1WorkerCount: Int, node2WorkerCount: Int, messageCount: Int)
val pingPongKey = ServiceKey[Ping.type]("ping-pong")
}
class GroupRouterSpec extends ScalaTestWithActorTestKit(GroupRouterSpec.config) with WordSpecLike with LogCapturing {
import GroupRouterSpec._
def checkGroupRouterBehavior[T](groupRouter: GroupRouter[Ping.type], settings: GroupRouterSpecSettings)(
resultCheck: (Seq[ActorRef[Ping.type]], Seq[ActorRef[Ping.type]]) => T): T = {
import scala.concurrent.duration._
import akka.actor.typed.scaladsl.AskPattern._
implicit val system1 =
createSystem(
Behaviors.setup[Command] { ctx =>
(0 until settings.node1WorkerCount).foreach { i =>
val worker = ctx.spawn(pingPong(ctx.self), s"ping-pong-$i")
ctx.system.receptionist ! Receptionist.Register(pingPongKey, worker)
}
val router = ctx.spawn(groupRouter, "group-router")
// ensure all nodes are joined and all actors are discovered.
ctx.system.receptionist ! Receptionist.Subscribe(
pingPongKey,
ctx.spawn(Behaviors.receiveMessage[Receptionist.Listing] {
case pingPongKey.Listing(update)
if update.size == settings.node1WorkerCount + settings.node2WorkerCount =>
(0 until settings.messageCount).foreach(_ => router ! Ping)
Behaviors.empty
case _ =>
Behaviors.same
}, "waiting-actor-discovery"))
workerStatsBehavior(List.empty)
},
system.name)
val system2 = createSystem(Behaviors.setup[Command] { ctx =>
(0 until settings.node2WorkerCount).foreach { i =>
val worker = ctx.spawn(pingPong(ctx.self), s"ping-pong-$i")
ctx.system.receptionist ! Receptionist.Register(pingPongKey, worker)
}
workerStatsBehavior(List.empty)
}, system.name)
val node1 = Cluster(system1)
node1.manager ! Join(node1.selfMember.address)
val node2 = Cluster(system2)
node2.manager ! Join(node1.selfMember.address)
val statsPromise = Promise[(Seq[ActorRef[Ping.type]], Seq[ActorRef[Ping.type]])]
val cancelable = system.scheduler.scheduleAtFixedRate(200.millis, 200.millis)(() => {
implicit val timeout = Timeout(3.seconds)
val actorRefsInNode1 = system1.ask[Seq[ActorRef[Ping.type]]](ref => GetWorkers(ref)).futureValue
val actorRefsInNode2 = system2.ask[Seq[ActorRef[Ping.type]]](ref => GetWorkers(ref)).futureValue
// waiting all messages are handled
if (actorRefsInNode1.size + actorRefsInNode2.size == settings.messageCount && !statsPromise.isCompleted) {
statsPromise.success((actorRefsInNode1, actorRefsInNode2))
}
})(system.executionContext)
try {
val stats = statsPromise.future.futureValue
cancelable.cancel()
resultCheck(stats._1, stats._2)
} finally {
system1.terminate()
system2.terminate()
}
}
"GroupRouter" must {
"use all reachable routees if preferLocalRoutees is not enabled" in {
val settings = GroupRouterSpecSettings(node1WorkerCount = 2, node2WorkerCount = 2, messageCount = 100)
val groupRouters = List(
Routers.group(pingPongKey),
Routers.group(pingPongKey).withRandomRouting(),
Routers.group(pingPongKey).withRoundRobinRouting(),
Routers.group(pingPongKey).withRoundRobinRouting(false),
Routers.group(pingPongKey).withRandomRouting(false))
groupRouters.foreach { groupRouter =>
checkGroupRouterBehavior(groupRouter, settings) {
case (actorRefsInNode1, actorRefsInNode2) =>
(actorRefsInNode1.size + actorRefsInNode2.size) shouldBe settings.messageCount
actorRefsInNode1.toSet.size shouldBe settings.node1WorkerCount
actorRefsInNode2.toSet.size shouldBe settings.node2WorkerCount
}
}
}
"only use local routees if preferLocalRoutees is enabled and there are local routees" in {
val settings = GroupRouterSpecSettings(node1WorkerCount = 2, node2WorkerCount = 2, messageCount = 100)
val groupRouters =
List(Routers.group(pingPongKey).withRoundRobinRouting(true), Routers.group(pingPongKey).withRandomRouting(true))
groupRouters.foreach { groupRouter =>
checkGroupRouterBehavior(groupRouter, settings) {
case (actorRefsInNode1, actorRefsInNode2) =>
actorRefsInNode1.size shouldBe settings.messageCount
actorRefsInNode1.toSet.size shouldBe settings.node1WorkerCount
actorRefsInNode2.toSet.size shouldBe 0
}
}
}
"use remote routees if preferLocalRoutees is enabled but there is no local routees" in {
val settings = GroupRouterSpecSettings(node1WorkerCount = 0, node2WorkerCount = 2, messageCount = 100)
val groupRouters =
List(Routers.group(pingPongKey).withRoundRobinRouting(true), Routers.group(pingPongKey).withRandomRouting(true))
groupRouters.foreach { groupRouter =>
checkGroupRouterBehavior(groupRouter, settings) {
case (actorRefsInNode1, actorRefsInNode2) =>
actorRefsInNode2.size shouldBe settings.messageCount
actorRefsInNode1.toSet.size shouldBe 0
actorRefsInNode2.toSet.size shouldBe settings.node2WorkerCount
}
}
}
}
}

View file

@ -81,12 +81,17 @@ of routees stays relatively stable, but may be unfair if the set of routees chan
This is the default for pool routers as the pool of routees is expected to remain the same.
An optional parameter `preferLocalRoutees` can be used for this strategy. Routers will only use routees located in local actor system if `preferLocalRoutees` is true and local routees do exist. The default value for this parameter is false.
### Random
Randomly selects a routee when a message is sent through the router.
This is the default for group routers as the group of routees is expected to change as nodes join and leave the cluster.
An optional parameter `preferLocalRoutees` can be used for this strategy. Routers will only use routees located in local actor system if `preferLocalRoutees` is true and local routees do exist. The default value for this parameter is false.
### Consistent Hashing
Uses [consistent hashing](http://en.wikipedia.org/wiki/Consistent_hashing) to select a routee based