Implemented a couple of router types. Updated some tests. See #1440

This commit is contained in:
Henrik Engstrom 2011-12-11 22:34:38 +01:00
parent fd7a041c09
commit a7886abdf0
6 changed files with 359 additions and 566 deletions

View file

@ -1,7 +1,5 @@
package akka.routing package akka.routing
import akka.routing._
import akka.config.ConfigurationException
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor._ import akka.actor._
import collection.mutable.LinkedList import collection.mutable.LinkedList
@ -17,6 +15,7 @@ object RoutingSpec {
println("Hello") println("Hello")
} }
} }
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -26,30 +25,25 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
import akka.routing.RoutingSpec._ import akka.routing.RoutingSpec._
// TODO (HE) : Update test with new routing functionality "no router" must {
/*
"direct router" must {
"be started when constructed" in { "be started when constructed" in {
val actor1 = system.actorOf[TestActor] val routedActor = system.actorOf(Props(new TestActor).withRouting(NoRouter))
routedActor.isTerminated must be(false)
val props = RoutedProps(routerFactory = () new DirectRouter, connectionManager = new LocalConnectionManager(List(actor1)))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
actor.isTerminated must be(false)
} }
"send message to connection" in { "send message to connection" in {
val doneLatch = new CountDownLatch(1) val doneLatch = new CountDownLatch(1)
val counter = new AtomicInteger(0) val counter = new AtomicInteger(0)
val connection1 = system.actorOf(new Actor {
class Actor1 extends Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case _ counter.incrementAndGet case _ counter.incrementAndGet
} }
}) }
val props = RoutedProps(routerFactory = () new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1))) val routedActor = system.actorOf(Props(new Actor1).withRouting(NoRouter))
val routedActor = new RoutedActorRef(system, props, impl.guardian, "foo")
routedActor ! "hello" routedActor ! "hello"
routedActor ! "end" routedActor ! "end"
@ -57,38 +51,12 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
counter.get must be(1) counter.get must be(1)
} }
"deliver a broadcast message" in {
val doneLatch = new CountDownLatch(1)
val counter1 = new AtomicInteger
val connection1 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
})
val props = RoutedProps(routerFactory = () new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1)))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
actor ! Broadcast(1)
actor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(1)
}
} }
"round robin router" must { "round robin router" must {
"be started when constructed" in { "be started when constructed" in {
val actor1 = system.actorOf[TestActor] val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(nrOfInstances = 1)))
routedActor.isTerminated must be(false)
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(actor1)))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
actor.isTerminated must be(false)
} }
//In this test a bunch of actors are created and each actor has its own counter. //In this test a bunch of actors are created and each actor has its own counter.
@ -101,32 +69,30 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val doneLatch = new CountDownLatch(connectionCount) val doneLatch = new CountDownLatch(connectionCount)
//lets create some connections. //lets create some connections.
var connections = new LinkedList[ActorRef] var actors = new LinkedList[ActorRef]
var counters = new LinkedList[AtomicInteger] var counters = new LinkedList[AtomicInteger]
for (i 0 until connectionCount) { for (i 0 until connectionCount) {
counters = counters :+ new AtomicInteger() counters = counters :+ new AtomicInteger()
val connection = system.actorOf(new Actor { val actor = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case msg: Int counters.get(i).get.addAndGet(msg) case msg: Int counters.get(i).get.addAndGet(msg)
} }
}) })
connections = connections :+ connection actors = actors :+ actor
} }
//create the routed actor. val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = actors)))
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(connections))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
//send messages to the actor. //send messages to the actor.
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
for (k 0 until connectionCount) { for (k 0 until connectionCount) {
actor ! (k + 1) routedActor ! (k + 1)
} }
} }
actor ! Broadcast("end") routedActor ! Broadcast("end")
//now wait some and do validations. //now wait some and do validations.
doneLatch.await(5, TimeUnit.SECONDS) must be(true) doneLatch.await(5, TimeUnit.SECONDS) must be(true)
@ -140,7 +106,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val doneLatch = new CountDownLatch(2) val doneLatch = new CountDownLatch(2)
val counter1 = new AtomicInteger val counter1 = new AtomicInteger
val connection1 = system.actorOf(new Actor { val actor1 = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg) case msg: Int counter1.addAndGet(msg)
@ -148,18 +114,17 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
}) })
val counter2 = new AtomicInteger val counter2 = new AtomicInteger
val connection2 = system.actorOf(new Actor { val actor2 = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg) case msg: Int counter2.addAndGet(msg)
} }
}) })
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = List(actor1, actor2))))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
actor ! Broadcast(1) routedActor ! Broadcast(1)
actor ! Broadcast("end") routedActor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true) doneLatch.await(5, TimeUnit.SECONDS) must be(true)
@ -167,6 +132,8 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
counter2.get must be(1) counter2.get must be(1)
} }
// TODO (HE) : Is this still a valid test case?
/*
"fail to deliver a broadcast message using the ?" in { "fail to deliver a broadcast message using the ?" in {
val doneLatch = new CountDownLatch(1) val doneLatch = new CountDownLatch(1)
@ -178,33 +145,32 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
} }
}) })
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1))) val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = List(connection1))))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
intercept[RoutingException] { actor ? Broadcast(1) } intercept[RoutingException] {
routedActor ? Broadcast(1)
}
actor ! "end" routedActor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true) doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(0) counter1.get must be(0)
} }
*/
} }
"random router" must { "random router" must {
"be started when constructed" in { "be started when constructed" in {
val routedActor = system.actorOf(Props(new TestActor).withRouting(RandomRouter(nrOfInstances = 1)))
val actor1 = system.actorOf[TestActor] routedActor.isTerminated must be(false)
val props = RoutedProps(routerFactory = () new RandomRouter, connectionManager = new LocalConnectionManager(List(actor1)))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
actor.isTerminated must be(false)
} }
"deliver a broadcast message" in { "deliver a broadcast message" in {
val doneLatch = new CountDownLatch(2) val doneLatch = new CountDownLatch(2)
val counter1 = new AtomicInteger val counter1 = new AtomicInteger
val connection1 = system.actorOf(new Actor { val actor1 = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg) case msg: Int counter1.addAndGet(msg)
@ -212,18 +178,17 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
}) })
val counter2 = new AtomicInteger val counter2 = new AtomicInteger
val connection2 = system.actorOf(new Actor { val actor2 = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg) case msg: Int counter2.addAndGet(msg)
} }
}) })
val props = RoutedProps(routerFactory = () new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) val routedActor = system.actorOf(Props(new TestActor).withRouting(RandomRouter(targets = List(actor1, actor2))))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
actor ! Broadcast(1) routedActor ! Broadcast(1)
actor ! Broadcast("end") routedActor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true) doneLatch.await(5, TimeUnit.SECONDS) must be(true)
@ -231,49 +196,118 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
counter2.get must be(1) counter2.get must be(1)
} }
"fail to deliver a broadcast message using the ?" in { // TODO (HE) : Is this still a valid test case?
val doneLatch = new CountDownLatch(1) /*
"fail to deliver a broadcast message using the ?" in {
val doneLatch = new CountDownLatch(1)
val counter1 = new AtomicInteger
val connection1 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case _ counter1.incrementAndGet()
}
})
val props = RoutedProps(routerFactory = () new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1)))
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
try {
actor ? Broadcast(1)
fail()
} catch {
case e: RoutingException
}
actor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(0)
}
*/
}
"broadcast router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(nrOfInstances = 1)))
routedActor.isTerminated must be(false)
}
"broadcast message using !" in {
val doneLatch = new CountDownLatch(2)
val counter1 = new AtomicInteger val counter1 = new AtomicInteger
val connection1 = system.actorOf(new Actor { val actor1 = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case _ counter1.incrementAndGet() case msg: Int counter1.addAndGet(msg)
} }
}) })
val props = RoutedProps(routerFactory = () new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1))) val counter2 = new AtomicInteger
val actor = new RoutedActorRef(system, props, impl.guardian, "foo") val actor2 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
})
try { val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(targets = List(actor1, actor2))))
actor ? Broadcast(1) routedActor ! 1
fail() routedActor ! "end"
} catch {
case e: RoutingException
}
actor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true) doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(0)
counter1.get must be(1)
counter2.get must be(1)
}
"broadcast message using ?" in {
val doneLatch = new CountDownLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int
counter1.addAndGet(msg)
sender ! "ack"
}
})
val counter2 = new AtomicInteger
val actor2 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
})
val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(targets = List(actor1, actor2))))
routedActor ? 1
routedActor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(1)
counter2.get must be(1)
} }
} }
"Scatter-gather router" must { // TODO (HE) : add tests below
/*
"return response, even if one of the connections has stopped" in { "Scatter-gather router" must {
val shutdownLatch = new TestLatch(1) "return response, even if one of the actors has stopped" in {
val shutdownLatch = new TestLatch(1)
val actor1 = newActor(1, Some(shutdownLatch))
val actor2 = newActor(2, Some(shutdownLatch))
val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2))))
val props = RoutedProps(routerFactory = () new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))) routedActor ! Broadcast(Stop(Some(1)))
shutdownLatch.await
val actor = new RoutedActorRef(system, props, impl.guardian, "foo") (routedActor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
}
actor ! Broadcast(Stop(Some(0)))
shutdownLatch.await
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
}
"throw an exception, if all the connections have stopped" in { "throw an exception, if all the connections have stopped" in {
@ -330,7 +364,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val connection = system.actorOf(new Actor { val connection = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case msg: Int counters.get(i).get.addAndGet(msg) case msg: Int counters.get(i).get.addAndGet(msg)
} }
}) })
@ -363,7 +397,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val counter1 = new AtomicInteger val counter1 = new AtomicInteger
val connection1 = system.actorOf(new Actor { val connection1 = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg) case msg: Int counter1.addAndGet(msg)
} }
}) })
@ -371,7 +405,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val counter2 = new AtomicInteger val counter2 = new AtomicInteger
val connection2 = system.actorOf(new Actor { val connection2 = system.actorOf(new Actor {
def receive = { def receive = {
case "end" doneLatch.countDown() case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg) case msg: Int counter2.addAndGet(msg)
} }
}) })
@ -389,95 +423,31 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
counter2.get must be(1) counter2.get must be(1)
} }
case class Stop(id: Option[Int] = None)
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(new Actor { case class Stop(id: Option[Int] = None)
def receive = {
case Stop(None) self.stop() def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(new Actor {
case Stop(Some(_id)) if (_id == id) self.stop() def receive = {
case _id: Int if (_id == id) case Stop(None)
case _ Thread sleep 100 * id; sender.tell(id) println(">>>> STOPPING : " + id)
self.stop()
case Stop(Some(_id)) if (_id == id)
println(">>>> STOPPING >: " + id)
self.stop()
case _id: Int if (_id == id)
println("-----> ID MATCH - do nothing")
case x {
Thread sleep 100 * id
println("-----> SENDING REPLY: " + id)
sender.tell(id)
} }
override def postStop = {
shudownLatch foreach (_.countDown())
}
})
}
"broadcast router" must {
"be started when constructed" in {
val actor1 = system.actorOf[TestActor]
val props = RoutedProps(routerFactory = () new BroadcastRouter, connectionManager = new LocalConnectionManager(List(actor1)))
val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo")
actor.isTerminated must be(false)
} }
"broadcast message using !" in { override def postStop = {
val doneLatch = new CountDownLatch(2) println("***** POSTSTOP")
shudownLatch foreach (_.countDown())
val counter1 = new AtomicInteger
val connection1 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
})
val counter2 = new AtomicInteger
val connection2 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
})
val props = RoutedProps(routerFactory = () new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo")
actor ! 1
actor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(1)
counter2.get must be(1)
} }
})
"broadcast message using ?" in { }
val doneLatch = new CountDownLatch(2) */
val counter1 = new AtomicInteger
val connection1 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int
counter1.addAndGet(msg)
sender ! "ack"
}
})
val counter2 = new AtomicInteger
val connection2 = system.actorOf(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
})
val props = RoutedProps(routerFactory = () new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo")
actor ? 1
actor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
counter1.get must be(1)
counter2.get must be(1)
}
}
*/
} }

