diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 18a800e408..fe490fdd0a 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -36,7 +36,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }).start() - val d = dispatcherActor { + val d = routerActor { case Test1 | Test2 ⇒ t1 case Test3 ⇒ t2 }.start() diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index efda300114..839474a3ba 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -12,14 +12,14 @@ import akka.util.{ Duration } import java.util.concurrent.atomic.AtomicReference object TypedActor { - private val selfReference = new scala.util.DynamicVariable[AnyRef](null) - def self[T <: AnyRef] = selfReference.value.asInstanceOf[T] + private val selfReference = new ThreadLocal[AnyRef] + def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] class TypedActor[TI <: AnyRef](proxyRef: AtomicReference[AnyRef], createInstance: ⇒ TI) extends Actor { val me = createInstance def receive = { case m: MethodCall ⇒ - selfReference.value = proxyRef.get + selfReference set proxyRef.get try { m match { case m if m.isOneWay ⇒ m(me) @@ -27,7 +27,7 @@ object TypedActor { case m ⇒ self reply m(me) } } finally { - selfReference.value = null + selfReference set null } } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b87eef67bc..0ea431d88a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -32,8 +32,8 @@ object Routing { /** * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true. */ - def intercept[A, B](interceptor: (A) ⇒ Unit, interceptee: PF[A, B]): PF[A, B] = - filter({ case a if a.isInstanceOf[A] ⇒ interceptor(a) }, interceptee) + def intercept[A: Manifest, B](interceptor: (A) ⇒ Unit, interceptee: PF[A, B]): PF[A, B] = + filter({ case a ⇒ interceptor(a) }, interceptee) /** * Creates a LoadBalancer from the thunk-supplied InfiniteIterator. @@ -44,18 +44,18 @@ object Routing { }).start() /** - * Creates a Dispatcher given a routing and a message-transforming function. + * Creates a Router given a routing and a message-transforming function. */ - def dispatcherActor(routing: PF[Any, ActorRef], msgTransformer: (Any) ⇒ Any): ActorRef = - actorOf(new Actor with Dispatcher { + def routerActor(routing: PF[Any, ActorRef], msgTransformer: (Any) ⇒ Any): ActorRef = + actorOf(new Actor with Router { override def transform(msg: Any) = msgTransformer(msg) def routes = routing }).start() /** - * Creates a Dispatcher given a routing. + * Creates a Router given a routing. */ - def dispatcherActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Dispatcher { + def routerActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Router { def routes = routing }).start() @@ -64,7 +64,7 @@ object Routing { * both another actor and through the supplied function */ def loggerActor(actorToLog: ActorRef, logger: (Any) ⇒ Unit): ActorRef = - dispatcherActor({ case _ ⇒ actorToLog }, logger) + routerActor({ case _ ⇒ actorToLog }, logger) } /** @@ -108,9 +108,9 @@ case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends Infini } /** - * A Dispatcher is a trait whose purpose is to route incoming messages to actors. + * A Router is a trait whose purpose is to route incoming messages to actors. */ -trait Dispatcher { this: Actor ⇒ +trait Router { this: Actor ⇒ protected def transform(msg: Any): Any = msg @@ -132,9 +132,9 @@ trait Dispatcher { this: Actor ⇒ } /** - * An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors. + * An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors. */ -abstract class UntypedDispatcher extends UntypedActor { +abstract class UntypedRouter extends UntypedActor { protected def transform(msg: Any): Any = msg protected def route(msg: Any): ActorRef @@ -144,22 +144,21 @@ abstract class UntypedDispatcher extends UntypedActor { private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined @throws(classOf[Exception]) - def onReceive(msg: Any): Unit = { - if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message) - else { + def onReceive(msg: Any): Unit = msg match { + case m: Routing.Broadcast ⇒ broadcast(m.message) + case _ ⇒ val r = route(msg) if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") if (isSenderDefined) r.forward(transform(msg))(someSelf) else r.!(transform(msg))(None) - } } } /** - * A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets + * A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets * to dispatch incoming messages to. */ -trait LoadBalancer extends Dispatcher { self: Actor ⇒ +trait LoadBalancer extends Router { self: Actor ⇒ protected def seq: InfiniteIterator[ActorRef] protected def routes = { @@ -172,10 +171,10 @@ trait LoadBalancer extends Dispatcher { self: Actor ⇒ } /** - * A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets + * A UntypedLoadBalancer is a specialized kind of UntypedRouter, that is supplied an InfiniteIterator of targets * to dispatch incoming messages to. */ -abstract class UntypedLoadBalancer extends UntypedDispatcher { +abstract class UntypedLoadBalancer extends UntypedRouter { protected def seq: InfiniteIterator[ActorRef] protected def route(msg: Any) = diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 4caae2a7d0..bf4af356a3 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -26,7 +26,7 @@ An UntypedDispatcher is an actor that routes incoming messages to outbound actor } } - public class MyDispatcher extends UntypedDispatcher { + public class MyRouter extends UntypedRouter { private ActorRef pinger = actorOf(Pinger.class).start(); private ActorRef ponger = actorOf(Ponger.class).start(); @@ -38,9 +38,9 @@ An UntypedDispatcher is an actor that routes incoming messages to outbound actor } } - ActorRef dispatcher = actorOf(MyDispatcher.class).start(); - dispatcher.sendOneWay("Ping"); //Prints "Pinger: Ping" - dispatcher.sendOneWay("Pong"); //Prints "Ponger: Pong" + ActorRef router = actorOf(MyRouter.class).start(); + router.sendOneWay("Ping"); //Prints "Pinger: Ping" + router.sendOneWay("Pong"); //Prints "Ponger: Pong" UntypedLoadBalancer ------------------- @@ -80,15 +80,15 @@ An UntypedLoadBalancer is an actor that forwards messages it receives to a bound } } - ActorRef dispatcher = actorOf(MyLoadBalancer.class).start(); - dispatcher.sendOneWay("Pong"); //Prints "Pinger: Pong" - dispatcher.sendOneWay("Ping"); //Prints "Ponger: Ping" - dispatcher.sendOneWay("Ping"); //Prints "Pinger: Ping" - dispatcher.sendOneWay("Pong"); //Prints "Ponger: Pong + ActorRef balancer = actorOf(MyLoadBalancer.class).start(); + balancer.sendOneWay("Pong"); //Prints "Pinger: Pong" + balancer.sendOneWay("Ping"); //Prints "Ponger: Ping" + balancer.sendOneWay("Ping"); //Prints "Pinger: Ping" + balancer.sendOneWay("Pong"); //Prints "Ponger: Pong You can also send a 'new Routing.Broadcast(msg)' message to the router to have it be broadcasted out to all the actors it represents. .. code-block:: java - router.sendOneWay(new Routing.Broadcast(new PoisonPill())); + balancer.sendOneWay(new Routing.Broadcast(new PoisonPill())); diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 6f90f4fcce..8a0680d5cb 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -7,12 +7,12 @@ Routing (Scala) Akka-core includes some building blocks to build more complex message flow handlers, they are listed and explained below: -Dispatcher +Router ---------- -A Dispatcher is an actor that routes incoming messages to outbound actors. +A Router is an actor that routes incoming messages to outbound actors. -To use it you can either create a Dispatcher through the ``dispatcherActor()`` factory method +To use it you can either create a Router through the ``routerActor()`` factory method .. code-block:: scala @@ -29,9 +29,9 @@ To use it you can either create a Dispatcher through the ``dispatcherActor()`` f val pinger = actorOf(new Actor { def receive = { case x => println("Pinger: " + x) } }).start() val ponger = actorOf(new Actor { def receive = { case x => println("Ponger: " + x) } }).start() - //A dispatcher that dispatches Ping messages to the pinger + //A router that dispatches Ping messages to the pinger //and Pong messages to the ponger - val d = dispatcherActor { + val d = routerActor { case Ping => pinger case Pong => ponger } @@ -39,19 +39,19 @@ To use it you can either create a Dispatcher through the ``dispatcherActor()`` f d ! Ping //Prints "Pinger: Ping" d ! Pong //Prints "Ponger: Pong" -Or by mixing in akka.patterns.Dispatcher: +Or by mixing in akka.routing.Router: .. code-block:: scala import akka.actor.Actor import akka.actor.Actor._ - import akka.routing.Dispatcher + import akka.routing.Router //Our message types case object Ping case object Pong - class MyDispatcher extends Actor with Dispatcher { + class MyRouter extends Actor with Router { //Our pinger and ponger actors val pinger = actorOf(new Actor { def receive = { case x => println("Pinger: " + x) } }).start() val ponger = actorOf(new Actor { def receive = { case x => println("Ponger: " + x) } }).start() @@ -63,8 +63,8 @@ Or by mixing in akka.patterns.Dispatcher: } } - //Create an instance of our dispatcher, and start it - val d = actorOf[MyDispatcher].start() + //Create an instance of our router, and start it + val d = actorOf[MyRouter].start() d ! Ping //Prints "Pinger: Ping" d ! Pong //Prints "Ponger: Pong"