From b8c79f86958399b7f68104a21689488162b5e04c Mon Sep 17 00:00:00 2001 From: eyal farago Date: Mon, 11 Jan 2021 16:08:56 +0200 Subject: [PATCH] Akka29924 router pool bcast (#29927) --- .../akka/actor/typed/javadsl/RoutersTest.java | 6 ++++ .../java/jdocs/akka/typed/RouterTest.java | 13 ++++++++ .../actor/typed/scaladsl/RoutersSpec.scala | 33 +++++++++++++++++-- .../scala/docs/akka/typed/RouterSpec.scala | 17 ++++++++-- .../routers-pool-bcast.backwards.excludes | 14 ++++++++ .../internal/routing/PoolRouterImpl.scala | 24 ++++++++++++-- .../akka/actor/typed/javadsl/Routers.scala | 7 ++++ .../akka/actor/typed/scaladsl/Routers.scala | 5 +++ .../main/scala/akka/util/ConstantFun.scala | 1 + akka-docs/src/main/paradox/typed/routers.md | 11 +++++++ 10 files changed, 123 insertions(+), 8 deletions(-) create mode 100644 akka-actor-typed/src/main/mima-filters/2.6.10.backwards.excludes/routers-pool-bcast.backwards.excludes diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java index c9587a64ee..1b92bbfb1a 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java @@ -17,4 +17,10 @@ public class RoutersTest { Behavior pool = Routers.pool(5, Behaviors.empty()).withRandomRouting().withRoundRobinRouting(); } + + public void poolBroadcastCompileOnlyApiTest() { + Behavior b = Behaviors.receiveMessage((String str) -> Behaviors.same()); + Behavior poolBehavior = + Routers.pool(5, b).withBroadcastPredicate(str -> str.startsWith("bc-")); + } } diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java index de1f11a6d9..b60e235c53 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java @@ -56,6 +56,14 @@ public class RouterTest { // #routee + // intentionally outside the routee scope + static class DoBroadcastLog extends Worker.DoLog { + + public DoBroadcastLog(String text) { + super(text); + } + } + static Behavior showPoolRouting() { return // #pool @@ -88,6 +96,11 @@ public class RouterTest { PoolRouter alternativePool = pool.withPoolSize(2).withRoundRobinRouting(); // #strategy + // #broadcast + PoolRouter broadcastingPool = + pool.withBroadcastPredicate(msg -> msg instanceof DoBroadcastLog); + // #broadcast + return Behaviors.empty(); // #pool }); diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala index bf9e49e2a0..d190f53cc6 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala @@ -4,11 +4,9 @@ package akka.actor.typed.scaladsl import java.util.concurrent.atomic.AtomicInteger - import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike - -import akka.actor.ActorSystem +import akka.actor.{ ActorPath, ActorSystem } import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.LoggingTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -115,6 +113,35 @@ class RoutersSpec extends ScalaTestWithActorTestKit(""" } } + "support broadcast" in { + trait Cmd + case object ReplyWithAck extends Cmd + case object BCast extends Cmd + + def behavior(replyTo: ActorRef[AnyRef]) = Behaviors.setup[Cmd] { ctx => + Behaviors.receiveMessage[Cmd] { + case ReplyWithAck | BCast => + val reply = ctx.self.path + replyTo ! reply + Behaviors.same + } + } + + val probe = testKit.createTestProbe[AnyRef]() + val pool = testKit.spawn(Routers.pool(4)(behavior(probe.ref)).withBroadcastPredicate(_ eq BCast)) + pool ! BCast + val msgs = probe.receiveMessages(4).map { m => + m should be(an[ActorPath]) + m.asInstanceOf[ActorPath] + } + msgs should equal(msgs.distinct) + probe.expectNoMessage() + + pool ! ReplyWithAck + probe.expectMessageType[ActorPath] + probe.expectNoMessage() + } + } "The router group" must { diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala index 68717df81a..32d8950f1e 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala @@ -32,6 +32,13 @@ object RouterSpec { } // #routee + + //intentionally out of the routee section + class DoBroadcastLog(text: String) extends Worker.DoLog(text) + object DoBroadcastLog { + def apply(text: String) = new DoBroadcastLog(text) + } + // This code is extra indented for visualization purposes // format: OFF // #group @@ -87,13 +94,19 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with val alternativeRouter = ctx.spawn(alternativePool, "alternative-pool") alternativeRouter ! Worker.DoLog("msg") //#pool - Behaviors.empty + // #broadcast + val poolWithBroadcast = pool.withBroadcastPredicate(_.isInstanceOf[DoBroadcastLog]) + val routerWithBroadcast = ctx.spawn(poolWithBroadcast, "pool-with-broadcast") + //this will be sent to all 4 routees + routerWithBroadcast ! DoBroadcastLog("msg") + Behaviors.empty + // #broadcast } //#pool ) - probe.receiveMessages(11) + probe.receiveMessages(15) } "show group routing" in { diff --git a/akka-actor-typed/src/main/mima-filters/2.6.10.backwards.excludes/routers-pool-bcast.backwards.excludes b/akka-actor-typed/src/main/mima-filters/2.6.10.backwards.excludes/routers-pool-bcast.backwards.excludes new file mode 100644 index 0000000000..086148d6a8 --- /dev/null +++ b/akka-actor-typed/src/main/mima-filters/2.6.10.backwards.excludes/routers-pool-bcast.backwards.excludes @@ -0,0 +1,14 @@ +# akka29924 aktor.typed Router.Pool support for broadcast messages + +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.PoolRouter.withBroadcastPredicate") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.PoolRouter.withBroadcastPredicate") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.unapply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.$default$4") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.unapply") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply$default$4") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply$default$4") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.copy$default$4") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.PoolRouterImpl.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.routing.PoolRouterBuilder.this") \ No newline at end of file diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala index f54ba9c88b..1f4c710372 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala @@ -5,11 +5,13 @@ package akka.actor.typed.internal.routing import java.util.function - import akka.actor.typed._ import akka.actor.typed.javadsl.PoolRouter import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors } import akka.annotation.InternalApi +import akka.util.ConstantFun + +import java.util.function.Predicate /** * INTERNAL API @@ -19,6 +21,7 @@ private[akka] final case class PoolRouterBuilder[T]( poolSize: Int, behavior: Behavior[T], logicFactory: ActorSystem[_] => RoutingLogic[T] = (_: ActorSystem[_]) => new RoutingLogics.RoundRobinLogic[T], + broadcastPredicate: T => Boolean = ConstantFun.anyToFalse, routeeProps: Props = Props.empty) extends javadsl.PoolRouter[T] with scaladsl.PoolRouter[T] { @@ -26,7 +29,13 @@ private[akka] final case class PoolRouterBuilder[T]( // deferred creation of the actual router def apply(ctx: TypedActorContext[T]): Behavior[T] = - new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory(ctx.asScala.system), routeeProps) + new PoolRouterImpl[T]( + ctx.asScala, + poolSize, + behavior, + logicFactory(ctx.asScala.system), + broadcastPredicate, + routeeProps) def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = _ => new RoutingLogics.RandomLogic[T]()) @@ -43,6 +52,10 @@ private[akka] final case class PoolRouterBuilder[T]( def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize) def withRouteeProps(routeeProps: Props): PoolRouterBuilder[T] = copy(routeeProps = routeeProps) + + override def withBroadcastPredicate(pred: Predicate[T]): PoolRouter[T] = withBroadcastPredicate(pred.test _) + + override def withBroadcastPredicate(pred: T => Boolean): scaladsl.PoolRouter[T] = copy(broadcastPredicate = pred) } /** @@ -54,6 +67,7 @@ private final class PoolRouterImpl[T]( poolSize: Int, behavior: Behavior[T], logic: RoutingLogic[T], + broadcastPredicate: T => Boolean, routeeProps: Props) extends AbstractBehavior[T](ctx) { @@ -70,7 +84,11 @@ private final class PoolRouterImpl[T]( } def onMessage(msg: T): Behavior[T] = { - logic.selectRoutee(msg) ! msg + if ((broadcastPredicate ne ConstantFun.anyToFalse) && broadcastPredicate(msg)) { + ctx.children.foreach(_.unsafeUpcast ! msg) + } else { + logic.selectRoutee(msg) ! msg + } this } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala index 0f015cdd5c..b99e4855fa 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala @@ -10,6 +10,8 @@ import akka.actor.typed.internal.routing.{ GroupRouterBuilder, PoolRouterBuilder import akka.actor.typed.receptionist.ServiceKey import akka.annotation.DoNotInherit +import java.util.function.Predicate + object Routers { /** @@ -181,4 +183,9 @@ abstract class PoolRouter[T] extends DeferredBehavior[T] { * Set the props used to spawn the pool's routees */ def withRouteeProps(routeeProps: Props): PoolRouter[T] + + /** + * Any message that the predicate returns true for will be broadcast to all routees. + */ + def withBroadcastPredicate(pred: Predicate[T]): PoolRouter[T] } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala index 50ba734899..13925ae305 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala @@ -171,4 +171,9 @@ trait PoolRouter[T] extends Behavior[T] { * Set the props used to spawn the pool's routees */ def withRouteeProps(routeeProps: Props): PoolRouter[T] + + /** + * Any message that the predicate returns true for will be broadcast to all routees. + */ + def withBroadcastPredicate(pred: T => Boolean): PoolRouter[T] } diff --git a/akka-actor/src/main/scala/akka/util/ConstantFun.scala b/akka-actor/src/main/scala/akka/util/ConstantFun.scala index 298519ae4e..daac559c84 100644 --- a/akka-actor/src/main/scala/akka/util/ConstantFun.scala +++ b/akka-actor/src/main/scala/akka/util/ConstantFun.scala @@ -48,6 +48,7 @@ import akka.japi.function.{ Function => JFun, Function2 => JFun2 } val unitToUnit = () => () val anyToTrue: Any => Boolean = (_: Any) => true + val anyToFalse: Any => Boolean = (_: Any) => false private val _nullFun = (_: Any) => null diff --git a/akka-docs/src/main/paradox/typed/routers.md b/akka-docs/src/main/paradox/typed/routers.md index 690bf2c2bb..6fd6bcc52a 100644 --- a/akka-docs/src/main/paradox/typed/routers.md +++ b/akka-docs/src/main/paradox/typed/routers.md @@ -62,6 +62,17 @@ Scala Java : @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #pool-dispatcher } +### Broadcasting a message to all routees + +Pool routers can be configured to identify messages intended to be broad-casted to all routees. +Therefore, the `PoolRouter` has a property to configure its `broadcastPredicate`: + +Scala +: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #broadcast } + +Java +: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #broadcast } + ## Group Router The group router is created with a `ServiceKey` and uses the receptionist (see @ref:[Receptionist](actor-discovery.md#receptionist)) to discover