Akka29924 router pool bcast (#29927)

This commit is contained in:
eyal farago 2021-01-11 16:08:56 +02:00 committed by GitHub
parent 2dedb715a1
commit b8c79f8695
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 123 additions and 8 deletions

View file

@ -17,4 +17,10 @@ public class RoutersTest {
Behavior<String> pool = Behavior<String> pool =
Routers.pool(5, Behaviors.<String>empty()).withRandomRouting().withRoundRobinRouting(); Routers.pool(5, Behaviors.<String>empty()).withRandomRouting().withRoundRobinRouting();
} }
public void poolBroadcastCompileOnlyApiTest() {
Behavior<String> b = Behaviors.receiveMessage((String str) -> Behaviors.same());
Behavior<String> poolBehavior =
Routers.pool(5, b).withBroadcastPredicate(str -> str.startsWith("bc-"));
}
} }

View file

@ -56,6 +56,14 @@ public class RouterTest {
// #routee // #routee
// intentionally outside the routee scope
static class DoBroadcastLog extends Worker.DoLog {
public DoBroadcastLog(String text) {
super(text);
}
}
static Behavior<Void> showPoolRouting() { static Behavior<Void> showPoolRouting() {
return return
// #pool // #pool
@ -88,6 +96,11 @@ public class RouterTest {
PoolRouter<Worker.Command> alternativePool = pool.withPoolSize(2).withRoundRobinRouting(); PoolRouter<Worker.Command> alternativePool = pool.withPoolSize(2).withRoundRobinRouting();
// #strategy // #strategy
// #broadcast
PoolRouter<Worker.Command> broadcastingPool =
pool.withBroadcastPredicate(msg -> msg instanceof DoBroadcastLog);
// #broadcast
return Behaviors.empty(); return Behaviors.empty();
// #pool // #pool
}); });

View file

@ -4,11 +4,9 @@
package akka.actor.typed.scaladsl package akka.actor.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.{ ActorPath, ActorSystem }
import akka.actor.ActorSystem
import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.LoggingTestKit import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@ -115,6 +113,35 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
} }
} }
"support broadcast" in {
trait Cmd
case object ReplyWithAck extends Cmd
case object BCast extends Cmd
def behavior(replyTo: ActorRef[AnyRef]) = Behaviors.setup[Cmd] { ctx =>
Behaviors.receiveMessage[Cmd] {
case ReplyWithAck | BCast =>
val reply = ctx.self.path
replyTo ! reply
Behaviors.same
}
}
val probe = testKit.createTestProbe[AnyRef]()
val pool = testKit.spawn(Routers.pool(4)(behavior(probe.ref)).withBroadcastPredicate(_ eq BCast))
pool ! BCast
val msgs = probe.receiveMessages(4).map { m =>
m should be(an[ActorPath])
m.asInstanceOf[ActorPath]
}
msgs should equal(msgs.distinct)
probe.expectNoMessage()
pool ! ReplyWithAck
probe.expectMessageType[ActorPath]
probe.expectNoMessage()
}
} }
"The router group" must { "The router group" must {

View file

@ -32,6 +32,13 @@ object RouterSpec {
} }
// #routee // #routee
//intentionally out of the routee section
class DoBroadcastLog(text: String) extends Worker.DoLog(text)
object DoBroadcastLog {
def apply(text: String) = new DoBroadcastLog(text)
}
// This code is extra indented for visualization purposes // This code is extra indented for visualization purposes
// format: OFF // format: OFF
// #group // #group
@ -87,13 +94,19 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with
val alternativeRouter = ctx.spawn(alternativePool, "alternative-pool") val alternativeRouter = ctx.spawn(alternativePool, "alternative-pool")
alternativeRouter ! Worker.DoLog("msg") alternativeRouter ! Worker.DoLog("msg")
//#pool //#pool
Behaviors.empty
// #broadcast
val poolWithBroadcast = pool.withBroadcastPredicate(_.isInstanceOf[DoBroadcastLog])
val routerWithBroadcast = ctx.spawn(poolWithBroadcast, "pool-with-broadcast")
//this will be sent to all 4 routees
routerWithBroadcast ! DoBroadcastLog("msg")
Behaviors.empty
// #broadcast
} }
//#pool //#pool
) )
probe.receiveMessages(11) probe.receiveMessages(15)
} }
"show group routing" in { "show group routing" in {

View file

@ -0,0 +1,14 @@
# akka29924 aktor.typed Router.Pool support for broadcast messages
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.PoolRouter.withBroadcastPredicate")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.PoolRouter.withBroadcastPredicate")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.<init>$default$4")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply$default$4")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.copy$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.PoolRouterImpl.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.this")

View file

@ -5,11 +5,13 @@
package akka.actor.typed.internal.routing package akka.actor.typed.internal.routing
import java.util.function import java.util.function
import akka.actor.typed._ import akka.actor.typed._
import akka.actor.typed.javadsl.PoolRouter import akka.actor.typed.javadsl.PoolRouter
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors } import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.util.ConstantFun
import java.util.function.Predicate
/** /**
* INTERNAL API * INTERNAL API
@ -19,6 +21,7 @@ private[akka] final case class PoolRouterBuilder[T](
poolSize: Int, poolSize: Int,
behavior: Behavior[T], behavior: Behavior[T],
logicFactory: ActorSystem[_] => RoutingLogic[T] = (_: ActorSystem[_]) => new RoutingLogics.RoundRobinLogic[T], logicFactory: ActorSystem[_] => RoutingLogic[T] = (_: ActorSystem[_]) => new RoutingLogics.RoundRobinLogic[T],
broadcastPredicate: T => Boolean = ConstantFun.anyToFalse,
routeeProps: Props = Props.empty) routeeProps: Props = Props.empty)
extends javadsl.PoolRouter[T] extends javadsl.PoolRouter[T]
with scaladsl.PoolRouter[T] { with scaladsl.PoolRouter[T] {
@ -26,7 +29,13 @@ private[akka] final case class PoolRouterBuilder[T](
// deferred creation of the actual router // deferred creation of the actual router
def apply(ctx: TypedActorContext[T]): Behavior[T] = def apply(ctx: TypedActorContext[T]): Behavior[T] =
new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory(ctx.asScala.system), routeeProps) new PoolRouterImpl[T](
ctx.asScala,
poolSize,
behavior,
logicFactory(ctx.asScala.system),
broadcastPredicate,
routeeProps)
def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RandomLogic[T]()) def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RandomLogic[T]())
@ -43,6 +52,10 @@ private[akka] final case class PoolRouterBuilder[T](
def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize) def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize)
def withRouteeProps(routeeProps: Props): PoolRouterBuilder[T] = copy(routeeProps = routeeProps) def withRouteeProps(routeeProps: Props): PoolRouterBuilder[T] = copy(routeeProps = routeeProps)
override def withBroadcastPredicate(pred: Predicate[T]): PoolRouter[T] = withBroadcastPredicate(pred.test _)
override def withBroadcastPredicate(pred: T => Boolean): scaladsl.PoolRouter[T] = copy(broadcastPredicate = pred)
} }
/** /**
@ -54,6 +67,7 @@ private final class PoolRouterImpl[T](
poolSize: Int, poolSize: Int,
behavior: Behavior[T], behavior: Behavior[T],
logic: RoutingLogic[T], logic: RoutingLogic[T],
broadcastPredicate: T => Boolean,
routeeProps: Props) routeeProps: Props)
extends AbstractBehavior[T](ctx) { extends AbstractBehavior[T](ctx) {
@ -70,7 +84,11 @@ private final class PoolRouterImpl[T](
} }
def onMessage(msg: T): Behavior[T] = { def onMessage(msg: T): Behavior[T] = {
logic.selectRoutee(msg) ! msg if ((broadcastPredicate ne ConstantFun.anyToFalse) && broadcastPredicate(msg)) {
ctx.children.foreach(_.unsafeUpcast ! msg)
} else {
logic.selectRoutee(msg) ! msg
}
this this
} }

View file

@ -10,6 +10,8 @@ import akka.actor.typed.internal.routing.{ GroupRouterBuilder, PoolRouterBuilder
import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.receptionist.ServiceKey
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
import java.util.function.Predicate
object Routers { object Routers {
/** /**
@ -181,4 +183,9 @@ abstract class PoolRouter[T] extends DeferredBehavior[T] {
* Set the props used to spawn the pool's routees * Set the props used to spawn the pool's routees
*/ */
def withRouteeProps(routeeProps: Props): PoolRouter[T] def withRouteeProps(routeeProps: Props): PoolRouter[T]
/**
* Any message that the predicate returns true for will be broadcast to all routees.
*/
def withBroadcastPredicate(pred: Predicate[T]): PoolRouter[T]
} }

View file

@ -171,4 +171,9 @@ trait PoolRouter[T] extends Behavior[T] {
* Set the props used to spawn the pool's routees * Set the props used to spawn the pool's routees
*/ */
def withRouteeProps(routeeProps: Props): PoolRouter[T] def withRouteeProps(routeeProps: Props): PoolRouter[T]
/**
* Any message that the predicate returns true for will be broadcast to all routees.
*/
def withBroadcastPredicate(pred: T => Boolean): PoolRouter[T]
} }

View file

@ -48,6 +48,7 @@ import akka.japi.function.{ Function => JFun, Function2 => JFun2 }
val unitToUnit = () => () val unitToUnit = () => ()
val anyToTrue: Any => Boolean = (_: Any) => true val anyToTrue: Any => Boolean = (_: Any) => true
val anyToFalse: Any => Boolean = (_: Any) => false
private val _nullFun = (_: Any) => null private val _nullFun = (_: Any) => null

View file

@ -62,6 +62,17 @@ Scala
Java Java
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #pool-dispatcher } : @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #pool-dispatcher }
### Broadcasting a message to all routees
Pool routers can be configured to identify messages intended to be broad-casted to all routees.
Therefore, the `PoolRouter` has a property to configure its `broadcastPredicate`:
Scala
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #broadcast }
Java
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #broadcast }
## Group Router ## Group Router
The group router is created with a `ServiceKey` and uses the receptionist (see @ref:[Receptionist](actor-discovery.md#receptionist)) to discover The group router is created with a `ServiceKey` and uses the receptionist (see @ref:[Receptionist](actor-discovery.md#receptionist)) to discover