Misc improvements of ActorRoutedRef. Implemented a scatterer gatherer router. Enabled router related tests. See #1440.
This commit is contained in:
parent
a7886abdf0
commit
192f84df71
9 changed files with 203 additions and 308 deletions
|
|
@ -131,32 +131,6 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
|||
counter1.get must be(1)
|
||||
counter2.get must be(1)
|
||||
}
|
||||
|
||||
// TODO (HE) : Is this still a valid test case?
|
||||
/*
|
||||
"fail to deliver a broadcast message using the ?" in {
|
||||
val doneLatch = new CountDownLatch(1)
|
||||
|
||||
val counter1 = new AtomicInteger
|
||||
val connection1 = system.actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
case _ ⇒ counter1.incrementAndGet()
|
||||
}
|
||||
})
|
||||
|
||||
val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = List(connection1))))
|
||||
|
||||
intercept[RoutingException] {
|
||||
routedActor ? Broadcast(1)
|
||||
}
|
||||
|
||||
routedActor ! "end"
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
counter1.get must be(0)
|
||||
}
|
||||
*/
|
||||
|
||||
}
|
||||
|
||||
"random router" must {
|
||||
|
|
@ -195,35 +169,6 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
|||
counter1.get must be(1)
|
||||
counter2.get must be(1)
|
||||
}
|
||||
|
||||
// TODO (HE) : Is this still a valid test case?
|
||||
/*
|
||||
"fail to deliver a broadcast message using the ?" in {
|
||||
val doneLatch = new CountDownLatch(1)
|
||||
|
||||
val counter1 = new AtomicInteger
|
||||
val connection1 = system.actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
case _ ⇒ counter1.incrementAndGet()
|
||||
}
|
||||
})
|
||||
|
||||
val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1)))
|
||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
||||
|
||||
try {
|
||||
actor ? Broadcast(1)
|
||||
fail()
|
||||
} catch {
|
||||
case e: RoutingException ⇒
|
||||
}
|
||||
|
||||
actor ! "end"
|
||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||
counter1.get must be(0)
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
"broadcast router" must {
|
||||
|
|
@ -293,129 +238,35 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO (HE) : add tests below
|
||||
/*
|
||||
|
||||
"Scatter-gather router" must {
|
||||
|
||||
"return response, even if one of the actors has stopped" in {
|
||||
val shutdownLatch = new TestLatch(1)
|
||||
val actor1 = newActor(1, Some(shutdownLatch))
|
||||
val actor2 = newActor(2, Some(shutdownLatch))
|
||||
val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2))))
|
||||
|
||||
routedActor ! Broadcast(Stop(Some(1)))
|
||||
shutdownLatch.await
|
||||
(routedActor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
|
||||
}
|
||||
|
||||
"throw an exception, if all the connections have stopped" in {
|
||||
|
||||
val shutdownLatch = new TestLatch(2)
|
||||
|
||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))))
|
||||
|
||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
||||
|
||||
actor ! Broadcast(Stop())
|
||||
|
||||
shutdownLatch.await
|
||||
|
||||
(intercept[RoutingException] {
|
||||
actor ? Broadcast(0)
|
||||
}) must not be (null)
|
||||
|
||||
}
|
||||
|
||||
"return the first response from connections, when all of them replied" in {
|
||||
|
||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1))))
|
||||
|
||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
||||
|
||||
(actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0)
|
||||
|
||||
}
|
||||
|
||||
"return the first response from connections, when some of them failed to reply" in {
|
||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1))))
|
||||
|
||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
||||
|
||||
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
|
||||
}
|
||||
"Scatter-gather router" must {
|
||||
|
||||
"be started when constructed" in {
|
||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0))))
|
||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
||||
|
||||
actor.isTerminated must be(false)
|
||||
}
|
||||
|
||||
"deliver one-way messages in a round robin fashion" in {
|
||||
val connectionCount = 10
|
||||
val iterationCount = 10
|
||||
val doneLatch = new TestLatch(connectionCount)
|
||||
|
||||
var connections = new LinkedList[ActorRef]
|
||||
var counters = new LinkedList[AtomicInteger]
|
||||
for (i ← 0 until connectionCount) {
|
||||
counters = counters :+ new AtomicInteger()
|
||||
|
||||
val connection = system.actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
case msg: Int ⇒ counters.get(i).get.addAndGet(msg)
|
||||
}
|
||||
})
|
||||
connections = connections :+ connection
|
||||
}
|
||||
|
||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(connections))
|
||||
|
||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
||||
|
||||
for (i ← 0 until iterationCount) {
|
||||
for (k ← 0 until connectionCount) {
|
||||
actor ! (k + 1)
|
||||
}
|
||||
}
|
||||
|
||||
actor ! Broadcast("end")
|
||||
|
||||
doneLatch.await
|
||||
|
||||
for (i ← 0 until connectionCount) {
|
||||
val counter = counters.get(i).get
|
||||
counter.get must be((iterationCount * (i + 1)))
|
||||
}
|
||||
val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(newActor(0)))))
|
||||
routedActor.isTerminated must be(false)
|
||||
}
|
||||
|
||||
"deliver a broadcast message using the !" in {
|
||||
val doneLatch = new TestLatch(2)
|
||||
|
||||
val counter1 = new AtomicInteger
|
||||
val connection1 = system.actorOf(new Actor {
|
||||
val actor1 = system.actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
case msg: Int ⇒ counter1.addAndGet(msg)
|
||||
}
|
||||
})
|
||||
|
||||
val counter2 = new AtomicInteger
|
||||
val connection2 = system.actorOf(new Actor {
|
||||
val actor2 = system.actorOf(new Actor {
|
||||
def receive = {
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
case msg: Int ⇒ counter2.addAndGet(msg)
|
||||
}
|
||||
})
|
||||
|
||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
|
||||
|
||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
||||
|
||||
actor ! Broadcast(1)
|
||||
actor ! Broadcast("end")
|
||||
val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2))))
|
||||
routedActor ! Broadcast(1)
|
||||
routedActor ! Broadcast("end")
|
||||
|
||||
doneLatch.await
|
||||
|
||||
|
|
@ -423,31 +274,33 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
|||
counter2.get must be(1)
|
||||
}
|
||||
|
||||
"return response, even if one of the actors has stopped" in {
|
||||
val shutdownLatch = new TestLatch(1)
|
||||
val actor1 = newActor(1, Some(shutdownLatch))
|
||||
val actor2 = newActor(22, Some(shutdownLatch))
|
||||
val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2))))
|
||||
|
||||
case class Stop(id: Option[Int] = None)
|
||||
routedActor ! Broadcast(Stop(Some(1)))
|
||||
shutdownLatch.await
|
||||
(routedActor ? Broadcast(0)).as[Int].get must be(22)
|
||||
}
|
||||
|
||||
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(new Actor {
|
||||
def receive = {
|
||||
case Stop(None) ⇒
|
||||
println(">>>> STOPPING : " + id)
|
||||
self.stop()
|
||||
case Stop(Some(_id)) if (_id == id) ⇒
|
||||
println(">>>> STOPPING >: " + id)
|
||||
self.stop()
|
||||
case _id: Int if (_id == id) ⇒
|
||||
println("-----> ID MATCH - do nothing")
|
||||
case x ⇒ {
|
||||
Thread sleep 100 * id
|
||||
println("-----> SENDING REPLY: " + id)
|
||||
sender.tell(id)
|
||||
case class Stop(id: Option[Int] = None)
|
||||
|
||||
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case Stop(None) ⇒ self.stop()
|
||||
case Stop(Some(_id)) if (_id == id) ⇒ self.stop()
|
||||
case _id: Int if (_id == id) ⇒
|
||||
case x ⇒ {
|
||||
Thread sleep 100 * id
|
||||
sender.tell(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def postStop = {
|
||||
println("***** POSTSTOP")
|
||||
shudownLatch foreach (_.countDown())
|
||||
}
|
||||
})
|
||||
}
|
||||
*/
|
||||
override def postStop = {
|
||||
shudownLatch foreach (_.countDown())
|
||||
}
|
||||
}), "Actor:" + id)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -299,7 +299,16 @@ private[akka] class LocalActorRef private[akka] (
|
|||
|
||||
def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
|
||||
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = actorCell.provider.ask(message, this, timeout)
|
||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||
actorCell.provider.ask(timeout) match {
|
||||
case Some(a) ⇒
|
||||
this.!(message)(a)
|
||||
a.result
|
||||
case None ⇒
|
||||
this.!(message)(null)
|
||||
new DefaultPromise[Any](0)(system.dispatcher)
|
||||
}
|
||||
}
|
||||
|
||||
def restart(cause: Throwable): Unit = actorCell.restart(cause)
|
||||
|
||||
|
|
|
|||
|
|
@ -5,21 +5,13 @@
|
|||
package akka.actor
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.{ ConcurrentHashMap, TimeUnit }
|
||||
import scala.annotation.tailrec
|
||||
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
|
||||
import akka.actor.Timeout.intToTimeout
|
||||
import akka.config.ConfigurationException
|
||||
import akka.dispatch._
|
||||
import akka.routing._
|
||||
import akka.AkkaException
|
||||
import akka.util.{ Duration, Switch, Helpers }
|
||||
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
|
||||
import akka.event._
|
||||
import akka.event.Logging.Error._
|
||||
import akka.event.Logging.Warning
|
||||
import java.io.Closeable
|
||||
import com.typesafe.config.Config
|
||||
|
||||
/**
|
||||
* Interface for all ActorRef providers to implement.
|
||||
|
|
@ -105,9 +97,10 @@ trait ActorRefProvider {
|
|||
def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef
|
||||
|
||||
/**
|
||||
* Create AskActorRef to hook up message send to recipient with Future receiver.
|
||||
* Create AskActorRef and register it properly so it can be serialized/deserialized;
|
||||
* caller needs to send the message.
|
||||
*/
|
||||
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any]
|
||||
def ask(within: Timeout): Option[AskActorRef]
|
||||
|
||||
/**
|
||||
* This Future is completed upon termination of this ActorRefProvider, which
|
||||
|
|
@ -530,11 +523,10 @@ class LocalActorRefProvider(
|
|||
r.adaptFromDeploy(deployer.lookup(lookupPath))
|
||||
}
|
||||
|
||||
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
|
||||
import akka.dispatch.DefaultPromise
|
||||
def ask(within: Timeout): Option[AskActorRef] = {
|
||||
(if (within == null) settings.ActorTimeout else within) match {
|
||||
case t if t.duration.length <= 0 ⇒
|
||||
new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout
|
||||
None
|
||||
case t ⇒
|
||||
val path = tempPath()
|
||||
val name = path.name
|
||||
|
|
@ -544,8 +536,7 @@ class LocalActorRefProvider(
|
|||
}
|
||||
}
|
||||
tempContainer.addChild(name, a)
|
||||
recipient.tell(message, a)
|
||||
a.result
|
||||
Some(a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,13 +9,13 @@ import akka.actor._
|
|||
import akka.japi.Creator
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import akka.config.ConfigurationException
|
||||
import akka.routing.Routing.Broadcast
|
||||
import akka.actor.DeploymentConfig.Deploy
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.dispatch.Future
|
||||
import akka.util.{ Duration, ReflectiveAccess }
|
||||
import java.util.concurrent.TimeUnit
|
||||
import akka.util.ReflectiveAccess
|
||||
import akka.AkkaException
|
||||
import scala.collection.JavaConversions._
|
||||
import akka.routing.Routing.{ Destination, Broadcast }
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
sealed trait RouterType
|
||||
|
||||
|
|
@ -41,16 +41,16 @@ object RouterType {
|
|||
*/
|
||||
object RoundRobin extends RouterType
|
||||
|
||||
/**
|
||||
* A RouterType that selects the connection by using scatter gather.
|
||||
*/
|
||||
object ScatterGather extends RouterType
|
||||
|
||||
/**
|
||||
* A RouterType that broadcasts the messages to all connections.
|
||||
*/
|
||||
object Broadcast extends RouterType
|
||||
|
||||
/**
|
||||
* A RouterType that selects the connection by using scatter gather.
|
||||
*/
|
||||
object ScatterGather extends RouterType
|
||||
|
||||
/**
|
||||
* A RouterType that selects the connection based on the least amount of cpu usage
|
||||
*/
|
||||
|
|
@ -70,7 +70,6 @@ object RouterType {
|
|||
* A user-defined custom RouterType.
|
||||
*/
|
||||
case class Custom(implClass: String) extends RouterType
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -106,20 +105,18 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
|
|||
val route: Routing.Route = _props.routerConfig.createRoute(_props.creator, actorContext)
|
||||
|
||||
override def !(message: Any)(implicit sender: ActorRef = null) {
|
||||
route(message) match {
|
||||
case null ⇒ super.!(message)(sender)
|
||||
case ref: ActorRef ⇒ ref.!(message)(sender)
|
||||
case refs: Traversable[ActorRef] ⇒
|
||||
message match {
|
||||
case Broadcast(m) ⇒ refs foreach (_.!(m)(sender))
|
||||
case _ ⇒ refs foreach (_.!(message)(sender))
|
||||
}
|
||||
val s = if (sender eq null) underlying.system.deadLetters else sender
|
||||
|
||||
val msg = message match {
|
||||
case Broadcast(m) ⇒ m
|
||||
case m ⇒ m
|
||||
}
|
||||
|
||||
route(s, message) match {
|
||||
case Nil ⇒ super.!(message)(s)
|
||||
case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO (HE) : Should the RoutedActorRef also override "?"?
|
||||
// If not how then Broadcast messages cannot be sent via ? -
|
||||
// which it is in some test cases at the moment.
|
||||
}
|
||||
|
||||
trait RouterConfig extends Function0[Actor] {
|
||||
|
|
@ -134,6 +131,11 @@ trait RouterConfig extends Function0[Actor] {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Router {
|
||||
def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[ActorRef]): Vector[ActorRef] = (nrOfInstances, targets) match {
|
||||
case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.")
|
||||
case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut)
|
||||
case (_, xs) ⇒ Vector.empty[ActorRef] ++ xs
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -163,7 +165,8 @@ object Routing {
|
|||
}
|
||||
}
|
||||
|
||||
type Route = (Any) ⇒ AnyRef
|
||||
case class Destination(sender: ActorRef, recipient: ActorRef)
|
||||
type Route = (ActorRef, Any) ⇒ Iterable[Destination]
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -192,6 +195,22 @@ case object NoRouter extends RouterConfig {
|
|||
case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
|
||||
extends Router with RouterConfig {
|
||||
|
||||
/**
|
||||
* Constructor that sets nrOfInstances to be created.
|
||||
* Java API
|
||||
*/
|
||||
def this(nr: Int) = {
|
||||
this(nrOfInstances = nr)
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor that sets the targets to be used.
|
||||
* Java API
|
||||
*/
|
||||
def this(t: java.util.Collection[ActorRef]) = {
|
||||
this(targets = collectionAsScalaIterable(t))
|
||||
}
|
||||
|
||||
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||
deploy match {
|
||||
case Some(d) ⇒
|
||||
|
|
@ -208,11 +227,8 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
|
|||
}
|
||||
|
||||
def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = {
|
||||
val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
|
||||
case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.")
|
||||
case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
|
||||
case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs
|
||||
}
|
||||
val routees: Vector[ActorRef] =
|
||||
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
|
||||
|
||||
val next = new AtomicInteger(0)
|
||||
|
||||
|
|
@ -220,10 +236,12 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
|
|||
routees(next.getAndIncrement % routees.size)
|
||||
}
|
||||
|
||||
{
|
||||
case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled?
|
||||
case Broadcast(msg) ⇒ routees
|
||||
case msg ⇒ getNext()
|
||||
{ (sender, message) ⇒
|
||||
message match {
|
||||
case msg: AutoReceivedMessage ⇒ Nil
|
||||
case Broadcast(msg) ⇒ routees map (Destination(sender, _))
|
||||
case msg ⇒ List(Destination(sender, getNext()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -242,6 +260,22 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
|
|||
case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
|
||||
extends Router with RouterConfig {
|
||||
|
||||
/**
|
||||
* Constructor that sets nrOfInstances to be created.
|
||||
* Java API
|
||||
*/
|
||||
def this(nr: Int) = {
|
||||
this(nrOfInstances = nr)
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor that sets the targets to be used.
|
||||
* Java API
|
||||
*/
|
||||
def this(t: java.util.Collection[ActorRef]) = {
|
||||
this(targets = collectionAsScalaIterable(t))
|
||||
}
|
||||
|
||||
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||
deploy match {
|
||||
case Some(d) ⇒
|
||||
|
|
@ -264,20 +298,19 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni
|
|||
}
|
||||
|
||||
def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = {
|
||||
val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
|
||||
case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.")
|
||||
case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
|
||||
case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs
|
||||
}
|
||||
val routees: Vector[ActorRef] =
|
||||
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
|
||||
|
||||
def getNext(): ActorRef = {
|
||||
routees(random.get.nextInt(routees.size))
|
||||
}
|
||||
|
||||
{
|
||||
case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled?
|
||||
case Broadcast(msg) ⇒ routees
|
||||
case msg ⇒ getNext()
|
||||
{ (sender, message) ⇒
|
||||
message match {
|
||||
case msg: AutoReceivedMessage ⇒ Nil
|
||||
case Broadcast(msg) ⇒ routees map (Destination(sender, _))
|
||||
case msg ⇒ List(Destination(sender, getNext()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -296,6 +329,22 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni
|
|||
case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
|
||||
extends Router with RouterConfig {
|
||||
|
||||
/**
|
||||
* Constructor that sets nrOfInstances to be created.
|
||||
* Java API
|
||||
*/
|
||||
def this(nr: Int) = {
|
||||
this(nrOfInstances = nr)
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor that sets the targets to be used.
|
||||
* Java API
|
||||
*/
|
||||
def this(t: java.util.Collection[ActorRef]) = {
|
||||
this(targets = collectionAsScalaIterable(t))
|
||||
}
|
||||
|
||||
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||
deploy match {
|
||||
case Some(d) ⇒
|
||||
|
|
@ -312,29 +361,48 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] =
|
|||
}
|
||||
|
||||
def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = {
|
||||
val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
|
||||
case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.")
|
||||
case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
|
||||
case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs
|
||||
}
|
||||
val routees: Vector[ActorRef] =
|
||||
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
|
||||
|
||||
{
|
||||
case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled?
|
||||
case Broadcast(msg) ⇒ routees
|
||||
case msg ⇒ routees
|
||||
{ (sender, message) ⇒
|
||||
message match {
|
||||
case msg: AutoReceivedMessage ⇒ Nil
|
||||
case Broadcast(msg) ⇒ routees map (Destination(sender, _))
|
||||
case msg ⇒ routees map (Destination(sender, _))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO (HE) : Correct description below
|
||||
/**
|
||||
* Simple router that broadcasts the message to all connections, and replies with the first response.
|
||||
* Scatter-gather pattern will be applied only to the messages broadcast using Future
|
||||
* (wrapped into {@link Routing.Broadcast} and sent with "?" method).
|
||||
* For the messages sent in a fire-forget mode, the router would behave as {@link RoundRobinRouter}
|
||||
* Simple router that broadcasts the message to all routees, and replies with the first response.
|
||||
* <br>
|
||||
* Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means
|
||||
* that the random router should both create new actors and use the 'targets' actor(s).
|
||||
* In this case the 'nrOfInstances' will be ignored and the 'targets' will be used.
|
||||
* <br>
|
||||
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
|
||||
* if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
|
||||
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
|
||||
*/
|
||||
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig {
|
||||
|
||||
/**
|
||||
* Constructor that sets nrOfInstances to be created.
|
||||
* Java API
|
||||
*/
|
||||
def this(nr: Int) = {
|
||||
this(nrOfInstances = nr)
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor that sets the targets to be used.
|
||||
* Java API
|
||||
*/
|
||||
def this(t: java.util.Collection[ActorRef]) = {
|
||||
this(targets = collectionAsScalaIterable(t))
|
||||
}
|
||||
|
||||
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||
deploy match {
|
||||
case Some(d) ⇒
|
||||
|
|
@ -357,28 +425,14 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: It
|
|||
case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs
|
||||
}
|
||||
|
||||
def scatterGather[S, G >: S](message: Any, t: Timeout): Future[G] = {
|
||||
val responses = routees.flatMap { actor ⇒
|
||||
try {
|
||||
if (actor.isTerminated) None else Some(actor.?(message, t).asInstanceOf[Future[S]])
|
||||
} catch {
|
||||
case e: Exception ⇒ None
|
||||
}
|
||||
{ (sender, message) ⇒
|
||||
val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME!
|
||||
asker.result.pipeTo(sender)
|
||||
message match {
|
||||
case msg: AutoReceivedMessage ⇒ Nil
|
||||
case Broadcast(msg) ⇒ routees map (Destination(asker, _))
|
||||
case msg ⇒ routees map (Destination(asker, _))
|
||||
}
|
||||
|
||||
if (!responses.isEmpty) throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message))
|
||||
else {
|
||||
implicit val messageDispatcher = context.dispatcher
|
||||
implicit val timeout = t
|
||||
Future.firstCompletedOf(responses)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO (HE) : Timeout and Future should be updated to new strategy - or hardcoded value below should at least be removed!
|
||||
{
|
||||
case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled?
|
||||
case Broadcast(msg) ⇒ routees
|
||||
case msg ⇒ scatterGather(msg, Timeout(Duration(5000, TimeUnit.MILLISECONDS)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -7,7 +7,6 @@ package akka.remote
|
|||
import akka.actor._
|
||||
import akka.actor.Status._
|
||||
import akka.event.Logging
|
||||
import akka.util.duration._
|
||||
import akka.util.Duration
|
||||
import akka.remote.RemoteProtocol._
|
||||
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
||||
|
|
@ -161,7 +160,7 @@ class Gossiper(remote: Remote) {
|
|||
node ← oldAvailableNodes
|
||||
if connectionManager.connectionFor(node).isEmpty
|
||||
} {
|
||||
val connectionFactory = () ⇒ new RemoteActorRef(remote.system.provider, remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, Nobody, None)
|
||||
val connectionFactory = () ⇒ new RemoteActorRef(remote.system.provider.asInstanceOf[RemoteActorRefProvider], remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, Nobody, None)
|
||||
connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node
|
||||
oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes
|
||||
}
|
||||
|
|
|
|||
|
|
@ -110,7 +110,7 @@ class RemoteActorRefProvider(
|
|||
|
||||
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
|
||||
|
||||
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
|
||||
def ask(within: Timeout): Option[AskActorRef] = local.ask(within)
|
||||
|
||||
/**
|
||||
* Using (checking out) actor on a specific node.
|
||||
|
|
@ -143,7 +143,7 @@ class RemoteActorRefProvider(
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
private[akka] class RemoteActorRef private[akka] (
|
||||
provider: ActorRefProvider,
|
||||
provider: RemoteActorRefProvider,
|
||||
remote: RemoteSupport,
|
||||
val path: ActorPath,
|
||||
val getParent: InternalActorRef,
|
||||
|
|
@ -168,7 +168,16 @@ private[akka] class RemoteActorRef private[akka] (
|
|||
|
||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader)
|
||||
|
||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout)
|
||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||
provider.ask(timeout) match {
|
||||
case Some(a) ⇒
|
||||
this.!(message)(a)
|
||||
a.result
|
||||
case None ⇒
|
||||
this.!(message)(null)
|
||||
new DefaultPromise[Any](0)(provider.dispatcher)
|
||||
}
|
||||
}
|
||||
|
||||
def suspend(): Unit = sendSystemMessage(Suspend())
|
||||
|
||||
|
|
|
|||
|
|
@ -149,5 +149,5 @@ class RemoteConnectionManager(
|
|||
}
|
||||
|
||||
private[remote] def newConnection(remoteAddress: RemoteAddress, actorPath: ActorPath) =
|
||||
new RemoteActorRef(remote.system.provider, remote.server, actorPath, Nobody, None)
|
||||
new RemoteActorRef(remote.system.provider.asInstanceOf[RemoteActorRefProvider], remote.server, actorPath, Nobody, None)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,13 +6,12 @@ package akka.tutorial.first.java;
|
|||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.InternalActorRef;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.UntypedActorFactory;
|
||||
import akka.japi.Creator;
|
||||
import akka.routing.*;
|
||||
import akka.routing.RoundRobinRouter;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class Pi {
|
||||
|
|
@ -105,23 +104,8 @@ public class Pi {
|
|||
this.nrOfMessages = nrOfMessages;
|
||||
this.nrOfElements = nrOfElements;
|
||||
this.latch = latch;
|
||||
Creator<Router> routerCreator = new Creator<Router>() {
|
||||
public Router create() {
|
||||
// TODO (HE) : implement
|
||||
//return new RoundRobinRouter(getContext().dispatcher(), new akka.actor.Timeout(-1));
|
||||
return null;
|
||||
}
|
||||
};
|
||||
LinkedList<ActorRef> actors = new LinkedList<ActorRef>() {
|
||||
{
|
||||
for (int i = 0; i < nrOfWorkers; i++) add(getContext().actorOf(Worker.class));
|
||||
}
|
||||
};
|
||||
|
||||
// FIXME routers are intended to be used like this
|
||||
// TODO (HE): implement
|
||||
//RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true);
|
||||
//router = new RoutedActorRef(getContext().system(), props, (InternalActorRef) getSelf(), "pi");
|
||||
router = this.getContext().actorOf(new Props().withCreator(Worker.class).withRouting(new RoundRobinRouter(5)), "pi");
|
||||
}
|
||||
|
||||
// message handler
|
||||
|
|
@ -170,7 +154,7 @@ public class Pi {
|
|||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// create the master
|
||||
ActorRef master = system.actorOf(new UntypedActorFactory() {
|
||||
ActorRef master = system.actorOf(new akka.actor.UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -53,12 +53,8 @@ object Pi extends App {
|
|||
var nrOfResults: Int = _
|
||||
var start: Long = _
|
||||
|
||||
// create the workers
|
||||
val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker])
|
||||
|
||||
// create a round robin router for the workers
|
||||
val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi")
|
||||
//val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi")
|
||||
|
||||
// message handler
|
||||
def receive = {
|
||||
|
|
@ -91,7 +87,7 @@ object Pi extends App {
|
|||
// ===== Run it =====
|
||||
// ==================
|
||||
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
|
||||
val system = ActorSystem("PiSystem", ConfigFactory.load("akka.conf"))
|
||||
val system = ActorSystem()
|
||||
|
||||
// this latch is only plumbing to know when the calculation is completed
|
||||
val latch = new CountDownLatch(1)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue