From ef6c8370cd09f8d8e7eff46dea513b93d7134f4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Thu, 24 Nov 2011 16:35:37 +0100 Subject: [PATCH] Added BroadcastRouter which broadcasts all messages to all the connections it manages, also added tests. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../test/scala/akka/routing/RoutingSpec.scala | 75 +++++++++++++++++++ .../scala/akka/actor/ActorRefProvider.scala | 15 ++-- .../src/main/scala/akka/routing/Routing.scala | 47 ++++++++++-- .../akka/remote/RemoteActorRefProvider.scala | 17 +++-- 4 files changed, 132 insertions(+), 22 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 8fea2d6f26..a744100a9b 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -400,4 +400,79 @@ class RoutingSpec extends AkkaSpec { } }) } + + "broadcast router" must { + + "be started when constructed" in { + val actor1 = actorOf[TestActor] + + val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(actor1))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") + actor.isShutdown must be(false) + } + + "broadcast message using !" in { + val doneLatch = new CountDownLatch(2) + + val counter1 = new AtomicInteger + val connection1 = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter1.addAndGet(msg) + } + }) + + val counter2 = new AtomicInteger + val connection2 = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter2.addAndGet(msg) + } + }) + + val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") + + actor ! 1 + actor ! "end" + + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + counter1.get must be(1) + counter2.get must be(1) + } + + "broadcast message using ?" in { + val doneLatch = new CountDownLatch(2) + + val counter1 = new AtomicInteger + val connection1 = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ + counter1.addAndGet(msg) + sender ! "ack" + } + }) + + val counter2 = new AtomicInteger + val connection2 = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter2.addAndGet(msg) + } + }) + + val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) + val actor = new RoutedActorRef(app, props, app.guardian, "foo") + + actor ? 1 + actor ! "end" + + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + counter1.get must be(1) + counter2.get must be(1) + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 23a7083fda..67ff38f847 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -16,7 +16,7 @@ import akka.actor.Timeout.intToTimeout import akka.config.ConfigurationException import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise } import akka.event.{ Logging, DeathWatch, ActorClassification } -import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } +import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter, BroadcastRouter } import akka.util.Helpers import akka.AkkaException @@ -211,13 +211,14 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { // create a routed actor ref case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒ - + implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher + implicit val timeout = app.AkkaConfig.ActorTimeout val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - case RouterType.Direct ⇒ () ⇒ new DirectRouter - case RouterType.Random ⇒ () ⇒ new RandomRouter - case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter - case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter()( - if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher, app.AkkaConfig.ActorTimeout) + case RouterType.Direct ⇒ () ⇒ new DirectRouter + case RouterType.Random ⇒ () ⇒ new RandomRouter + case RouterType.Broadcast ⇒ () ⇒ new BroadcastRouter + case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter + case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c2bea21608..0c813ef92f 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -41,6 +41,11 @@ object RouterType { */ object ScatterGather extends RouterType + /** + * A RouterType that broadcasts the messages to all connections. + */ + object Broadcast extends RouterType + /** * A RouterType that selects the connection based on the least amount of cpu usage */ @@ -67,15 +72,14 @@ object RouterType { * Routed ActorRef configuration object, this is thread safe and fully sharable. */ case class RoutedProps private[akka] ( - routerFactory: () ⇒ Router = RoutedProps.defaultRouterFactory, - connectionManager: ConnectionManager = new LocalConnectionManager(List()), + routerFactory: () ⇒ Router, + connectionManager: ConnectionManager, timeout: Timeout = RoutedProps.defaultTimeout, localOnly: Boolean = RoutedProps.defaultLocalOnly) { } object RoutedProps { final val defaultTimeout = Timeout(Duration.MinusInf) - final val defaultRouterFactory = () ⇒ new RoundRobinRouter final val defaultLocalOnly = false } @@ -257,12 +261,41 @@ trait BasicRouter extends Router { private def throwNoConnectionsError = throw new RoutingException("No replica connections for router") } +/** + * A Router that uses broadcasts a message to all its connections. + */ +class BroadcastRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter with Serializable { + override def route(message: Any)(implicit sender: ActorRef) = { + connectionManager.connections.iterable foreach { connection ⇒ + try { + connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' + } catch { + case e: Exception ⇒ + connectionManager.remove(connection) + throw e + } + } + } + + //protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = + override def route[T](message: Any, timeout: Timeout): Future[T] = { + import Future._ + implicit val t = timeout + val futures = connectionManager.connections.iterable map { connection ⇒ + connection.?(message, timeout).asInstanceOf[Future[T]] + } + Future.firstCompletedOf(futures) + } + + protected def next: Option[ActorRef] = None +} + /** * A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef. * * @author Jonas Bonér */ -class DirectRouter extends BasicRouter { +class DirectRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter { private val state = new AtomicReference[DirectRouterState] @@ -304,7 +337,7 @@ class DirectRouter extends BasicRouter { * * @author Jonas Bonér */ -class RandomRouter extends BasicRouter { +class RandomRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter { import java.security.SecureRandom private val state = new AtomicReference[RandomRouterState] @@ -346,7 +379,7 @@ class RandomRouter extends BasicRouter { * * @author Jonas Bonér */ -class RoundRobinRouter extends BasicRouter { +class RoundRobinRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter { private val state = new AtomicReference[RoundRobinState] @@ -437,7 +470,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { * (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget * mode, the router would behave as {@link RoundRobinRouter} */ -class ScatterGatherFirstCompletedRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter { +class ScatterGatherFirstCompletedRouter(implicit dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter { protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index db5aafa444..66189632a5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -68,14 +68,6 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { deployer.lookupDeploymentFor(path.toString) match { case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ - // FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one - // val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { - // case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector - // case FailureDetectorType.RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector - // case FailureDetectorType.BannagePeriod(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan) - // case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) - // } - def isReplicaNode: Boolean = remoteAddresses exists { _ == app.address } //app.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(app.defaultAddress, address, remoteAddresses.mkString, isReplicaNode)) @@ -85,6 +77,9 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { local.actorOf(props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create } else { + implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher + implicit val timeout = app.AkkaConfig.ActorTimeout + // we are on the single "reference" node uses the remote actors on the replica nodes val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { case RouterType.Direct ⇒ @@ -93,6 +88,12 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { .format(name, remoteAddresses.mkString(", "))) () ⇒ new DirectRouter + case RouterType.Broadcast ⇒ + if (remoteAddresses.size != 1) throw new ConfigurationException( + "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]" + .format(name, remoteAddresses.mkString(", "))) + () ⇒ new BroadcastRouter + case RouterType.Random ⇒ if (remoteAddresses.size < 1) throw new ConfigurationException( "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"