View file

@ -520,17 +520,14 @@ class LocalActorRefProvider(
val path = supervisor.path / name val path = supervisor.path / name
props.routerConfig match { props.routerConfig match {
case NoRouting new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor case NoRouter new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
case routedActor new RoutedActorRef(system, props.withRouting(adaptFromDeploy(routedActor, path)), supervisor, path) case routedActor new RoutedActorRef(system, props.withRouting(adaptFromDeploy(routedActor, path)), supervisor, path)
} }
} }
private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = { private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = {
val lookupPath = p.elements.mkString("/", "/", "") val lookupPath = p.elements.mkString("/", "/", "")
println("**** LOOKUP PATH : " + lookupPath) r.adaptFromDeploy(deployer.lookup(lookupPath))
val deploy = deployer.lookup(lookupPath)
println("**** " + deploy)
r.adaptFromDeploy(deploy)
} }
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {

View file

@ -156,7 +156,7 @@ object DeploymentConfig {
} }
def routerTypeFor(routing: Routing): RouterType = routing match { def routerTypeFor(routing: Routing): RouterType = routing match {
case _: NoRouting | NoRouting RouterType.NoRouting case _: NoRouting | NoRouting RouterType.NoRouter
case _: RoundRobin | RoundRobin RouterType.RoundRobin case _: RoundRobin | RoundRobin RouterType.RoundRobin
case _: Random | Random RouterType.Random case _: Random | Random RouterType.Random
case _: ScatterGather | ScatterGather RouterType.ScatterGather case _: ScatterGather | ScatterGather RouterType.ScatterGather

View file

@ -8,7 +8,7 @@ import akka.dispatch._
import akka.japi.Creator import akka.japi.Creator
import akka.util._ import akka.util._
import collection.immutable.Stack import collection.immutable.Stack
import akka.routing.{ NoRouting, RouterConfig, RoutedProps } import akka.routing.{ NoRouter, RouterConfig }
/** /**
* ActorRef configuration object, this is threadsafe and fully sharable * ActorRef configuration object, this is threadsafe and fully sharable
@ -29,7 +29,7 @@ object Props {
case _ Escalate case _ Escalate
} }
final val defaultRoutedProps: RouterConfig = NoRouting final val defaultRoutedProps: RouterConfig = NoRouter
final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None) final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None)

View file

@ -7,12 +7,15 @@ package akka.routing
import akka.actor._ import akka.actor._
import akka.japi.Creator import akka.japi.Creator
import akka.util.ReflectiveAccess
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.routing.Routing.Broadcast import akka.routing.Routing.Broadcast
import akka.actor.DeploymentConfig.Deploy import akka.actor.DeploymentConfig.Deploy
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Future
import akka.util.{ Duration, ReflectiveAccess }
import java.util.concurrent.TimeUnit
import akka.AkkaException
sealed trait RouterType sealed trait RouterType
@ -26,7 +29,7 @@ object RouterType {
/** /**
* A RouterType that indicates no routing - i.e. direct message. * A RouterType that indicates no routing - i.e. direct message.
*/ */
object NoRouting extends RouterType object NoRouter extends RouterType
/** /**
* A RouterType that randomly selects a connection to send a message to. * A RouterType that randomly selects a connection to send a message to.
@ -70,6 +73,11 @@ object RouterType {
} }
/**
* An {@link AkkaException} thrown when something goes wrong while routing a message
*/
class RoutingException(message: String) extends AkkaException(message)
/** /**
* Contains the configuration to create local and clustered routed actor references. * Contains the configuration to create local and clustered routed actor references.
* Routed ActorRef configuration object, this is thread safe and fully sharable. * Routed ActorRef configuration object, this is thread safe and fully sharable.
@ -84,51 +92,9 @@ case class RoutedProps private[akka] (
} }
} }
///**
// * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
// * {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}.
// *
// * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
// */
//trait Router {
//
// /**
// * Initializes this Router with a given set of Connections. The Router can use this datastructure to ask for
// * the current connections, signal that there were problems with one of the connections and see if there have
// * been changes in the connections.
// *
// * This method is not threadsafe, and should only be called once
// *
// * JMM Guarantees:
// * This method guarantees that all changes made in this method, are visible before one of the routing methods is called.
// */
// def init(connectionManager: ConnectionManager)
//
// /**
// * Routes the message to one of the connections.
// *
// * @throws RoutingException if something goes wrong while routing the message
// */
// def route(message: Any)(implicit sender: ActorRef)
//
// /**
// * Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
// * completion of the processing of the message.
// *
// * @throws RoutingExceptionif something goes wrong while routing the message.
// */
// def route[T](message: Any, timeout: Timeout): Future[T]
//}
//
///**
// * An {@link AkkaException} thrown when something goes wrong while routing a message
// */
//class RoutingException(message: String) extends AkkaException(message)
//
/** /**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
* on (or more) of these actors. * send a message to on (or more) of these actors.
*/ */
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath) private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
extends LocalActorRef( extends LocalActorRef(
@ -141,39 +107,33 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
override def !(message: Any)(implicit sender: ActorRef = null) { override def !(message: Any)(implicit sender: ActorRef = null) {
route(message) match { route(message) match {
case null super.!(message)(sender) case null super.!(message)(sender)
case ref: ActorRef ref.!(message)(sender) case ref: ActorRef ref.!(message)(sender)
case refs: Traversable[ActorRef] refs foreach (_.!(message)(sender)) case refs: Traversable[ActorRef]
message match {
case Broadcast(m) refs foreach (_.!(m)(sender))
case _ refs foreach (_.!(message)(sender))
}
} }
} }
// TODO (HE) : Should the RoutedActorRef also override "?"?
// If not how then Broadcast messages cannot be sent via ? -
// which it is in some test cases at the moment.
} }
trait RouterConfig extends Function0[Actor] { trait RouterConfig extends Function0[Actor] {
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig
def createRoute(creator: () Actor, actorContext: ActorContext): Routing.Route def createRoute(creator: () Actor, actorContext: ActorContext): Routing.Route
} }
/** /**
* Routing configuration that indicates no routing. * A Router is responsible for sending a message to one (or more) of its connections.
* Oxymoron style.
*/
case object NoRouting extends RouterConfig {
def adaptFromDeploy(deploy: Option[Deploy]) = null
def createRoute(creator: () Actor, actorContext: ActorContext) = null
def apply(): Actor = null
}
/**
* The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
* {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
trait Router { trait Router {
// TODO (HE): implement failure detection
} }
/** /**
@ -206,6 +166,18 @@ object Routing {
type Route = (Any) AnyRef type Route = (Any) AnyRef
} }
/**
* Routing configuration that indicates no routing.
* Oxymoron style.
*/
case object NoRouter extends RouterConfig {
def adaptFromDeploy(deploy: Option[Deploy]) = null
def createRoute(creator: () Actor, actorContext: ActorContext) = null
def apply(): Actor = null
}
/** /**
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
* <br> * <br>
@ -222,8 +194,10 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
deploy match { deploy match {
case Some(d) copy(nrOfInstances = d.nrOfInstances.factor) case Some(d)
case _ this // In case there is a config then use this over any programmed settings.
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
case _ this
} }
} }
@ -236,12 +210,8 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
def createRoute(creator: () Actor, context: ActorContext): Routing.Route = { def createRoute(creator: () Actor, context: ActorContext): Routing.Route = {
val routees: Vector[ActorRef] = (nrOfInstances, targets) match { val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.") case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
case (x, Nil) case (x, Nil) (1 to x).map(_ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
println("----> 0, Nil") case (x, xs) Vector.empty[ActorRef] ++ xs
(1 to x).map(_ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouting)))(scala.collection.breakOut)
case (x, xs)
println("----> x, xs")
Vector.empty[ActorRef] ++ xs
} }
val next = new AtomicInteger(0) val next = new AtomicInteger(0)
@ -251,292 +221,164 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
} }
{ {
case _: AutoReceivedMessage null //TODO: handle system specific messages case msg: AutoReceivedMessage null // TODO (HE): how should system messages be handled?
case Broadcast(msg) routees case Broadcast(msg) routees
case msg getNext() case msg getNext()
} }
} }
/*
private val state = new AtomicReference[RoundRobinState]
def next: Option[ActorRef] = currentState.next
@tailrec
private def currentState: RoundRobinState = {
val current = state.get
if (current != null && current.version == connectionManager.version) {
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
current
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val connections = connectionManager.connections
val newState = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version)
if (state.compareAndSet(current, newState))
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
else //we failed to update the state, lets try again... better luck next time.
currentState
}
}
private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) {
private val index = new AtomicInteger(0)
def next: Option[ActorRef] = if (array.isEmpty) None else Some(array(nextIndex))
@tailrec
private def nextIndex: Int = {
val oldIndex = index.get
var newIndex = if (oldIndex == array.length - 1) 0 else oldIndex + 1
if (!index.compareAndSet(oldIndex, newIndex)) nextIndex
else oldIndex
}
}
*/
} }
///** /**
// * An Abstract Router implementation that already provides the basic infrastructure so that a concrete * A Router that randomly selects one of the target connections to send a message to.
// * Router only needs to implement the next method. * <br>
// */ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means
//trait BasicRouter extends Router { * that the random router should both create new actors and use the 'targets' actor(s).
// * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used.
// @volatile * <br>
// protected var connectionManager: ConnectionManager = _ * <b>The</b> configuration parameter trumps the constructor arguments. This means that
// * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
// def init(connectionManager: ConnectionManager) = { * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
// this.connectionManager = connectionManager */
// } case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
// extends Router with RouterConfig {
// def route(message: Any)(implicit sender: ActorRef) = message match {
// case Routing.Broadcast(message) def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
// deploy match {
// //it is a broadcast message, we are going to send to message to all connections. case Some(d)
// connectionManager.connections.iterable foreach { // In case there is a config then use this over any programmed settings.
// connection copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
// try { case _ this
// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' }
// } catch { }
// case e: Exception
// connectionManager.remove(connection) def apply(): Actor = new Actor {
// throw e def receive = {
// } case _
// } }
// case _ }
// //it no broadcast message, we are going to select an actor from the connections and send the message to him.
// next match { import java.security.SecureRandom
// case Some(connection)
// try { private val random = new ThreadLocal[SecureRandom] {
// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' override def initialValue = SecureRandom.getInstance("SHA1PRNG")
// } catch { }
// case e: Exception
// connectionManager.remove(connection) def createRoute(creator: () Actor, context: ActorContext): Routing.Route = {
// throw e val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
// } case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
// case None case (x, Nil) (1 to x).map(_ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
// throwNoConnectionsError case (x, xs) Vector.empty[ActorRef] ++ xs
// } }
// }
// def getNext(): ActorRef = {
// def route[T](message: Any, timeout: Timeout): Future[T] = message match { routees(random.get.nextInt(routees.size))
// case Routing.Broadcast(message) }
// throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.")
// case _ {
// //it no broadcast message, we are going to select an actor from the connections and send the message to him. case msg: AutoReceivedMessage null // TODO (HE): how should system messages be handled?
// next match { case Broadcast(msg) routees
// case Some(connection) case msg getNext()
// try { }
// connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it?? }
// } catch { }
// case e: Exception
// connectionManager.remove(connection)
// throw e
// }
// case None
// throwNoConnectionsError
// }
// }
//
// protected def next: Option[ActorRef]
//
// private def throwNoConnectionsError = throw new RoutingException("No replica connections for router")
//}
//
///**
// * A Router that uses broadcasts a message to all its connections.
// */
//class BroadcastRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter with Serializable {
// override def route(message: Any)(implicit sender: ActorRef) = {
// connectionManager.connections.iterable foreach {
// connection
// try {
// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
// } catch {
// case e: Exception
// connectionManager.remove(connection)
// throw e
// }
// }
// }
//
// //protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] =
// override def route[T](message: Any, timeout: Timeout): Future[T] = {
// import Future._
// implicit val t = timeout
// val futures = connectionManager.connections.iterable map {
// connection
// connection.?(message, timeout).asInstanceOf[Future[T]]
// }
// Future.firstCompletedOf(futures)
// }
//
// protected def next: Option[ActorRef] = None
//}
//
///**
// * A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef.
// *
// * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
// */
//class DirectRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter {
//
// private val state = new AtomicReference[DirectRouterState]
//
// lazy val next: Option[ActorRef] = {
// val current = currentState
// if (current.ref == null) None else Some(current.ref)
// }
//
// @tailrec
// private def currentState: DirectRouterState = {
// val current = state.get
//
// if (current != null && connectionManager.version == current.version) {
// //we are lucky since nothing has changed in the connections.
// current
// } else {
// //there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
//
// val connections = connectionManager.connections
//
// val connectionCount = connections.iterable.size
// if (connectionCount > 1)
// throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))
//
// val newState = new DirectRouterState(connections.iterable.head, connections.version)
// if (state.compareAndSet(current, newState))
// //we are lucky since we just updated the state, so we can send it back as the state to use
// newState
// else //we failed to update the state, lets try again... better luck next time.
// currentState // recur
// }
// }
//
// private case class DirectRouterState(ref: ActorRef, version: Long)
//
//}
//
///**
// * A Router that randomly selects one of the target connections to send a message to.
// *
// * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
// */
//class RandomRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter {
//
// import java.security.SecureRandom
//
// private val state = new AtomicReference[RandomRouterState]
//
// private val random = new ThreadLocal[SecureRandom] {
// override def initialValue = SecureRandom.getInstance("SHA1PRNG")
// }
//
// def next: Option[ActorRef] = currentState.array match {
// case a if a.isEmpty None
// case a Some(a(random.get.nextInt(a.length)))
// }
//
// @tailrec
// private def currentState: RandomRouterState = {
// val current = state.get
//
// if (current != null && current.version == connectionManager.version) {
// //we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
// current
// } else {
// //there has been a change in connections, or it was the first try, so we need to update the internal state
//
// val connections = connectionManager.connections
// val newState = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version)
// if (state.compareAndSet(current, newState))
// //we are lucky since we just updated the state, so we can send it back as the state to use
// newState
// else //we failed to update the state, lets try again... better luck next time.
// currentState
// }
// }
//
// private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long)
//}
/** /**
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the * A Router that uses broadcasts a message to all its connections.
* specified strategy (specific router needs to implement `gather` method). * <br>
* Scatter-gather pattern will be applied only to the messages broadcasted using Future * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget * that the random router should both create new actors and use the 'targets' actor(s).
* mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used.
* * <br>
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected. * <b>The</b> configuration parameter trumps the constructor arguments. This means that
* FIXME: this is also the location where message buffering should be done in case of failure. * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
/* case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
trait ScatterGatherRouter extends BasicRouter with Serializable { extends Router with RouterConfig {
/** def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
* Aggregates the responses into a single Future. deploy match {
* case Some(d)
* @param results Futures of the responses from connections // In case there is a config then use this over any programmed settings.
*/ copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] case _ this
}
}
private def scatterGather[S, G >: S](message: Any, timeout: Timeout): Future[G] = { def apply(): Actor = new Actor {
val responses = connectionManager.connections.iterable.flatMap { def receive = {
actor case _
}
}
def createRoute(creator: () Actor, context: ActorContext): Routing.Route = {
val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
case (x, Nil) (1 to x).map(_ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
case (x, xs) Vector.empty[ActorRef] ++ xs
}
{
case msg: AutoReceivedMessage null // TODO (HE): how should system messages be handled?
case Broadcast(msg) routees
case msg routees
}
}
}
// TODO (HE) : Correct description below
/**
* Simple router that broadcasts the message to all connections, and replies with the first response.
* Scatter-gather pattern will be applied only to the messages broadcast using Future
* (wrapped into {@link Routing.Broadcast} and sent with "?" method).
* For the messages sent in a fire-forget mode, the router would behave as {@link RoundRobinRouter}
*/
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig {
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
deploy match {
case Some(d)
// In case there is a config then use this over any programmed settings.
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
case _ this
}
}
def apply(): Actor = new Actor {
def receive = {
case _
}
}
def createRoute(creator: () Actor, context: ActorContext): Routing.Route = {
val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
case (0, Nil) throw new IllegalArgumentException("Insufficient information - missing configuration.")
case (x, Nil) (1 to x).map(_ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
case (x, xs) Vector.empty[ActorRef] ++ xs
}
def scatterGather[S, G >: S](message: Any, t: Timeout): Future[G] = {
val responses = routees.flatMap { actor
try { try {
if (actor.isTerminated) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace if (actor.isTerminated) None else Some(actor.?(message, t).asInstanceOf[Future[S]])
Some(actor.?(message, timeout).asInstanceOf[Future[S]])
} catch { } catch {
case e: Exception case e: Exception None
connectionManager.remove(actor)
None
} }
}
if (!responses.isEmpty) throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message))
else {
implicit val messageDispatcher = context.dispatcher
implicit val timeout = t
Future.firstCompletedOf(responses)
}
} }
if (responses.isEmpty) // TODO (HE) : Timeout and Future should be updated to new strategy - or hardcoded value below should at least be removed!
throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message)) {
else gather(responses) case msg: AutoReceivedMessage null // TODO (HE): how should system messages be handled?
} case Broadcast(msg) routees
case msg scatterGather(msg, Timeout(Duration(5000, TimeUnit.MILLISECONDS)))
override def route[T](message: Any, timeout: Timeout): Future[T] = message match { }
case Routing.Broadcast(message) scatterGather(message, timeout)
case message super.route(message, timeout)
} }
} }
*/
/**
* Simple router that broadcasts the message to all connections, and replies with the first response
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget
* mode, the router would behave as {@link RoundRobinRouter}
*/
/*
class ScatterGatherFirstCompletedRouter(implicit dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter {
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results)
}
*/

View file

@ -6,12 +6,12 @@ package akka.tutorial.first.scala
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import akka.actor._ import akka.actor._
import akka.routing._ import akka.routing._
import com.typesafe.config.ConfigFactory import com.typesafe.config.{ ConfigFactory, Config }
object Pi extends App { object Pi extends App {
// Initiate the calculation // Initiate the calculation
calculate(nrOfWorkers = 4, nrOfElements = 10, nrOfMessages = 10) calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
// ==================== // ====================
// ===== Messages ===== // ===== Messages =====
@ -39,7 +39,6 @@ object Pi extends App {
def receive = { def receive = {
case Work(start, nrOfElements) case Work(start, nrOfElements)
println("*** RECEIVED MESSAGE IN: " + self.path)
sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
} }
} }
@ -55,26 +54,11 @@ object Pi extends App {
var start: Long = _ var start: Long = _
// create the workers // create the workers
val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker])
var workers = Vector.empty[ActorRef] // create a round robin router for the workers
for (i 1 to 2) { val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi")
workers = context.actorOf[Worker] +: workers //val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi")
}
// TODO (HE) : use this way of creating actors
//val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker])
/*
// wrap them with a load-balancing router
// FIXME routers are intended to be used like this
implicit val timout = context.system.settings.ActorTimeout
implicit val dispatcher = context.dispatcher
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers))
val router = new RoutedActorRef(context.system, props, self.asInstanceOf[InternalActorRef], "pi")
*/
//val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi")
val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi")
// message handler // message handler
def receive = { def receive = {
@ -107,7 +91,7 @@ object Pi extends App {
// ===== Run it ===== // ===== Run it =====
// ================== // ==================
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
val system = ActorSystem() val system = ActorSystem("PiSystem", ConfigFactory.load("akka.conf"))
// this latch is only plumbing to know when the calculation is completed // this latch is only plumbing to know when the calculation is completed
val latch = new CountDownLatch(1) val latch = new CountDownLatch(1)