Merge remote-tracking branch 'origin/1428-RoutedActorRef-henrikengstrom' into wip-remote-supervision-rk
This commit is contained in:
commit
d8bc57dc17
14 changed files with 434 additions and 750 deletions
|
|
@ -5,7 +5,6 @@
|
||||||
package akka.actor
|
package akka.actor
|
||||||
|
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.util.duration._
|
|
||||||
import DeploymentConfig._
|
import DeploymentConfig._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import com.typesafe.config.ConfigParseOptions
|
import com.typesafe.config.ConfigParseOptions
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,5 @@
|
||||||
package akka.routing
|
package akka.routing
|
||||||
|
|
||||||
import akka.routing._
|
|
||||||
import akka.config.ConfigurationException
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import collection.mutable.LinkedList
|
import collection.mutable.LinkedList
|
||||||
|
|
@ -17,6 +15,7 @@ object RoutingSpec {
|
||||||
println("Hello")
|
println("Hello")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
|
@ -26,30 +25,25 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
|
|
||||||
import akka.routing.RoutingSpec._
|
import akka.routing.RoutingSpec._
|
||||||
|
|
||||||
// TODO (HE) : Update test with new routing functionality
|
"no router" must {
|
||||||
/*
|
|
||||||
"direct router" must {
|
|
||||||
"be started when constructed" in {
|
"be started when constructed" in {
|
||||||
val actor1 = system.actorOf[TestActor]
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(NoRouter))
|
||||||
|
routedActor.isTerminated must be(false)
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(actor1)))
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
actor.isTerminated must be(false)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"send message to connection" in {
|
"send message to connection" in {
|
||||||
val doneLatch = new CountDownLatch(1)
|
val doneLatch = new CountDownLatch(1)
|
||||||
|
|
||||||
val counter = new AtomicInteger(0)
|
val counter = new AtomicInteger(0)
|
||||||
val connection1 = system.actorOf(new Actor {
|
|
||||||
|
class Actor1 extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case _ ⇒ counter.incrementAndGet
|
case _ ⇒ counter.incrementAndGet
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1)))
|
val routedActor = system.actorOf(Props(new Actor1).withRouting(NoRouter))
|
||||||
val routedActor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
routedActor ! "hello"
|
routedActor ! "hello"
|
||||||
routedActor ! "end"
|
routedActor ! "end"
|
||||||
|
|
||||||
|
|
@ -57,38 +51,12 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
|
|
||||||
counter.get must be(1)
|
counter.get must be(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
"deliver a broadcast message" in {
|
|
||||||
val doneLatch = new CountDownLatch(1)
|
|
||||||
|
|
||||||
val counter1 = new AtomicInteger
|
|
||||||
val connection1 = system.actorOf(new Actor {
|
|
||||||
def receive = {
|
|
||||||
case "end" ⇒ doneLatch.countDown()
|
|
||||||
case msg: Int ⇒ counter1.addAndGet(msg)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1)))
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
actor ! Broadcast(1)
|
|
||||||
actor ! "end"
|
|
||||||
|
|
||||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
|
||||||
|
|
||||||
counter1.get must be(1)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"round robin router" must {
|
"round robin router" must {
|
||||||
|
|
||||||
"be started when constructed" in {
|
"be started when constructed" in {
|
||||||
val actor1 = system.actorOf[TestActor]
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(nrOfInstances = 1)))
|
||||||
|
routedActor.isTerminated must be(false)
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(actor1)))
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
actor.isTerminated must be(false)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//In this test a bunch of actors are created and each actor has its own counter.
|
//In this test a bunch of actors are created and each actor has its own counter.
|
||||||
|
|
@ -101,32 +69,30 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
val doneLatch = new CountDownLatch(connectionCount)
|
val doneLatch = new CountDownLatch(connectionCount)
|
||||||
|
|
||||||
//lets create some connections.
|
//lets create some connections.
|
||||||
var connections = new LinkedList[ActorRef]
|
var actors = new LinkedList[ActorRef]
|
||||||
var counters = new LinkedList[AtomicInteger]
|
var counters = new LinkedList[AtomicInteger]
|
||||||
for (i ← 0 until connectionCount) {
|
for (i ← 0 until connectionCount) {
|
||||||
counters = counters :+ new AtomicInteger()
|
counters = counters :+ new AtomicInteger()
|
||||||
|
|
||||||
val connection = system.actorOf(new Actor {
|
val actor = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counters.get(i).get.addAndGet(msg)
|
case msg: Int ⇒ counters.get(i).get.addAndGet(msg)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
connections = connections :+ connection
|
actors = actors :+ actor
|
||||||
}
|
}
|
||||||
|
|
||||||
//create the routed actor.
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = actors)))
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(connections))
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
//send messages to the actor.
|
//send messages to the actor.
|
||||||
for (i ← 0 until iterationCount) {
|
for (i ← 0 until iterationCount) {
|
||||||
for (k ← 0 until connectionCount) {
|
for (k ← 0 until connectionCount) {
|
||||||
actor ! (k + 1)
|
routedActor ! (k + 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
actor ! Broadcast("end")
|
routedActor ! Broadcast("end")
|
||||||
//now wait some and do validations.
|
//now wait some and do validations.
|
||||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||||
|
|
||||||
|
|
@ -140,7 +106,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
val doneLatch = new CountDownLatch(2)
|
val doneLatch = new CountDownLatch(2)
|
||||||
|
|
||||||
val counter1 = new AtomicInteger
|
val counter1 = new AtomicInteger
|
||||||
val connection1 = system.actorOf(new Actor {
|
val actor1 = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counter1.addAndGet(msg)
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
||||||
|
|
@ -148,63 +114,37 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
})
|
})
|
||||||
|
|
||||||
val counter2 = new AtomicInteger
|
val counter2 = new AtomicInteger
|
||||||
val connection2 = system.actorOf(new Actor {
|
val actor2 = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counter2.addAndGet(msg)
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = List(actor1, actor2))))
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
actor ! Broadcast(1)
|
routedActor ! Broadcast(1)
|
||||||
actor ! Broadcast("end")
|
routedActor ! Broadcast("end")
|
||||||
|
|
||||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||||
|
|
||||||
counter1.get must be(1)
|
counter1.get must be(1)
|
||||||
counter2.get must be(1)
|
counter2.get must be(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
"fail to deliver a broadcast message using the ?" in {
|
|
||||||
val doneLatch = new CountDownLatch(1)
|
|
||||||
|
|
||||||
val counter1 = new AtomicInteger
|
|
||||||
val connection1 = system.actorOf(new Actor {
|
|
||||||
def receive = {
|
|
||||||
case "end" ⇒ doneLatch.countDown()
|
|
||||||
case _ ⇒ counter1.incrementAndGet()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1)))
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
intercept[RoutingException] { actor ? Broadcast(1) }
|
|
||||||
|
|
||||||
actor ! "end"
|
|
||||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
|
||||||
counter1.get must be(0)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"random router" must {
|
"random router" must {
|
||||||
|
|
||||||
"be started when constructed" in {
|
"be started when constructed" in {
|
||||||
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(RandomRouter(nrOfInstances = 1)))
|
||||||
val actor1 = system.actorOf[TestActor]
|
routedActor.isTerminated must be(false)
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(actor1)))
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
actor.isTerminated must be(false)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"deliver a broadcast message" in {
|
"deliver a broadcast message" in {
|
||||||
val doneLatch = new CountDownLatch(2)
|
val doneLatch = new CountDownLatch(2)
|
||||||
|
|
||||||
val counter1 = new AtomicInteger
|
val counter1 = new AtomicInteger
|
||||||
val connection1 = system.actorOf(new Actor {
|
val actor1 = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counter1.addAndGet(msg)
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
||||||
|
|
@ -212,214 +152,36 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
})
|
})
|
||||||
|
|
||||||
val counter2 = new AtomicInteger
|
val counter2 = new AtomicInteger
|
||||||
val connection2 = system.actorOf(new Actor {
|
val actor2 = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counter2.addAndGet(msg)
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(RandomRouter(targets = List(actor1, actor2))))
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
actor ! Broadcast(1)
|
routedActor ! Broadcast(1)
|
||||||
actor ! Broadcast("end")
|
routedActor ! Broadcast("end")
|
||||||
|
|
||||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||||
|
|
||||||
counter1.get must be(1)
|
counter1.get must be(1)
|
||||||
counter2.get must be(1)
|
counter2.get must be(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
"fail to deliver a broadcast message using the ?" in {
|
|
||||||
val doneLatch = new CountDownLatch(1)
|
|
||||||
|
|
||||||
val counter1 = new AtomicInteger
|
|
||||||
val connection1 = system.actorOf(new Actor {
|
|
||||||
def receive = {
|
|
||||||
case "end" ⇒ doneLatch.countDown()
|
|
||||||
case _ ⇒ counter1.incrementAndGet()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1)))
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
try {
|
|
||||||
actor ? Broadcast(1)
|
|
||||||
fail()
|
|
||||||
} catch {
|
|
||||||
case e: RoutingException ⇒
|
|
||||||
}
|
|
||||||
|
|
||||||
actor ! "end"
|
|
||||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
|
||||||
counter1.get must be(0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"Scatter-gather router" must {
|
|
||||||
|
|
||||||
"return response, even if one of the connections has stopped" in {
|
|
||||||
|
|
||||||
val shutdownLatch = new TestLatch(1)
|
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))))
|
|
||||||
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
actor ! Broadcast(Stop(Some(0)))
|
|
||||||
|
|
||||||
shutdownLatch.await
|
|
||||||
|
|
||||||
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
"throw an exception, if all the connections have stopped" in {
|
|
||||||
|
|
||||||
val shutdownLatch = new TestLatch(2)
|
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))))
|
|
||||||
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
actor ! Broadcast(Stop())
|
|
||||||
|
|
||||||
shutdownLatch.await
|
|
||||||
|
|
||||||
(intercept[RoutingException] {
|
|
||||||
actor ? Broadcast(0)
|
|
||||||
}) must not be (null)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
"return the first response from connections, when all of them replied" in {
|
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1))))
|
|
||||||
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
(actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
"return the first response from connections, when some of them failed to reply" in {
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1))))
|
|
||||||
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
"be started when constructed" in {
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0))))
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
actor.isTerminated must be(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
"deliver one-way messages in a round robin fashion" in {
|
|
||||||
val connectionCount = 10
|
|
||||||
val iterationCount = 10
|
|
||||||
val doneLatch = new TestLatch(connectionCount)
|
|
||||||
|
|
||||||
var connections = new LinkedList[ActorRef]
|
|
||||||
var counters = new LinkedList[AtomicInteger]
|
|
||||||
for (i ← 0 until connectionCount) {
|
|
||||||
counters = counters :+ new AtomicInteger()
|
|
||||||
|
|
||||||
val connection = system.actorOf(new Actor {
|
|
||||||
def receive = {
|
|
||||||
case "end" ⇒ doneLatch.countDown()
|
|
||||||
case msg: Int ⇒ counters.get(i).get.addAndGet(msg)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
connections = connections :+ connection
|
|
||||||
}
|
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(connections))
|
|
||||||
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
for (i ← 0 until iterationCount) {
|
|
||||||
for (k ← 0 until connectionCount) {
|
|
||||||
actor ! (k + 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
actor ! Broadcast("end")
|
|
||||||
|
|
||||||
doneLatch.await
|
|
||||||
|
|
||||||
for (i ← 0 until connectionCount) {
|
|
||||||
val counter = counters.get(i).get
|
|
||||||
counter.get must be((iterationCount * (i + 1)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
"deliver a broadcast message using the !" in {
|
|
||||||
val doneLatch = new TestLatch(2)
|
|
||||||
|
|
||||||
val counter1 = new AtomicInteger
|
|
||||||
val connection1 = system.actorOf(new Actor {
|
|
||||||
def receive = {
|
|
||||||
case "end" ⇒ doneLatch.countDown()
|
|
||||||
case msg: Int ⇒ counter1.addAndGet(msg)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
val counter2 = new AtomicInteger
|
|
||||||
val connection2 = system.actorOf(new Actor {
|
|
||||||
def receive = {
|
|
||||||
case "end" ⇒ doneLatch.countDown()
|
|
||||||
case msg: Int ⇒ counter2.addAndGet(msg)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
|
|
||||||
|
|
||||||
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
|
|
||||||
|
|
||||||
actor ! Broadcast(1)
|
|
||||||
actor ! Broadcast("end")
|
|
||||||
|
|
||||||
doneLatch.await
|
|
||||||
|
|
||||||
counter1.get must be(1)
|
|
||||||
counter2.get must be(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
case class Stop(id: Option[Int] = None)
|
|
||||||
|
|
||||||
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(new Actor {
|
|
||||||
def receive = {
|
|
||||||
case Stop(None) ⇒ self.stop()
|
|
||||||
case Stop(Some(_id)) if (_id == id) ⇒ self.stop()
|
|
||||||
case _id: Int if (_id == id) ⇒
|
|
||||||
case _ ⇒ Thread sleep 100 * id; sender.tell(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def postStop = {
|
|
||||||
shudownLatch foreach (_.countDown())
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"broadcast router" must {
|
"broadcast router" must {
|
||||||
|
|
||||||
"be started when constructed" in {
|
"be started when constructed" in {
|
||||||
val actor1 = system.actorOf[TestActor]
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(nrOfInstances = 1)))
|
||||||
|
routedActor.isTerminated must be(false)
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(actor1)))
|
|
||||||
val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo")
|
|
||||||
actor.isTerminated must be(false)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"broadcast message using !" in {
|
"broadcast message using !" in {
|
||||||
val doneLatch = new CountDownLatch(2)
|
val doneLatch = new CountDownLatch(2)
|
||||||
|
|
||||||
val counter1 = new AtomicInteger
|
val counter1 = new AtomicInteger
|
||||||
val connection1 = system.actorOf(new Actor {
|
val actor1 = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counter1.addAndGet(msg)
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
||||||
|
|
@ -427,18 +189,16 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
})
|
})
|
||||||
|
|
||||||
val counter2 = new AtomicInteger
|
val counter2 = new AtomicInteger
|
||||||
val connection2 = system.actorOf(new Actor {
|
val actor2 = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counter2.addAndGet(msg)
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(targets = List(actor1, actor2))))
|
||||||
val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo")
|
routedActor ! 1
|
||||||
|
routedActor ! "end"
|
||||||
actor ! 1
|
|
||||||
actor ! "end"
|
|
||||||
|
|
||||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||||
|
|
||||||
|
|
@ -450,7 +210,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
val doneLatch = new CountDownLatch(2)
|
val doneLatch = new CountDownLatch(2)
|
||||||
|
|
||||||
val counter1 = new AtomicInteger
|
val counter1 = new AtomicInteger
|
||||||
val connection1 = system.actorOf(new Actor {
|
val actor1 = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒
|
case msg: Int ⇒
|
||||||
|
|
@ -460,18 +220,16 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
})
|
})
|
||||||
|
|
||||||
val counter2 = new AtomicInteger
|
val counter2 = new AtomicInteger
|
||||||
val connection2 = system.actorOf(new Actor {
|
val actor2 = system.actorOf(new Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "end" ⇒ doneLatch.countDown()
|
case "end" ⇒ doneLatch.countDown()
|
||||||
case msg: Int ⇒ counter2.addAndGet(msg)
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2)))
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(targets = List(actor1, actor2))))
|
||||||
val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo")
|
routedActor ? 1
|
||||||
|
routedActor ! "end"
|
||||||
actor ? 1
|
|
||||||
actor ! "end"
|
|
||||||
|
|
||||||
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
|
||||||
|
|
||||||
|
|
@ -479,5 +237,70 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
|
||||||
counter2.get must be(1)
|
counter2.get must be(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
"Scatter-gather router" must {
|
||||||
|
|
||||||
|
"be started when constructed" in {
|
||||||
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(newActor(0)))))
|
||||||
|
routedActor.isTerminated must be(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
"deliver a broadcast message using the !" in {
|
||||||
|
val doneLatch = new TestLatch(2)
|
||||||
|
|
||||||
|
val counter1 = new AtomicInteger
|
||||||
|
val actor1 = system.actorOf(new Actor {
|
||||||
|
def receive = {
|
||||||
|
case "end" ⇒ doneLatch.countDown()
|
||||||
|
case msg: Int ⇒ counter1.addAndGet(msg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
val counter2 = new AtomicInteger
|
||||||
|
val actor2 = system.actorOf(new Actor {
|
||||||
|
def receive = {
|
||||||
|
case "end" ⇒ doneLatch.countDown()
|
||||||
|
case msg: Int ⇒ counter2.addAndGet(msg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2))))
|
||||||
|
routedActor ! Broadcast(1)
|
||||||
|
routedActor ! Broadcast("end")
|
||||||
|
|
||||||
|
doneLatch.await
|
||||||
|
|
||||||
|
counter1.get must be(1)
|
||||||
|
counter2.get must be(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
"return response, even if one of the actors has stopped" in {
|
||||||
|
val shutdownLatch = new TestLatch(1)
|
||||||
|
val actor1 = newActor(1, Some(shutdownLatch))
|
||||||
|
val actor2 = newActor(22, Some(shutdownLatch))
|
||||||
|
val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2))))
|
||||||
|
|
||||||
|
routedActor ! Broadcast(Stop(Some(1)))
|
||||||
|
shutdownLatch.await
|
||||||
|
(routedActor ? Broadcast(0)).as[Int].get must be(22)
|
||||||
|
}
|
||||||
|
|
||||||
|
case class Stop(id: Option[Int] = None)
|
||||||
|
|
||||||
|
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor {
|
||||||
|
def receive = {
|
||||||
|
case Stop(None) ⇒ self.stop()
|
||||||
|
case Stop(Some(_id)) if (_id == id) ⇒ self.stop()
|
||||||
|
case _id: Int if (_id == id) ⇒
|
||||||
|
case x ⇒ {
|
||||||
|
Thread sleep 100 * id
|
||||||
|
sender.tell(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def postStop = {
|
||||||
|
shudownLatch foreach (_.countDown())
|
||||||
|
}
|
||||||
|
}), "Actor:" + id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -295,7 +295,16 @@ private[akka] class LocalActorRef private[akka] (
|
||||||
|
|
||||||
def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
|
def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
|
||||||
|
|
||||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = actorCell.provider.ask(message, this, timeout)
|
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||||
|
actorCell.provider.ask(timeout) match {
|
||||||
|
case Some(a) ⇒
|
||||||
|
this.!(message)(a)
|
||||||
|
a.result
|
||||||
|
case None ⇒
|
||||||
|
this.!(message)(null)
|
||||||
|
new DefaultPromise[Any](0)(actorCell.system.dispatcher)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def restart(cause: Throwable): Unit = actorCell.restart(cause)
|
def restart(cause: Throwable): Unit = actorCell.restart(cause)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,21 +5,13 @@
|
||||||
package akka.actor
|
package akka.actor
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import java.util.concurrent.{ ConcurrentHashMap, TimeUnit }
|
|
||||||
import scala.annotation.tailrec
|
|
||||||
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
|
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
|
||||||
import akka.actor.Timeout.intToTimeout
|
|
||||||
import akka.config.ConfigurationException
|
|
||||||
import akka.dispatch._
|
import akka.dispatch._
|
||||||
import akka.routing._
|
import akka.routing._
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
import akka.util.{ Duration, Switch, Helpers }
|
import akka.util.{ Duration, Switch, Helpers }
|
||||||
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
|
|
||||||
import akka.event._
|
import akka.event._
|
||||||
import akka.event.Logging.Error._
|
|
||||||
import akka.event.Logging.Warning
|
|
||||||
import java.io.Closeable
|
import java.io.Closeable
|
||||||
import com.typesafe.config.Config
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for all ActorRef providers to implement.
|
* Interface for all ActorRef providers to implement.
|
||||||
|
|
@ -105,9 +97,10 @@ trait ActorRefProvider {
|
||||||
def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef
|
def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create AskActorRef to hook up message send to recipient with Future receiver.
|
* Create AskActorRef and register it properly so it can be serialized/deserialized;
|
||||||
|
* caller needs to send the message.
|
||||||
*/
|
*/
|
||||||
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any]
|
def ask(within: Timeout): Option[AskActorRef]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This Future is completed upon termination of this ActorRefProvider, which
|
* This Future is completed upon termination of this ActorRefProvider, which
|
||||||
|
|
@ -527,22 +520,20 @@ class LocalActorRefProvider(
|
||||||
|
|
||||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean): InternalActorRef = {
|
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean): InternalActorRef = {
|
||||||
props.routerConfig match {
|
props.routerConfig match {
|
||||||
case NoRouting ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
|
case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
|
||||||
case routedActor ⇒ new RoutedActorRef(system, props.withRouting(adaptFromDeploy(routedActor, path)), supervisor, path)
|
case routedActor ⇒ new RoutedActorRef(system, props.withRouting(adaptFromDeploy(routedActor, path)), supervisor, path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = {
|
private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = {
|
||||||
val lookupPath = p.elements.mkString("/", "/", "")
|
val lookupPath = p.elements.mkString("/", "/", "")
|
||||||
val deploy = deployer.lookup(lookupPath)
|
r.adaptFromDeploy(deployer.lookup(lookupPath))
|
||||||
r.adaptFromDeploy(deploy)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
|
def ask(within: Timeout): Option[AskActorRef] = {
|
||||||
import akka.dispatch.DefaultPromise
|
|
||||||
(if (within == null) settings.ActorTimeout else within) match {
|
(if (within == null) settings.ActorTimeout else within) match {
|
||||||
case t if t.duration.length <= 0 ⇒
|
case t if t.duration.length <= 0 ⇒
|
||||||
new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout
|
None
|
||||||
case t ⇒
|
case t ⇒
|
||||||
val path = tempPath()
|
val path = tempPath()
|
||||||
val name = path.name
|
val name = path.name
|
||||||
|
|
@ -552,8 +543,7 @@ class LocalActorRefProvider(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tempContainer.addChild(name, a)
|
tempContainer.addChild(name, a)
|
||||||
recipient.tell(message, a)
|
Some(a)
|
||||||
a.result
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -156,7 +156,7 @@ object DeploymentConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
def routerTypeFor(routing: Routing): RouterType = routing match {
|
def routerTypeFor(routing: Routing): RouterType = routing match {
|
||||||
case _: NoRouting | NoRouting ⇒ RouterType.NoRouting
|
case _: NoRouting | NoRouting ⇒ RouterType.NoRouter
|
||||||
case _: RoundRobin | RoundRobin ⇒ RouterType.RoundRobin
|
case _: RoundRobin | RoundRobin ⇒ RouterType.RoundRobin
|
||||||
case _: Random | Random ⇒ RouterType.Random
|
case _: Random | Random ⇒ RouterType.Random
|
||||||
case _: ScatterGather | ScatterGather ⇒ RouterType.ScatterGather
|
case _: ScatterGather | ScatterGather ⇒ RouterType.ScatterGather
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import akka.dispatch._
|
||||||
import akka.japi.Creator
|
import akka.japi.Creator
|
||||||
import akka.util._
|
import akka.util._
|
||||||
import collection.immutable.Stack
|
import collection.immutable.Stack
|
||||||
import akka.routing.{ NoRouting, RouterConfig, RoutedProps }
|
import akka.routing.{ NoRouter, RouterConfig }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ActorRef configuration object, this is threadsafe and fully sharable
|
* ActorRef configuration object, this is threadsafe and fully sharable
|
||||||
|
|
@ -29,7 +29,7 @@ object Props {
|
||||||
case _ ⇒ Escalate
|
case _ ⇒ Escalate
|
||||||
}
|
}
|
||||||
|
|
||||||
final val defaultRoutedProps: RouterConfig = NoRouting
|
final val defaultRoutedProps: RouterConfig = NoRouter
|
||||||
|
|
||||||
final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None)
|
final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,12 +7,15 @@ package akka.routing
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
|
|
||||||
import akka.japi.Creator
|
import akka.japi.Creator
|
||||||
import akka.util.ReflectiveAccess
|
|
||||||
import java.lang.reflect.InvocationTargetException
|
import java.lang.reflect.InvocationTargetException
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import akka.routing.Routing.Broadcast
|
|
||||||
import akka.actor.DeploymentConfig.Deploy
|
import akka.actor.DeploymentConfig.Deploy
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import akka.util.ReflectiveAccess
|
||||||
|
import akka.AkkaException
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
import akka.routing.Routing.{ Destination, Broadcast }
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
sealed trait RouterType
|
sealed trait RouterType
|
||||||
|
|
||||||
|
|
@ -26,7 +29,7 @@ object RouterType {
|
||||||
/**
|
/**
|
||||||
* A RouterType that indicates no routing - i.e. direct message.
|
* A RouterType that indicates no routing - i.e. direct message.
|
||||||
*/
|
*/
|
||||||
object NoRouting extends RouterType
|
object NoRouter extends RouterType
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A RouterType that randomly selects a connection to send a message to.
|
* A RouterType that randomly selects a connection to send a message to.
|
||||||
|
|
@ -38,16 +41,16 @@ object RouterType {
|
||||||
*/
|
*/
|
||||||
object RoundRobin extends RouterType
|
object RoundRobin extends RouterType
|
||||||
|
|
||||||
/**
|
|
||||||
* A RouterType that selects the connection by using scatter gather.
|
|
||||||
*/
|
|
||||||
object ScatterGather extends RouterType
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A RouterType that broadcasts the messages to all connections.
|
* A RouterType that broadcasts the messages to all connections.
|
||||||
*/
|
*/
|
||||||
object Broadcast extends RouterType
|
object Broadcast extends RouterType
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A RouterType that selects the connection by using scatter gather.
|
||||||
|
*/
|
||||||
|
object ScatterGather extends RouterType
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A RouterType that selects the connection based on the least amount of cpu usage
|
* A RouterType that selects the connection based on the least amount of cpu usage
|
||||||
*/
|
*/
|
||||||
|
|
@ -67,9 +70,13 @@ object RouterType {
|
||||||
* A user-defined custom RouterType.
|
* A user-defined custom RouterType.
|
||||||
*/
|
*/
|
||||||
case class Custom(implClass: String) extends RouterType
|
case class Custom(implClass: String) extends RouterType
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An {@link AkkaException} thrown when something goes wrong while routing a message
|
||||||
|
*/
|
||||||
|
class RoutingException(message: String) extends AkkaException(message)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contains the configuration to create local and clustered routed actor references.
|
* Contains the configuration to create local and clustered routed actor references.
|
||||||
* Routed ActorRef configuration object, this is thread safe and fully sharable.
|
* Routed ActorRef configuration object, this is thread safe and fully sharable.
|
||||||
|
|
@ -84,51 +91,9 @@ case class RoutedProps private[akka] (
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///**
|
|
||||||
// * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
|
|
||||||
// * {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}.
|
|
||||||
// *
|
|
||||||
// * @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
// */
|
|
||||||
//trait Router {
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Initializes this Router with a given set of Connections. The Router can use this datastructure to ask for
|
|
||||||
// * the current connections, signal that there were problems with one of the connections and see if there have
|
|
||||||
// * been changes in the connections.
|
|
||||||
// *
|
|
||||||
// * This method is not threadsafe, and should only be called once
|
|
||||||
// *
|
|
||||||
// * JMM Guarantees:
|
|
||||||
// * This method guarantees that all changes made in this method, are visible before one of the routing methods is called.
|
|
||||||
// */
|
|
||||||
// def init(connectionManager: ConnectionManager)
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Routes the message to one of the connections.
|
|
||||||
// *
|
|
||||||
// * @throws RoutingException if something goes wrong while routing the message
|
|
||||||
// */
|
|
||||||
// def route(message: Any)(implicit sender: ActorRef)
|
|
||||||
//
|
|
||||||
// /**
|
|
||||||
// * Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
|
|
||||||
// * completion of the processing of the message.
|
|
||||||
// *
|
|
||||||
// * @throws RoutingExceptionif something goes wrong while routing the message.
|
|
||||||
// */
|
|
||||||
// def route[T](message: Any, timeout: Timeout): Future[T]
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * An {@link AkkaException} thrown when something goes wrong while routing a message
|
|
||||||
// */
|
|
||||||
//class RoutingException(message: String) extends AkkaException(message)
|
|
||||||
//
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
|
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
|
||||||
* on (or more) of these actors.
|
* send a message to on (or more) of these actors.
|
||||||
*/
|
*/
|
||||||
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
|
private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath)
|
||||||
extends LocalActorRef(
|
extends LocalActorRef(
|
||||||
|
|
@ -140,40 +105,37 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
|
||||||
val route: Routing.Route = _props.routerConfig.createRoute(_props.creator, actorContext)
|
val route: Routing.Route = _props.routerConfig.createRoute(_props.creator, actorContext)
|
||||||
|
|
||||||
override def !(message: Any)(implicit sender: ActorRef = null) {
|
override def !(message: Any)(implicit sender: ActorRef = null) {
|
||||||
route(message) match {
|
val s = if (sender eq null) underlying.system.deadLetters else sender
|
||||||
case null ⇒ super.!(message)(sender)
|
|
||||||
case ref: ActorRef ⇒ ref.!(message)(sender)
|
val msg = message match {
|
||||||
case refs: Traversable[ActorRef] ⇒ refs foreach (_.!(message)(sender))
|
case Broadcast(m) ⇒ m
|
||||||
|
case m ⇒ m
|
||||||
|
}
|
||||||
|
|
||||||
|
route(s, message) match {
|
||||||
|
case Nil ⇒ super.!(message)(s)
|
||||||
|
case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trait RouterConfig extends Function0[Actor] {
|
trait RouterConfig extends Function0[Actor] {
|
||||||
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig
|
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig
|
||||||
|
|
||||||
def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Routing.Route
|
def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Routing.Route
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Routing configuration that indicates no routing.
|
* A Router is responsible for sending a message to one (or more) of its connections.
|
||||||
* Oxymoron style.
|
|
||||||
*/
|
|
||||||
case object NoRouting extends RouterConfig {
|
|
||||||
|
|
||||||
def adaptFromDeploy(deploy: Option[Deploy]) = null
|
|
||||||
|
|
||||||
def createRoute(creator: () ⇒ Actor, actorContext: ActorContext) = null
|
|
||||||
|
|
||||||
def apply(): Actor = null
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
|
|
||||||
* {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}.
|
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
trait Router {
|
trait Router {
|
||||||
// TODO (HE): implement failure detection
|
def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[ActorRef]): Vector[ActorRef] = (nrOfInstances, targets) match {
|
||||||
|
case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.")
|
||||||
|
case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut)
|
||||||
|
case (_, xs) ⇒ Vector.empty[ActorRef] ++ xs
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -203,7 +165,20 @@ object Routing {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Route = (Any) ⇒ AnyRef
|
case class Destination(sender: ActorRef, recipient: ActorRef)
|
||||||
|
type Route = (ActorRef, Any) ⇒ Iterable[Destination]
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Routing configuration that indicates no routing.
|
||||||
|
* Oxymoron style.
|
||||||
|
*/
|
||||||
|
case object NoRouter extends RouterConfig {
|
||||||
|
def adaptFromDeploy(deploy: Option[Deploy]) = null
|
||||||
|
|
||||||
|
def createRoute(creator: () ⇒ Actor, actorContext: ActorContext) = null
|
||||||
|
|
||||||
|
def apply(): Actor = null
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -220,10 +195,220 @@ object Routing {
|
||||||
case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
|
case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
|
||||||
extends Router with RouterConfig {
|
extends Router with RouterConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that sets nrOfInstances to be created.
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def this(nr: Int) = {
|
||||||
|
this(nrOfInstances = nr)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that sets the targets to be used.
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def this(t: java.util.Collection[ActorRef]) = {
|
||||||
|
this(targets = collectionAsScalaIterable(t))
|
||||||
|
}
|
||||||
|
|
||||||
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||||
deploy match {
|
deploy match {
|
||||||
case Some(d) ⇒ copy(nrOfInstances = d.nrOfInstances.factor)
|
case Some(d) ⇒
|
||||||
case _ ⇒ this
|
// In case there is a config then use this over any programmed settings.
|
||||||
|
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
|
||||||
|
case _ ⇒ this
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def apply(): Actor = new Actor {
|
||||||
|
def receive = {
|
||||||
|
case _ ⇒
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = {
|
||||||
|
val routees: Vector[ActorRef] =
|
||||||
|
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
|
||||||
|
|
||||||
|
val next = new AtomicInteger(0)
|
||||||
|
|
||||||
|
def getNext(): ActorRef = {
|
||||||
|
routees(next.getAndIncrement % routees.size)
|
||||||
|
}
|
||||||
|
|
||||||
|
{ (sender, message) ⇒
|
||||||
|
message match {
|
||||||
|
case msg: AutoReceivedMessage ⇒ Nil
|
||||||
|
case Broadcast(msg) ⇒ routees map (Destination(sender, _))
|
||||||
|
case msg ⇒ List(Destination(sender, getNext()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Router that randomly selects one of the target connections to send a message to.
|
||||||
|
* <br>
|
||||||
|
* Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means
|
||||||
|
* that the random router should both create new actors and use the 'targets' actor(s).
|
||||||
|
* In this case the 'nrOfInstances' will be ignored and the 'targets' will be used.
|
||||||
|
* <br>
|
||||||
|
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
|
||||||
|
* if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
|
||||||
|
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
|
||||||
|
*/
|
||||||
|
case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
|
||||||
|
extends Router with RouterConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that sets nrOfInstances to be created.
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def this(nr: Int) = {
|
||||||
|
this(nrOfInstances = nr)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that sets the targets to be used.
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def this(t: java.util.Collection[ActorRef]) = {
|
||||||
|
this(targets = collectionAsScalaIterable(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||||
|
deploy match {
|
||||||
|
case Some(d) ⇒
|
||||||
|
// In case there is a config then use this over any programmed settings.
|
||||||
|
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
|
||||||
|
case _ ⇒ this
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def apply(): Actor = new Actor {
|
||||||
|
def receive = {
|
||||||
|
case _ ⇒
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
import java.security.SecureRandom
|
||||||
|
|
||||||
|
private val random = new ThreadLocal[SecureRandom] {
|
||||||
|
override def initialValue = SecureRandom.getInstance("SHA1PRNG")
|
||||||
|
}
|
||||||
|
|
||||||
|
def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = {
|
||||||
|
val routees: Vector[ActorRef] =
|
||||||
|
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
|
||||||
|
|
||||||
|
def getNext(): ActorRef = {
|
||||||
|
routees(random.get.nextInt(routees.size))
|
||||||
|
}
|
||||||
|
|
||||||
|
{ (sender, message) ⇒
|
||||||
|
message match {
|
||||||
|
case msg: AutoReceivedMessage ⇒ Nil
|
||||||
|
case Broadcast(msg) ⇒ routees map (Destination(sender, _))
|
||||||
|
case msg ⇒ List(Destination(sender, getNext()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Router that uses broadcasts a message to all its connections.
|
||||||
|
* <br>
|
||||||
|
* Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means
|
||||||
|
* that the random router should both create new actors and use the 'targets' actor(s).
|
||||||
|
* In this case the 'nrOfInstances' will be ignored and the 'targets' will be used.
|
||||||
|
* <br>
|
||||||
|
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
|
||||||
|
* if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
|
||||||
|
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
|
||||||
|
*/
|
||||||
|
case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil)
|
||||||
|
extends Router with RouterConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that sets nrOfInstances to be created.
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def this(nr: Int) = {
|
||||||
|
this(nrOfInstances = nr)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that sets the targets to be used.
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def this(t: java.util.Collection[ActorRef]) = {
|
||||||
|
this(targets = collectionAsScalaIterable(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||||
|
deploy match {
|
||||||
|
case Some(d) ⇒
|
||||||
|
// In case there is a config then use this over any programmed settings.
|
||||||
|
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
|
||||||
|
case _ ⇒ this
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def apply(): Actor = new Actor {
|
||||||
|
def receive = {
|
||||||
|
case _ ⇒
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = {
|
||||||
|
val routees: Vector[ActorRef] =
|
||||||
|
createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets)
|
||||||
|
|
||||||
|
{ (sender, message) ⇒
|
||||||
|
message match {
|
||||||
|
case msg: AutoReceivedMessage ⇒ Nil
|
||||||
|
case Broadcast(msg) ⇒ routees map (Destination(sender, _))
|
||||||
|
case msg ⇒ routees map (Destination(sender, _))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple router that broadcasts the message to all routees, and replies with the first response.
|
||||||
|
* <br>
|
||||||
|
* Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means
|
||||||
|
* that the random router should both create new actors and use the 'targets' actor(s).
|
||||||
|
* In this case the 'nrOfInstances' will be ignored and the 'targets' will be used.
|
||||||
|
* <br>
|
||||||
|
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
|
||||||
|
* if you provide either 'nrOfInstances' or 'targets' to during instantiation they will
|
||||||
|
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
|
||||||
|
*/
|
||||||
|
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that sets nrOfInstances to be created.
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def this(nr: Int) = {
|
||||||
|
this(nrOfInstances = nr)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that sets the targets to be used.
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def this(t: java.util.Collection[ActorRef]) = {
|
||||||
|
this(targets = collectionAsScalaIterable(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||||
|
deploy match {
|
||||||
|
case Some(d) ⇒
|
||||||
|
// In case there is a config then use this over any programmed settings.
|
||||||
|
copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil)
|
||||||
|
case _ ⇒ this
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -236,307 +421,18 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef]
|
||||||
def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = {
|
def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = {
|
||||||
val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
|
val routees: Vector[ActorRef] = (nrOfInstances, targets) match {
|
||||||
case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.")
|
case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.")
|
||||||
case (x, Nil) ⇒
|
case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut)
|
||||||
println("----> 0, Nil")
|
case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs
|
||||||
(1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouting)))(scala.collection.breakOut)
|
|
||||||
case (x, xs) ⇒
|
|
||||||
println("----> x, xs")
|
|
||||||
Vector.empty[ActorRef] ++ xs
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val next = new AtomicInteger(0)
|
{ (sender, message) ⇒
|
||||||
|
val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME!
|
||||||
def getNext(): ActorRef = {
|
asker.result.pipeTo(sender)
|
||||||
routees(next.getAndIncrement % routees.size)
|
message match {
|
||||||
}
|
case msg: AutoReceivedMessage ⇒ Nil
|
||||||
|
case Broadcast(msg) ⇒ routees map (Destination(asker, _))
|
||||||
{
|
case msg ⇒ routees map (Destination(asker, _))
|
||||||
case _: AutoReceivedMessage ⇒ null //TODO: handle system specific messages
|
}
|
||||||
case Broadcast(msg) ⇒ routees
|
|
||||||
case msg ⇒ getNext()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
/*
|
|
||||||
private val state = new AtomicReference[RoundRobinState]
|
|
||||||
|
|
||||||
def next: Option[ActorRef] = currentState.next
|
|
||||||
|
|
||||||
@tailrec
|
|
||||||
private def currentState: RoundRobinState = {
|
|
||||||
val current = state.get
|
|
||||||
|
|
||||||
if (current != null && current.version == connectionManager.version) {
|
|
||||||
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
|
|
||||||
current
|
|
||||||
} else {
|
|
||||||
//there has been a change in connections, or it was the first try, so we need to update the internal state
|
|
||||||
|
|
||||||
val connections = connectionManager.connections
|
|
||||||
val newState = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version)
|
|
||||||
if (state.compareAndSet(current, newState))
|
|
||||||
//we are lucky since we just updated the state, so we can send it back as the state to use
|
|
||||||
newState
|
|
||||||
else //we failed to update the state, lets try again... better luck next time.
|
|
||||||
currentState
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) {
|
|
||||||
|
|
||||||
private val index = new AtomicInteger(0)
|
|
||||||
|
|
||||||
def next: Option[ActorRef] = if (array.isEmpty) None else Some(array(nextIndex))
|
|
||||||
|
|
||||||
@tailrec
|
|
||||||
private def nextIndex: Int = {
|
|
||||||
val oldIndex = index.get
|
|
||||||
var newIndex = if (oldIndex == array.length - 1) 0 else oldIndex + 1
|
|
||||||
|
|
||||||
if (!index.compareAndSet(oldIndex, newIndex)) nextIndex
|
|
||||||
else oldIndex
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|
||||||
///**
|
|
||||||
// * An Abstract Router implementation that already provides the basic infrastructure so that a concrete
|
|
||||||
// * Router only needs to implement the next method.
|
|
||||||
// */
|
|
||||||
//trait BasicRouter extends Router {
|
|
||||||
//
|
|
||||||
// @volatile
|
|
||||||
// protected var connectionManager: ConnectionManager = _
|
|
||||||
//
|
|
||||||
// def init(connectionManager: ConnectionManager) = {
|
|
||||||
// this.connectionManager = connectionManager
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// def route(message: Any)(implicit sender: ActorRef) = message match {
|
|
||||||
// case Routing.Broadcast(message) ⇒
|
|
||||||
//
|
|
||||||
// //it is a broadcast message, we are going to send to message to all connections.
|
|
||||||
// connectionManager.connections.iterable foreach {
|
|
||||||
// connection ⇒
|
|
||||||
// try {
|
|
||||||
// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
|
|
||||||
// } catch {
|
|
||||||
// case e: Exception ⇒
|
|
||||||
// connectionManager.remove(connection)
|
|
||||||
// throw e
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// case _ ⇒
|
|
||||||
// //it no broadcast message, we are going to select an actor from the connections and send the message to him.
|
|
||||||
// next match {
|
|
||||||
// case Some(connection) ⇒
|
|
||||||
// try {
|
|
||||||
// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
|
|
||||||
// } catch {
|
|
||||||
// case e: Exception ⇒
|
|
||||||
// connectionManager.remove(connection)
|
|
||||||
// throw e
|
|
||||||
// }
|
|
||||||
// case None ⇒
|
|
||||||
// throwNoConnectionsError
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// def route[T](message: Any, timeout: Timeout): Future[T] = message match {
|
|
||||||
// case Routing.Broadcast(message) ⇒
|
|
||||||
// throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.")
|
|
||||||
// case _ ⇒
|
|
||||||
// //it no broadcast message, we are going to select an actor from the connections and send the message to him.
|
|
||||||
// next match {
|
|
||||||
// case Some(connection) ⇒
|
|
||||||
// try {
|
|
||||||
// connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it??
|
|
||||||
// } catch {
|
|
||||||
// case e: Exception ⇒
|
|
||||||
// connectionManager.remove(connection)
|
|
||||||
// throw e
|
|
||||||
// }
|
|
||||||
// case None ⇒
|
|
||||||
// throwNoConnectionsError
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// protected def next: Option[ActorRef]
|
|
||||||
//
|
|
||||||
// private def throwNoConnectionsError = throw new RoutingException("No replica connections for router")
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * A Router that uses broadcasts a message to all its connections.
|
|
||||||
// */
|
|
||||||
//class BroadcastRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter with Serializable {
|
|
||||||
// override def route(message: Any)(implicit sender: ActorRef) = {
|
|
||||||
// connectionManager.connections.iterable foreach {
|
|
||||||
// connection ⇒
|
|
||||||
// try {
|
|
||||||
// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
|
|
||||||
// } catch {
|
|
||||||
// case e: Exception ⇒
|
|
||||||
// connectionManager.remove(connection)
|
|
||||||
// throw e
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// //protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] =
|
|
||||||
// override def route[T](message: Any, timeout: Timeout): Future[T] = {
|
|
||||||
// import Future._
|
|
||||||
// implicit val t = timeout
|
|
||||||
// val futures = connectionManager.connections.iterable map {
|
|
||||||
// connection ⇒
|
|
||||||
// connection.?(message, timeout).asInstanceOf[Future[T]]
|
|
||||||
// }
|
|
||||||
// Future.firstCompletedOf(futures)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// protected def next: Option[ActorRef] = None
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef.
|
|
||||||
// *
|
|
||||||
// * @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
// */
|
|
||||||
//class DirectRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter {
|
|
||||||
//
|
|
||||||
// private val state = new AtomicReference[DirectRouterState]
|
|
||||||
//
|
|
||||||
// lazy val next: Option[ActorRef] = {
|
|
||||||
// val current = currentState
|
|
||||||
// if (current.ref == null) None else Some(current.ref)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @tailrec
|
|
||||||
// private def currentState: DirectRouterState = {
|
|
||||||
// val current = state.get
|
|
||||||
//
|
|
||||||
// if (current != null && connectionManager.version == current.version) {
|
|
||||||
// //we are lucky since nothing has changed in the connections.
|
|
||||||
// current
|
|
||||||
// } else {
|
|
||||||
// //there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
|
|
||||||
//
|
|
||||||
// val connections = connectionManager.connections
|
|
||||||
//
|
|
||||||
// val connectionCount = connections.iterable.size
|
|
||||||
// if (connectionCount > 1)
|
|
||||||
// throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))
|
|
||||||
//
|
|
||||||
// val newState = new DirectRouterState(connections.iterable.head, connections.version)
|
|
||||||
// if (state.compareAndSet(current, newState))
|
|
||||||
// //we are lucky since we just updated the state, so we can send it back as the state to use
|
|
||||||
// newState
|
|
||||||
// else //we failed to update the state, lets try again... better luck next time.
|
|
||||||
// currentState // recur
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private case class DirectRouterState(ref: ActorRef, version: Long)
|
|
||||||
//
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
///**
|
|
||||||
// * A Router that randomly selects one of the target connections to send a message to.
|
|
||||||
// *
|
|
||||||
// * @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
// */
|
|
||||||
//class RandomRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter {
|
|
||||||
//
|
|
||||||
// import java.security.SecureRandom
|
|
||||||
//
|
|
||||||
// private val state = new AtomicReference[RandomRouterState]
|
|
||||||
//
|
|
||||||
// private val random = new ThreadLocal[SecureRandom] {
|
|
||||||
// override def initialValue = SecureRandom.getInstance("SHA1PRNG")
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// def next: Option[ActorRef] = currentState.array match {
|
|
||||||
// case a if a.isEmpty ⇒ None
|
|
||||||
// case a ⇒ Some(a(random.get.nextInt(a.length)))
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// @tailrec
|
|
||||||
// private def currentState: RandomRouterState = {
|
|
||||||
// val current = state.get
|
|
||||||
//
|
|
||||||
// if (current != null && current.version == connectionManager.version) {
|
|
||||||
// //we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
|
|
||||||
// current
|
|
||||||
// } else {
|
|
||||||
// //there has been a change in connections, or it was the first try, so we need to update the internal state
|
|
||||||
//
|
|
||||||
// val connections = connectionManager.connections
|
|
||||||
// val newState = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version)
|
|
||||||
// if (state.compareAndSet(current, newState))
|
|
||||||
// //we are lucky since we just updated the state, so we can send it back as the state to use
|
|
||||||
// newState
|
|
||||||
// else //we failed to update the state, lets try again... better luck next time.
|
|
||||||
// currentState
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long)
|
|
||||||
//}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
|
|
||||||
* specified strategy (specific router needs to implement `gather` method).
|
|
||||||
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
|
|
||||||
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget
|
|
||||||
* mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type
|
|
||||||
*
|
|
||||||
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
|
|
||||||
* FIXME: this is also the location where message buffering should be done in case of failure.
|
|
||||||
*/
|
|
||||||
/*
|
|
||||||
trait ScatterGatherRouter extends BasicRouter with Serializable {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Aggregates the responses into a single Future.
|
|
||||||
*
|
|
||||||
* @param results Futures of the responses from connections
|
|
||||||
*/
|
|
||||||
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]
|
|
||||||
|
|
||||||
private def scatterGather[S, G >: S](message: Any, timeout: Timeout): Future[G] = {
|
|
||||||
val responses = connectionManager.connections.iterable.flatMap {
|
|
||||||
actor ⇒
|
|
||||||
try {
|
|
||||||
if (actor.isTerminated) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace
|
|
||||||
Some(actor.?(message, timeout).asInstanceOf[Future[S]])
|
|
||||||
} catch {
|
|
||||||
case e: Exception ⇒
|
|
||||||
connectionManager.remove(actor)
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (responses.isEmpty)
|
|
||||||
throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message))
|
|
||||||
else gather(responses)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def route[T](message: Any, timeout: Timeout): Future[T] = message match {
|
|
||||||
case Routing.Broadcast(message) ⇒ scatterGather(message, timeout)
|
|
||||||
case message ⇒ super.route(message, timeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Simple router that broadcasts the message to all connections, and replies with the first response
|
|
||||||
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
|
|
||||||
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget
|
|
||||||
* mode, the router would behave as {@link RoundRobinRouter}
|
|
||||||
*/
|
|
||||||
/*
|
|
||||||
class ScatterGatherFirstCompletedRouter(implicit dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter {
|
|
||||||
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
@ -7,7 +7,6 @@ package akka.remote
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.actor.Status._
|
import akka.actor.Status._
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.util.duration._
|
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
import akka.remote.RemoteProtocol._
|
import akka.remote.RemoteProtocol._
|
||||||
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
||||||
|
|
|
||||||
|
|
@ -68,10 +68,15 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti
|
||||||
private var _server: RemoteSupport[ParsedTransportAddress] = _
|
private var _server: RemoteSupport[ParsedTransportAddress] = _
|
||||||
def server = _server
|
def server = _server
|
||||||
|
|
||||||
def init(system: ActorSystemImpl) = {
|
@volatile
|
||||||
|
private var _provider: RemoteActorRefProvider = _
|
||||||
|
def provider = _provider
|
||||||
|
|
||||||
|
def init(system: ActorSystemImpl, provider: RemoteActorRefProvider) = {
|
||||||
|
|
||||||
val log = Logging(system, "Remote")
|
val log = Logging(system, "Remote")
|
||||||
|
|
||||||
|
_provider = provider
|
||||||
_serialization = SerializationExtension(system)
|
_serialization = SerializationExtension(system)
|
||||||
_computeGridDispatcher = system.dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher")
|
_computeGridDispatcher = system.dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher")
|
||||||
_remoteDaemon = new RemoteSystemDaemon(system, this, system.provider.rootPath / "remote", system.provider.rootGuardian, log)
|
_remoteDaemon = new RemoteSystemDaemon(system, this, system.provider.rootPath / "remote", system.provider.rootGuardian, log)
|
||||||
|
|
|
||||||
|
|
@ -4,24 +4,14 @@
|
||||||
|
|
||||||
package akka.remote
|
package akka.remote
|
||||||
|
|
||||||
import akka.AkkaException
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.actor.Actor._
|
|
||||||
import akka.actor.Status._
|
|
||||||
import akka.routing._
|
|
||||||
import akka.dispatch._
|
import akka.dispatch._
|
||||||
import akka.util.duration._
|
import akka.event.Logging
|
||||||
import akka.config.ConfigurationException
|
|
||||||
import akka.event.{ DeathWatch, Logging }
|
|
||||||
import akka.serialization.Compression.LZF
|
import akka.serialization.Compression.LZF
|
||||||
import akka.remote.RemoteProtocol._
|
import akka.remote.RemoteProtocol._
|
||||||
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
|
||||||
import com.google.protobuf.ByteString
|
import com.google.protobuf.ByteString
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
|
||||||
import akka.event.EventStream
|
import akka.event.EventStream
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
import akka.dispatch.Promise
|
|
||||||
import java.net.InetAddress
|
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
import akka.serialization.Serialization
|
import akka.serialization.Serialization
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
|
|
@ -62,7 +52,7 @@ class RemoteActorRefProvider(
|
||||||
|
|
||||||
def init(system: ActorSystemImpl) {
|
def init(system: ActorSystemImpl) {
|
||||||
local.init(system)
|
local.init(system)
|
||||||
remote.init(system)
|
remote.init(system, this)
|
||||||
local.registerExtraNames(Map(("remote", remote.remoteDaemon)))
|
local.registerExtraNames(Map(("remote", remote.remoteDaemon)))
|
||||||
terminationFuture.onComplete(_ ⇒ remote.server.shutdown())
|
terminationFuture.onComplete(_ ⇒ remote.server.shutdown())
|
||||||
}
|
}
|
||||||
|
|
@ -145,7 +135,7 @@ class RemoteActorRefProvider(
|
||||||
|
|
||||||
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
|
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
|
||||||
|
|
||||||
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
|
def ask(within: Timeout): Option[AskActorRef] = local.ask(within)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Using (checking out) actor on a specific node.
|
* Using (checking out) actor on a specific node.
|
||||||
|
|
@ -178,7 +168,7 @@ class RemoteActorRefProvider(
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
private[akka] class RemoteActorRef private[akka] (
|
private[akka] class RemoteActorRef private[akka] (
|
||||||
provider: ActorRefProvider,
|
provider: RemoteActorRefProvider,
|
||||||
remote: RemoteSupport[ParsedTransportAddress],
|
remote: RemoteSupport[ParsedTransportAddress],
|
||||||
val path: ActorPath,
|
val path: ActorPath,
|
||||||
val getParent: InternalActorRef,
|
val getParent: InternalActorRef,
|
||||||
|
|
@ -203,7 +193,16 @@ private[akka] class RemoteActorRef private[akka] (
|
||||||
|
|
||||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader)
|
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader)
|
||||||
|
|
||||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout)
|
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||||
|
provider.ask(timeout) match {
|
||||||
|
case Some(a) ⇒
|
||||||
|
this.!(message)(a)
|
||||||
|
a.result
|
||||||
|
case None ⇒
|
||||||
|
this.!(message)(null)
|
||||||
|
new DefaultPromise[Any](0)(provider.dispatcher)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def suspend(): Unit = sendSystemMessage(Suspend())
|
def suspend(): Unit = sendSystemMessage(Suspend())
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -149,5 +149,5 @@ class RemoteConnectionManager(
|
||||||
}
|
}
|
||||||
|
|
||||||
private[remote] def newConnection(remoteAddress: ParsedTransportAddress, actorPath: ActorPath) =
|
private[remote] def newConnection(remoteAddress: ParsedTransportAddress, actorPath: ActorPath) =
|
||||||
new RemoteActorRef(system.provider, remote.server, actorPath, Nobody, None)
|
new RemoteActorRef(remote.provider, remote.server, actorPath, Nobody, None)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,13 +6,12 @@ package akka.tutorial.first.java;
|
||||||
|
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
import akka.actor.InternalActorRef;
|
import akka.actor.Props;
|
||||||
import akka.actor.UntypedActor;
|
import akka.actor.UntypedActor;
|
||||||
import akka.actor.UntypedActorFactory;
|
import akka.routing.RoundRobinRouter;
|
||||||
import akka.japi.Creator;
|
|
||||||
import akka.routing.*;
|
|
||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
public class Pi {
|
public class Pi {
|
||||||
|
|
@ -105,23 +104,8 @@ public class Pi {
|
||||||
this.nrOfMessages = nrOfMessages;
|
this.nrOfMessages = nrOfMessages;
|
||||||
this.nrOfElements = nrOfElements;
|
this.nrOfElements = nrOfElements;
|
||||||
this.latch = latch;
|
this.latch = latch;
|
||||||
Creator<Router> routerCreator = new Creator<Router>() {
|
|
||||||
public Router create() {
|
|
||||||
// TODO (HE) : implement
|
|
||||||
//return new RoundRobinRouter(getContext().dispatcher(), new akka.actor.Timeout(-1));
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
LinkedList<ActorRef> actors = new LinkedList<ActorRef>() {
|
|
||||||
{
|
|
||||||
for (int i = 0; i < nrOfWorkers; i++) add(getContext().actorOf(Worker.class));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// FIXME routers are intended to be used like this
|
router = this.getContext().actorOf(new Props().withCreator(Worker.class).withRouting(new RoundRobinRouter(5)), "pi");
|
||||||
// TODO (HE): implement
|
|
||||||
//RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true);
|
|
||||||
//router = new RoutedActorRef(getContext().system(), props, (InternalActorRef) getSelf(), "pi");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// message handler
|
// message handler
|
||||||
|
|
@ -170,7 +154,7 @@ public class Pi {
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
// create the master
|
// create the master
|
||||||
ActorRef master = system.actorOf(new UntypedActorFactory() {
|
ActorRef master = system.actorOf(new akka.actor.UntypedActorFactory() {
|
||||||
public UntypedActor create() {
|
public UntypedActor create() {
|
||||||
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
|
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
akka.actor.deployment {
|
akka.actor.deployment {
|
||||||
/user/pi2 {
|
/user/master/pi {
|
||||||
router = round-robin
|
router = round-robin
|
||||||
nr-of-instances = 4
|
nr-of-instances = 10
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,12 +6,12 @@ package akka.tutorial.first.scala
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.routing._
|
import akka.routing._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.{ ConfigFactory, Config }
|
||||||
|
|
||||||
object Pi extends App {
|
object Pi extends App {
|
||||||
|
|
||||||
// Initiate the calculation
|
// Initiate the calculation
|
||||||
calculate(nrOfWorkers = 4, nrOfElements = 10, nrOfMessages = 10)
|
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
|
||||||
|
|
||||||
// ====================
|
// ====================
|
||||||
// ===== Messages =====
|
// ===== Messages =====
|
||||||
|
|
@ -39,7 +39,6 @@ object Pi extends App {
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case Work(start, nrOfElements) ⇒
|
case Work(start, nrOfElements) ⇒
|
||||||
println("*** RECEIVED MESSAGE IN: " + self.path)
|
|
||||||
sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
|
sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -54,27 +53,8 @@ object Pi extends App {
|
||||||
var nrOfResults: Int = _
|
var nrOfResults: Int = _
|
||||||
var start: Long = _
|
var start: Long = _
|
||||||
|
|
||||||
// create the workers
|
// create a round robin router for the workers
|
||||||
|
val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi")
|
||||||
var workers = Vector.empty[ActorRef]
|
|
||||||
for (i ← 1 to 2) {
|
|
||||||
workers = context.actorOf[Worker] +: workers
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO (HE) : use this way of creating actors
|
|
||||||
//val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker])
|
|
||||||
|
|
||||||
/*
|
|
||||||
// wrap them with a load-balancing router
|
|
||||||
// FIXME routers are intended to be used like this
|
|
||||||
implicit val timout = context.system.settings.ActorTimeout
|
|
||||||
implicit val dispatcher = context.dispatcher
|
|
||||||
val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers))
|
|
||||||
val router = new RoutedActorRef(context.system, props, self.asInstanceOf[InternalActorRef], "pi")
|
|
||||||
*/
|
|
||||||
|
|
||||||
//val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi")
|
|
||||||
val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi")
|
|
||||||
|
|
||||||
// message handler
|
// message handler
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
@ -107,7 +87,7 @@ object Pi extends App {
|
||||||
// ===== Run it =====
|
// ===== Run it =====
|
||||||
// ==================
|
// ==================
|
||||||
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
|
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
|
||||||
val system = ActorSystem("x", ConfigFactory.parseString("akka.actor.debug.lifecycle=true\nakka.loglevel=DEBUG"))
|
val system = ActorSystem()
|
||||||
|
|
||||||
// this latch is only plumbing to know when the calculation is completed
|
// this latch is only plumbing to know when the calculation is completed
|
||||||
val latch = new CountDownLatch(1)
|
val latch = new CountDownLatch(1)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue