From d97b8fbd9c367aaf0b55ceb47a1aa3f7ff7b8792 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 1 Apr 2011 15:33:46 +0200 Subject: [PATCH] Added Routing.Broadcast message and handling to be able to broadcast a message to all the actors a load-balancer represents --- .gitignore | 1 + .../main/scala/akka/routing/Iterators.scala | 9 ++-- .../src/main/scala/akka/routing/Routers.scala | 25 ++++++++-- .../src/main/scala/akka/routing/Routing.scala | 9 ++-- .../src/main/scala/Pi.scala | 49 +++++++------------ 5 files changed, 49 insertions(+), 44 deletions(-) diff --git a/.gitignore b/.gitignore index c0fa0f10b4..8613a0ba79 100755 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ run-codefellow multiverse.log .eprj .*.swp +akka-tutorials/akka-tutorial-pi-sbt/project/boot/ \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Iterators.scala b/akka-actor/src/main/scala/akka/routing/Iterators.scala index 1643c6a6b0..f076ea00c1 100644 --- a/akka-actor/src/main/scala/akka/routing/Iterators.scala +++ b/akka-actor/src/main/scala/akka/routing/Iterators.scala @@ -10,12 +10,14 @@ import scala.collection.JavaConversions._ /** * An Iterator that is either always empty or yields an infinite number of Ts. */ -trait InfiniteIterator[T] extends Iterator[T] +trait InfiniteIterator[T] extends Iterator[T] { + val items: Seq[T] +} /** * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List. */ -case class CyclicIterator[T](items: Seq[T]) extends InfiniteIterator[T] { +case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] { def this(items: java.util.List[T]) = this(items.toSeq) @volatile private[this] var current: Seq[T] = items @@ -29,14 +31,13 @@ case class CyclicIterator[T](items: Seq[T]) extends InfiniteIterator[T] { } override def exists(f: T => Boolean): Boolean = items.exists(f) - } /** * This InfiniteIterator always returns the Actor that has the currently smallest mailbox * useful for work-stealing. */ -case class SmallestMailboxFirstIterator(items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] { +case class SmallestMailboxFirstIterator(val items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] { def this(items: java.util.List[ActorRef]) = this(items.toSeq) def hasNext = items != Nil diff --git a/akka-actor/src/main/scala/akka/routing/Routers.scala b/akka-actor/src/main/scala/akka/routing/Routers.scala index 2dcf3d0ccc..57511076e8 100644 --- a/akka-actor/src/main/scala/akka/routing/Routers.scala +++ b/akka-actor/src/main/scala/akka/routing/Routers.scala @@ -15,7 +15,11 @@ trait Dispatcher { this: Actor => protected def routes: PartialFunction[Any, ActorRef] + protected def broadcast(message: Any) {} + protected def dispatch: Receive = { + case Routing.Broadcast(message) => + broadcast(message) case a if routes.isDefinedAt(a) => if (isSenderDefined) routes(a).forward(transform(a))(someSelf) else routes(a).!(transform(a))(None) @@ -34,14 +38,19 @@ abstract class UntypedDispatcher extends UntypedActor { protected def route(msg: Any): ActorRef + protected def broadcast(message: Any) {} + private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined @throws(classOf[Exception]) def onReceive(msg: Any): Unit = { - val r = route(msg) - if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") - if (isSenderDefined) r.forward(transform(msg))(someSelf) - else r.!(transform(msg))(None) + if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message) + else { + val r = route(msg) + if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") + if (isSenderDefined) r.forward(transform(msg))(someSelf) + else r.!(transform(msg))(None) + } } } @@ -52,7 +61,11 @@ abstract class UntypedDispatcher extends UntypedActor { trait LoadBalancer extends Dispatcher { self: Actor => protected def seq: InfiniteIterator[ActorRef] - protected def routes = { case x if seq.hasNext => seq.next } + protected def routes = { + case x if seq.hasNext => seq.next + } + + override def broadcast(message: Any) = seq.items.foreach(_ ! message) override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) ) } @@ -68,5 +81,7 @@ abstract class UntypedLoadBalancer extends UntypedDispatcher { if (seq.hasNext) seq.next else null + override def broadcast(message: Any) = seq.items.foreach(_ ! message) + override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) ) } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 1d43950f8b..2e041a4e35 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -9,6 +9,9 @@ import akka.actor.Actor._ object Routing { + sealed trait RoutingMessage + case class Broadcast(message: Any) extends RoutingMessage + type PF[A, B] = PartialFunction[A, B] /** @@ -31,7 +34,7 @@ object Routing { /** * Creates a LoadBalancer from the thunk-supplied InfiniteIterator. */ - def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef = + def loadBalancerActor(actors: => InfiniteIterator[ActorRef]): ActorRef = actorOf(new Actor with LoadBalancer { val seq = actors }).start @@ -39,7 +42,7 @@ object Routing { /** * Creates a Dispatcher given a routing and a message-transforming function. */ - def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef = + def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef = actorOf(new Actor with Dispatcher { override def transform(msg: Any) = msgTransformer(msg) def routes = routing @@ -48,7 +51,7 @@ object Routing { /** * Creates a Dispatcher given a routing. */ - def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher { + def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher { def routes = routing }).start diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala index 787842e59e..ba4c960ff4 100644 --- a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala @@ -4,9 +4,10 @@ package akka.tutorial.sbt.pi -import akka.actor.{Actor, ActorRef} +import akka.actor.{Actor, ActorRef, PoisonPill} import Actor._ import akka.routing.{Routing, CyclicIterator} +import Routing._ import akka.event.EventHandler import akka.dispatch.Dispatchers @@ -17,38 +18,21 @@ object Main extends App { Pi.calculate } -/* - Pi estimate: 3.1415926435897883 - - === 8 workers (with custom dispatcher 4/4) - Calculation time: 5163 millis - - === 8 workers (with default dispatcher) - Calculation time: 6789 millis - - === 4 workers - Calculation time: 5438 millis - - === 2 workers - Calculation time: 6002 millis - - === 1 workers - Calculation time: 8173 millis -*/ object Pi { - val nrOfWorkers = 4 - val nrOfMessages = 10000 + val nrOfWorkers = 4 + val nrOfMessages = 10000 val nrOfElements = 10000 // ===== Messages ===== sealed trait PiMessage + case class Calculate(nrOfMessages: Int, nrOfElements: Int) extends PiMessage case class Work(arg: Int, fun: (Int) => Double) extends PiMessage case class Result(value: Double) extends PiMessage // ===== Worker ===== class Worker extends Actor { def receive = { - case Work(arg, fun) => self.reply(Result(fun(arg))) + case Work(arg, fun) => self reply Result(fun(arg)) } } @@ -68,19 +52,20 @@ object Pi { // wrap them with a load-balancing router val router = Routing.loadBalancerActor(CyclicIterator(workers)).start + // define the work + val algorithm = (i: Int) => { + val range = (i * nrOfElements) to ((i + 1) * nrOfElements - 1) + val results = for (j <- range) yield (4 * math.pow(-1, j) / (2 * j + 1)) + results.sum + } + def receive = { case Calculate(nrOfMessages, nrOfElements) => - // define the work - val fun = (i: Int) => { - val range = (i * nrOfElements) to ((i + 1) * nrOfElements - 1) - val results = for (j <- range) yield (4 * math.pow(-1, j) / (2 * j + 1)) - results.sum - } // schedule work - for (arg <- 0 until nrOfMessages) router ! Work(arg, fun) + for (arg <- 0 until nrOfMessages) router ! Work(arg, algorithm) // send a PoisonPill to all workers telling them to shut down themselves - router broadcast PoisonPill + router ! Broadcast(PoisonPill) case Result(value) => pi += value @@ -92,12 +77,12 @@ object Pi { override def postStop = { EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start))) - latch.nrOfResultsDown + latch.countDown } } def calculate = { - val latch = new nrOfResultsDownLatch(1) + val latch = new CountDownLatch(1) // create the master val master = actorOf(new Master(latch)).start