!act,rem,clu #3920 Remove deprecated old routers

This commit is contained in:
Patrik Nordwall 2014-03-12 14:43:18 +01:00
parent b5be06e90c
commit 1e445b4eba
37 changed files with 67 additions and 3788 deletions

View file

@ -7,7 +7,7 @@ package akka.actor;
import akka.event.Logging;
import akka.event.Logging.LoggerInitialized;
import akka.japi.Creator;
import akka.routing.CurrentRoutees;
import akka.routing.GetRoutees;
import akka.routing.FromConfig;
import akka.routing.NoRouter;
import akka.testkit.AkkaJUnitActorSystemResource;
@ -38,7 +38,7 @@ public class JavaAPI {
final LoggerInitialized x = Logging.loggerInitialized();
final CurrentRoutees r = CurrentRoutees.getInstance();
final GetRoutees r = GetRoutees.getInstance();
final NoRouter nr = NoRouter.getInstance();
final FromConfig fc = FromConfig.getInstance();
}
@ -48,7 +48,7 @@ public class JavaAPI {
ActorRef ref = system.actorOf(Props.create(JavaAPITestActor.class));
assertNotNull(ref);
}
public static Props mkProps() {
return Props.create(new Creator<Actor>() {
public Actor create() {

View file

@ -42,29 +42,29 @@ class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVeri
}
"An Actor configured with a BalancingDispatcher" must {
"fail verification with a ConfigurationException if also configured with a RoundRobinRouter" in {
"fail verification with a ConfigurationException if also configured with a RoundRobinPool" in {
intercept[ConfigurationException] {
system.actorOf(RoundRobinRouter(2).withDispatcher("balancing-dispatcher").props(Props[TestActor]))
system.actorOf(RoundRobinPool(2).withDispatcher("balancing-dispatcher").props(Props[TestActor]))
}
}
"fail verification with a ConfigurationException if also configured with a BroadcastRouter" in {
"fail verification with a ConfigurationException if also configured with a BroadcastPool" in {
intercept[ConfigurationException] {
system.actorOf(BroadcastRouter(2).withDispatcher("balancing-dispatcher").props(Props[TestActor]))
system.actorOf(BroadcastPool(2).withDispatcher("balancing-dispatcher").props(Props[TestActor]))
}
}
"fail verification with a ConfigurationException if also configured with a RandomRouter" in {
"fail verification with a ConfigurationException if also configured with a RandomPool" in {
intercept[ConfigurationException] {
system.actorOf(RandomRouter(2).withDispatcher("balancing-dispatcher").props(Props[TestActor]))
system.actorOf(RandomPool(2).withDispatcher("balancing-dispatcher").props(Props[TestActor]))
}
}
"fail verification with a ConfigurationException if also configured with a SmallestMailboxRouter" in {
"fail verification with a ConfigurationException if also configured with a SmallestMailboxPool" in {
intercept[ConfigurationException] {
system.actorOf(SmallestMailboxRouter(2).withDispatcher("balancing-dispatcher").props(Props[TestActor]))
system.actorOf(SmallestMailboxPool(2).withDispatcher("balancing-dispatcher").props(Props[TestActor]))
}
}
"fail verification with a ConfigurationException if also configured with a ScatterGatherFirstCompletedRouter" in {
"fail verification with a ConfigurationException if also configured with a ScatterGatherFirstCompletedPool" in {
intercept[ConfigurationException] {
system.actorOf(ScatterGatherFirstCompletedRouter(nrOfInstances = 2, within = 2 seconds).
system.actorOf(ScatterGatherFirstCompletedPool(nrOfInstances = 2, within = 2 seconds).
withDispatcher("balancing-dispatcher").props(Props[TestActor]))
}
}
@ -74,7 +74,7 @@ class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVeri
}
"An Actor configured with a non-balancing dispatcher" must {
"not fail verification with a ConfigurationException if also configured with a Router" in {
system.actorOf(RoundRobinRouter(2).props(Props[TestActor].withDispatcher("pinned-dispatcher")))
system.actorOf(RoundRobinPool(2).props(Props[TestActor].withDispatcher("pinned-dispatcher")))
}
"fail verification if the dispatcher cannot be found" in {
@ -85,13 +85,13 @@ class ActorConfigurationVerificationSpec extends AkkaSpec(ActorConfigurationVeri
"fail verification if the dispatcher cannot be found for the head of a router" in {
intercept[ConfigurationException] {
system.actorOf(RoundRobinRouter(1, routerDispatcher = "does not exist").props(Props[TestActor]))
system.actorOf(RoundRobinPool(1, routerDispatcher = "does not exist").props(Props[TestActor]))
}
}
"fail verification if the dispatcher cannot be found for the routees of a router" in {
intercept[ConfigurationException] {
system.actorOf(RoundRobinRouter(1).props(Props[TestActor].withDispatcher("does not exist")))
system.actorOf(RoundRobinPool(1).props(Props[TestActor].withDispatcher("does not exist")))
}
}
}

View file

@ -32,33 +32,33 @@ object DeployerSpec {
mailbox = my-mailbox
}
/service-round-robin {
router = round-robin
router = round-robin-pool
}
/service-random {
router = random
router = random-pool
}
/service-scatter-gather {
router = scatter-gather
router = scatter-gather-pool
within = 2 seconds
}
/service-consistent-hashing {
router = consistent-hashing
router = consistent-hashing-pool
}
/service-resizer {
router = round-robin
router = round-robin-pool
resizer {
lower-bound = 1
upper-bound = 10
}
}
/some/random-service {
router = round-robin
router = round-robin-pool
}
"/some/*" {
router = random
router = random-pool
}
"/*/some" {
router = scatter-gather
router = scatter-gather-pool
}
}
""", ConfigParseOptions.defaults)
@ -126,7 +126,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
val invalidDeployerConf = ConfigFactory.parseString("""
akka.actor.deployment {
/service-invalid-number-of-instances {
router = round-robin
router = round-robin-pool
nr-of-instances = boom
}
}
@ -141,7 +141,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
val invalidDeployerConf = ConfigFactory.parseString("""
akka.actor.deployment {
/gul/ubåt {
router = round-robin
router = round-robin-pool
nr-of-instances = 2
}
}

View file

@ -1,289 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.oldrouting
import language.postfixOps
import java.util.concurrent.atomic.AtomicInteger
import org.junit.runner.RunWith
import akka.actor.{ Props, Deploy, Actor, ActorRef }
import akka.ConfigurationException
import scala.concurrent.Await
import akka.pattern.{ ask, gracefulStop }
import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec }
import scala.concurrent.duration._
import akka.actor.UnstartedCell
import akka.routing._
object ConfiguredLocalRoutingSpec {
val config = """
akka {
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 8
core-pool-size-max = 16
}
}
deployment {
/config {
router = random
nr-of-instances = 4
}
/weird {
router = round-robin
nr-of-instances = 3
}
"/weird/*" {
router = round-robin
nr-of-instances = 2
}
}
}
}
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.config) with DefaultTimeout with ImplicitSender {
def routerConfig(ref: ActorRef): RouterConfig = ref match {
case r: RoutedActorRef
r.underlying match {
case c: RoutedActorCell c.routerConfig
case _: UnstartedCell awaitCond(r.isStarted, 1 second, 10 millis); routerConfig(ref)
}
}
"RouterConfig" must {
"be picked up from Props" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case "get" sender() ! context.props
}
}).withRouter(RoundRobinRouter(12)), "someOther")
routerConfig(actor) should be(RoundRobinRouter(12))
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}
"be overridable in config" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case "get" sender() ! context.props
}
}).withRouter(RoundRobinRouter(12)), "config")
routerConfig(actor) should be(RandomPool(4))
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}
"be overridable in explicit deployment" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case "get" sender() ! context.props
}
}).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "someOther")
routerConfig(actor) should be(RoundRobinRouter(12))
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}
"be overridable in config even with explicit deployment" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case "get" sender() ! context.props
}
}).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "config")
routerConfig(actor) should be(RandomPool(4))
Await.result(gracefulStop(actor, 3 seconds), 3 seconds)
}
"fail with an exception if not correct" in {
intercept[ConfigurationException] {
system.actorOf(Props.empty.withRouter(FromConfig))
}
}
"not get confused when trying to wildcard-configure children" in {
val router = system.actorOf(Props(new Actor {
testActor ! self
def receive = { case _ }
}).withRouter(FromConfig), "weird")
val recv = Set() ++ (for (_ 1 to 3) yield expectMsgType[ActorRef])
val expc = Set('a', 'b', 'c') map (i system.actorFor("/user/weird/$" + i))
recv should be(expc)
expectNoMsg(1 second)
}
}
"round robin router" must {
"be able to shut down its instance" in {
val helloLatch = new TestLatch(5)
val stopLatch = new TestLatch(5)
val actor = system.actorOf(Props(new Actor {
def receive = {
case "hello" helloLatch.countDown()
}
override def postStop() {
stopLatch.countDown()
}
}).withRouter(RoundRobinRouter(5)), "round-robin-shutdown")
actor ! "hello"
actor ! "hello"
actor ! "hello"
actor ! "hello"
actor ! "hello"
Await.ready(helloLatch, 5 seconds)
system.stop(actor)
Await.ready(stopLatch, 5 seconds)
}
"deliver messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new TestLatch(connectionCount)
val counter = new AtomicInteger
var replies = Map.empty[Int, Int]
for (i 0 until connectionCount) {
replies += i -> 0
}
val actor = system.actorOf(Props(new Actor {
lazy val id = counter.getAndIncrement()
def receive = {
case "hit" sender() ! id
case "end" doneLatch.countDown()
}
}).withRouter(RoundRobinRouter(connectionCount)), "round-robin")
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration)
replies = replies + (id -> (replies(id) + 1))
}
}
counter.get should be(connectionCount)
actor ! Broadcast("end")
Await.ready(doneLatch, 5 seconds)
replies.values foreach { _ should be(iterationCount) }
}
"deliver a broadcast message using the !" in {
val helloLatch = new TestLatch(5)
val stopLatch = new TestLatch(5)
val actor = system.actorOf(Props(new Actor {
def receive = {
case "hello" helloLatch.countDown()
}
override def postStop() {
stopLatch.countDown()
}
}).withRouter(RoundRobinRouter(5)), "round-robin-broadcast")
actor ! Broadcast("hello")
Await.ready(helloLatch, 5 seconds)
system.stop(actor)
Await.ready(stopLatch, 5 seconds)
}
}
"random router" must {
"be able to shut down its instance" in {
val stopLatch = new TestLatch(7)
val actor = system.actorOf(Props(new Actor {
def receive = {
case "hello" sender() ! "world"
}
override def postStop() {
stopLatch.countDown()
}
}).withRouter(RandomRouter(7)), "random-shutdown")
actor ! "hello"
actor ! "hello"
actor ! "hello"
actor ! "hello"
actor ! "hello"
within(2 seconds) {
for (i 1 to 5) expectMsg("world")
}
system.stop(actor)
Await.ready(stopLatch, 5 seconds)
}
"deliver messages in a random fashion" in {
val connectionCount = 10
val iterationCount = 100
val doneLatch = new TestLatch(connectionCount)
val counter = new AtomicInteger
var replies = Map.empty[Int, Int]
for (i 0 until connectionCount) {
replies = replies + (i -> 0)
}
val actor = system.actorOf(Props(new Actor {
lazy val id = counter.getAndIncrement()
def receive = {
case "hit" sender() ! id
case "end" doneLatch.countDown()
}
}).withRouter(RandomRouter(connectionCount)), "random")
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration)
replies = replies + (id -> (replies(id) + 1))
}
}
counter.get should be(connectionCount)
actor ! Broadcast("end")
Await.ready(doneLatch, 5 seconds)
replies.values foreach { _ should be > (0) }
replies.values.sum should be(iterationCount * connectionCount)
}
"deliver a broadcast message using the !" in {
val helloLatch = new TestLatch(6)
val stopLatch = new TestLatch(6)
val actor = system.actorOf(Props(new Actor {
def receive = {
case "hello" helloLatch.countDown()
}
override def postStop() {
stopLatch.countDown()
}
}).withRouter(RandomRouter(6)), "random-broadcast")
actor ! Broadcast("hello")
Await.ready(helloLatch, 5 seconds)
system.stop(actor)
Await.ready(stopLatch, 5 seconds)
}
}
}

View file

@ -1,105 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.oldrouting
import scala.concurrent.Await
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.routing.ConsistentHashingRouter.ConsistentHashable
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.testkit.AkkaSpec
import akka.testkit._
import akka.routing._
object ConsistentHashingRouterSpec {
val config = """
akka.actor.deployment {
/router1 {
router = consistent-hashing
nr-of-instances = 3
virtual-nodes-factor = 17
}
/router2 {
router = consistent-hashing
nr-of-instances = 5
}
}
"""
class Echo extends Actor {
def receive = {
case _ sender() ! self
}
}
final case class Msg(key: Any, data: String) extends ConsistentHashable {
override def consistentHashKey = key
}
final case class MsgKey(name: String)
final case class Msg2(key: Any, data: String)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.config) with DefaultTimeout with ImplicitSender {
import akka.routing.ConsistentHashingRouterSpec._
implicit val ec = system.dispatcher
val router1 = system.actorOf(Props[Echo].withRouter(FromConfig()), "router1")
"consistent hashing router" must {
"create routees from configuration" in {
val currentRoutees = Await.result(router1 ? CurrentRoutees, remainingOrDefault).asInstanceOf[RouterRoutees]
currentRoutees.routees.size should be(3)
}
"select destination based on consistentHashKey of the message" in {
router1 ! Msg("a", "A")
val destinationA = expectMsgType[ActorRef]
router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
expectMsg(destinationA)
router1 ! Msg(17, "B")
val destinationB = expectMsgType[ActorRef]
router1 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17)
expectMsg(destinationB)
router1 ! Msg(MsgKey("c"), "C")
val destinationC = expectMsgType[ActorRef]
router1 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c"))
expectMsg(destinationC)
}
"select destination with defined consistentHashRoute" in {
def hashMapping: ConsistentHashMapping = {
case Msg2(key, data) key
}
val router2 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter(
hashMapping = hashMapping)), "router2")
router2 ! Msg2("a", "A")
val destinationA = expectMsgType[ActorRef]
router2 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
expectMsg(destinationA)
router2 ! Msg2(17, "B")
val destinationB = expectMsgType[ActorRef]
router2 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17)
expectMsg(destinationB)
router2 ! Msg2(MsgKey("c"), "C")
val destinationC = expectMsgType[ActorRef]
router2 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c"))
expectMsg(destinationC)
}
}
}

View file

@ -1,218 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.oldrouting
import language.postfixOps
import akka.actor.Actor
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Props
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.collection.immutable
import akka.actor.ActorRef
import akka.pattern.ask
import scala.util.Try
import akka.routing._
object ResizerSpec {
val config = """
akka.actor.serialize-messages = off
akka.actor.deployment {
/router1 {
router = round-robin
resizer {
lower-bound = 2
upper-bound = 3
}
}
}
"""
class TestActor extends Actor {
def receive = {
case latch: TestLatch latch.countDown()
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with ImplicitSender {
import akka.routing.ResizerSpec._
override def atStartup: Unit = {
// when shutting down some Resize messages might hang around
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*Resize")))
}
def routeeSize(router: ActorRef): Int =
Await.result(router ? CurrentRoutees, remainingOrDefault).asInstanceOf[RouterRoutees].routees.size
"DefaultResizer" must {
"use settings to evaluate capacity" in {
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 3)
val c1 = resizer.capacity(Vector.empty[Routee])
c1 should be(2)
val current = Vector(
ActorRefRoutee(system.actorOf(Props[TestActor])),
ActorRefRoutee(system.actorOf(Props[TestActor])))
val c2 = resizer.capacity(current)
c2 should be(0)
}
"use settings to evaluate rampUp" in {
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 10,
rampupRate = 0.2)
resizer.rampup(pressure = 9, capacity = 10) should be(0)
resizer.rampup(pressure = 5, capacity = 5) should be(1)
resizer.rampup(pressure = 6, capacity = 6) should be(2)
}
"use settings to evaluate backoff" in {
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 10,
backoffThreshold = 0.3,
backoffRate = 0.1)
resizer.backoff(pressure = 10, capacity = 10) should be(0)
resizer.backoff(pressure = 4, capacity = 10) should be(0)
resizer.backoff(pressure = 3, capacity = 10) should be(0)
resizer.backoff(pressure = 2, capacity = 10) should be(-1)
resizer.backoff(pressure = 0, capacity = 10) should be(-1)
resizer.backoff(pressure = 1, capacity = 9) should be(-1)
resizer.backoff(pressure = 0, capacity = 9) should be(-1)
}
"be possible to define programmatically" in {
val latch = new TestLatch(3)
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 3)
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
router ! latch
router ! latch
router ! latch
Await.ready(latch, remainingOrDefault)
// messagesPerResize is 10 so there is no risk of additional resize
routeeSize(router) should be(2)
}
"be possible to define in configuration" in {
val latch = new TestLatch(3)
val router = system.actorOf(Props[TestActor].withRouter(FromConfig()), "router1")
router ! latch
router ! latch
router ! latch
Await.ready(latch, remainingOrDefault)
routeeSize(router) should be(2)
}
"grow as needed under pressure" in {
// make sure the pool starts at the expected lower limit and grows to the upper as needed
// as influenced by the backlog of blocking pooled actors
val resizer = DefaultResizer(
lowerBound = 3,
upperBound = 5,
rampupRate = 0.1,
backoffRate = 0.0,
pressureThreshold = 1,
messagesPerResize = 1,
backoffThreshold = 0.0)
val router = system.actorOf(Props(new Actor {
def receive = {
case d: FiniteDuration
Thread.sleep(d.dilated.toMillis); sender() ! "done"
case "echo" sender() ! "reply"
}
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))
// first message should create the minimum number of routees
router ! "echo"
expectMsg("reply")
routeeSize(router) should be(resizer.lowerBound)
def loop(loops: Int, d: FiniteDuration) = {
for (m 0 until loops) {
router ! d
// sending in too quickly will result in skipped resize due to many resizeInProgress conflicts
Thread.sleep(20.millis.dilated.toMillis)
}
within((d * loops / resizer.lowerBound) + 2.seconds.dilated) {
for (m 0 until loops) expectMsg("done")
}
}
// 2 more should go thru without triggering more
loop(2, 200 millis)
routeeSize(router) should be(resizer.lowerBound)
// a whole bunch should max it out
loop(20, 500 millis)
routeeSize(router) should be(resizer.upperBound)
}
"backoff" in within(10 seconds) {
val resizer = DefaultResizer(
lowerBound = 2,
upperBound = 5,
rampupRate = 1.0,
backoffRate = 1.0,
backoffThreshold = 0.40,
pressureThreshold = 1,
messagesPerResize = 2)
val router = system.actorOf(Props(new Actor {
def receive = {
case n: Int if n <= 0 // done
case n: Int Thread.sleep((n millis).dilated.toMillis)
}
}).withRouter(RoundRobinRouter(resizer = Some(resizer))))
// put some pressure on the router
for (m 0 until 15) {
router ! 150
Thread.sleep((20 millis).dilated.toMillis)
}
val z = routeeSize(router)
z should be > (2)
Thread.sleep((300 millis).dilated.toMillis)
// let it cool down
awaitCond({
router ! 0 // trigger resize
Thread.sleep((20 millis).dilated.toMillis)
routeeSize(router) < z
}, interval = 500.millis.dilated)
}
}
}

View file

@ -1,52 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.oldrouting
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.LocalActorRef
import scala.concurrent.duration._
import akka.routing._
class RouteeCreationSpec extends AkkaSpec {
"Creating Routees" must {
"result in visible routees" in {
val N = 100
system.actorOf(Props(new Actor {
testActor ! system.actorFor(self.path)
def receive = Actor.emptyBehavior
}).withRouter(RoundRobinRouter(N)))
for (i 1 to N) {
expectMsgType[ActorRef] match {
case _: LocalActorRef // fine
case x fail(s"routee $i was a ${x.getClass}")
}
}
}
"allow sending to context.parent" in {
val N = 100
system.actorOf(Props(new Actor {
context.parent ! "one"
def receive = {
case "one" testActor forward "two"
}
}).withRouter(RoundRobinRouter(N)))
val gotit = receiveWhile(messages = N) {
case "two" lastSender.toString
}
expectNoMsg(100.millis)
if (gotit.size != N) {
fail(s"got only ${gotit.size} from \n${gotit mkString "\n"}")
}
}
}
}

View file

@ -1,586 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.oldrouting
import language.postfixOps
import akka.actor._
import scala.collection.immutable
import akka.testkit._
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.ConfigurationException
import com.typesafe.config.ConfigFactory
import akka.pattern.{ ask, pipe }
import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config
import akka.dispatch.Dispatchers
import akka.util.Collections.EmptyImmutableSeq
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger
import akka.routing._
object RoutingSpec {
val config = """
akka.actor.serialize-messages = off
akka.actor.deployment {
/router1 {
router = round-robin
nr-of-instances = 3
}
/router2 {
router = round-robin
nr-of-instances = 3
}
/router3 {
router = round-robin
nr-of-instances = 0
}
}
"""
class TestActor extends Actor {
def receive = { case _ }
}
class Echo extends Actor {
def receive = {
case _ sender() ! self
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with ImplicitSender {
implicit val ec = system.dispatcher
import RoutingSpec._
muteDeadLetters(classOf[akka.dispatch.sysmsg.DeathWatchNotification])()
"routers in general" must {
"evict terminated routees" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)))
router ! ""
router ! ""
val c1, c2 = expectMsgType[ActorRef]
watch(router)
watch(c2)
system.stop(c2)
expectTerminated(c2).existenceConfirmed should be(true)
// it might take a while until the Router has actually processed the Terminated message
awaitCond {
router ! ""
router ! ""
val res = receiveWhile(100 millis, messages = 2) {
case x: ActorRef x
}
res == Seq(c1, c1)
}
system.stop(c1)
expectTerminated(router).existenceConfirmed should be(true)
}
"not terminate when resizer is used" in {
val latch = TestLatch(1)
val resizer = new Resizer {
def isTimeForResize(messageCounter: Long): Boolean = messageCounter == 0
def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int = {
latch.countDown()
2
}
}
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))))
watch(router)
Await.ready(latch, remainingOrDefault)
router ! CurrentRoutees
val routees = expectMsgType[RouterRoutees].routees
routees.size should be(2)
routees foreach system.stop
// expect no Terminated
expectNoMsg(2.seconds)
}
"be able to send their routees" in {
final case class TestRun(id: String, names: immutable.Iterable[String], actors: Int)
val actor = system.actorOf(Props(new Actor {
def receive = {
case TestRun(id, names, actors)
val routerProps = Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(
routees = names map { context.actorOf(Props(new TestActor), _) },
within = 5 seconds))
1 to actors foreach { i context.actorOf(routerProps, id + i).tell(CurrentRoutees, testActor) }
}
}))
val actors = 15
val names = 1 to 20 map { "routee" + _ } toSet
actor ! TestRun("test", names, actors)
1 to actors foreach { _
val routees = expectMsgType[RouterRoutees].routees
routees.map(_.path.name).toSet should be(names)
}
expectNoMsg(500.millis)
}
"use configured nr-of-instances when FromConfig" in {
val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1")
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size should be(3)
watch(router)
system.stop(router)
expectTerminated(router)
}
"use configured nr-of-instances when router is specified" in {
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2")
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size should be(3)
system.stop(router)
}
"use specified resizer when resizer not configured" in {
val latch = TestLatch(1)
val resizer = new Resizer {
def isTimeForResize(messageCounter: Long): Boolean = messageCounter == 0
def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int = {
latch.countDown()
3
}
}
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))), "router3")
Await.ready(latch, remainingOrDefault)
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size should be(3)
system.stop(router)
}
"set supplied supervisorStrategy" in {
//#supervision
val escalator = OneForOneStrategy() {
//#custom-strategy
case e testActor ! e; SupervisorStrategy.Escalate
//#custom-strategy
}
val router = system.actorOf(Props.empty.withRouter(
RoundRobinRouter(1, supervisorStrategy = escalator)))
//#supervision
router ! CurrentRoutees
EventFilter[ActorKilledException](occurrences = 1) intercept {
expectMsgType[RouterRoutees].routees.head ! Kill
}
expectMsgType[ActorKilledException]
val router2 = system.actorOf(Props.empty.withRouter(RoundRobinRouter(1).withSupervisorStrategy(escalator)))
router2 ! CurrentRoutees
EventFilter[ActorKilledException](occurrences = 1) intercept {
expectMsgType[RouterRoutees].routees.head ! Kill
}
expectMsgType[ActorKilledException]
}
"set supplied supervisorStrategy for FromConfig" in {
val escalator = OneForOneStrategy() {
case e testActor ! e; SupervisorStrategy.Escalate
}
val router = system.actorOf(Props.empty.withRouter(FromConfig.withSupervisorStrategy(escalator)), "router1")
router ! CurrentRoutees
EventFilter[ActorKilledException](occurrences = 1) intercept {
expectMsgType[RouterRoutees].routees.head ! Kill
}
expectMsgType[ActorKilledException]
}
"default to all-for-one-always-escalate strategy" in {
val restarter = OneForOneStrategy() {
case e testActor ! e; SupervisorStrategy.Restart
}
val supervisor = system.actorOf(Props(new Supervisor(restarter)))
supervisor ! Props(new Actor {
def receive = {
case x: String throw new Exception(x)
}
override def postRestart(reason: Throwable): Unit = testActor ! "restarted"
}).withRouter(RoundRobinRouter(3))
val router = expectMsgType[ActorRef]
EventFilter[Exception]("die", occurrences = 1) intercept {
router ! "die"
}
expectMsgType[Exception].getMessage should be("die")
expectMsg("restarted")
expectMsg("restarted")
expectMsg("restarted")
}
"start in-line for context.actorOf()" in {
system.actorOf(Props(new Actor {
def receive = {
case "start"
context.actorOf(Props(new Actor {
def receive = { case x sender() ! x }
}).withRouter(RoundRobinRouter(2))) ? "hello" pipeTo sender()
}
})) ! "start"
expectMsg("hello")
}
}
"no router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(NoRouter))
routedActor.isTerminated should be(false)
}
"send message to connection" in {
class Actor1 extends Actor {
def receive = {
case msg testActor forward msg
}
}
val routedActor = system.actorOf(Props(new Actor1).withRouter(NoRouter))
routedActor ! "hello"
routedActor ! "end"
expectMsg("hello")
expectMsg("end")
}
}
"round robin router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 1)))
routedActor.isTerminated should be(false)
}
//In this test a bunch of actors are created and each actor has its own counter.
//to test round robin, the routed actor receives the following sequence of messages 1 2 3 .. 1 2 3 .. 1 2 3 which it
//uses to increment his counter.
//So after n iteration, the first actor his counter should be 1*n, the second 2*n etc etc.
"deliver messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new TestLatch(connectionCount)
//lets create some connections.
@volatile var actors = immutable.IndexedSeq[ActorRef]()
@volatile var counters = immutable.IndexedSeq[AtomicInteger]()
for (i 0 until connectionCount) {
counters = counters :+ new AtomicInteger()
val actor = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counters(i).addAndGet(msg)
}
}))
actors = actors :+ actor
}
val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(routees = actors)))
//send messages to the actor.
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
routedActor ! (k + 1)
}
}
routedActor ! Broadcast("end")
//now wait some and do validations.
Await.ready(doneLatch, remainingOrDefault)
for (i 0 until connectionCount)
counters(i).get should be((iterationCount * (i + 1)))
}
"deliver a broadcast message using the !" in {
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
}))
val counter2 = new AtomicInteger
val actor2 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
}))
val routedActor = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(routees = List(actor1, actor2))))
routedActor ! Broadcast(1)
routedActor ! Broadcast("end")
Await.ready(doneLatch, remainingOrDefault)
counter1.get should be(1)
counter2.get should be(1)
}
}
"random router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(RandomRouter(nrOfInstances = 1)))
routedActor.isTerminated should be(false)
}
"deliver a broadcast message" in {
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
}))
val counter2 = new AtomicInteger
val actor2 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
}))
val routedActor = system.actorOf(Props[TestActor].withRouter(RandomRouter(routees = List(actor1, actor2))))
routedActor ! Broadcast(1)
routedActor ! Broadcast("end")
Await.ready(doneLatch, remainingOrDefault)
counter1.get should be(1)
counter2.get should be(1)
}
}
"smallest mailbox router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(SmallestMailboxRouter(nrOfInstances = 1)))
routedActor.isTerminated should be(false)
}
"deliver messages to idle actor" in {
val usedActors = new ConcurrentHashMap[Int, String]()
val router = system.actorOf(Props(new Actor {
def receive = {
case (busy: TestLatch, receivedLatch: TestLatch)
usedActors.put(0, self.path.toString)
self ! "another in busy mailbox"
receivedLatch.countDown()
Await.ready(busy, TestLatch.DefaultTimeout)
case (msg: Int, receivedLatch: TestLatch)
usedActors.put(msg, self.path.toString)
receivedLatch.countDown()
case s: String
}
}).withRouter(SmallestMailboxRouter(3)))
val busy = TestLatch(1)
val received0 = TestLatch(1)
router ! ((busy, received0))
Await.ready(received0, TestLatch.DefaultTimeout)
val received1 = TestLatch(1)
router ! ((1, received1))
Await.ready(received1, TestLatch.DefaultTimeout)
val received2 = TestLatch(1)
router ! ((2, received2))
Await.ready(received2, TestLatch.DefaultTimeout)
val received3 = TestLatch(1)
router ! ((3, received3))
Await.ready(received3, TestLatch.DefaultTimeout)
busy.countDown()
val busyPath = usedActors.get(0)
busyPath should not be (null)
val path1 = usedActors.get(1)
val path2 = usedActors.get(2)
val path3 = usedActors.get(3)
path1 should not be (busyPath)
path2 should not be (busyPath)
path3 should not be (busyPath)
}
}
"broadcast router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(nrOfInstances = 1)))
routedActor.isTerminated should be(false)
}
"broadcast message using !" in {
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
}))
val counter2 = new AtomicInteger
val actor2 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
}))
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(routees = List(actor1, actor2))))
routedActor ! 1
routedActor ! "end"
Await.ready(doneLatch, remainingOrDefault)
counter1.get should be(1)
counter2.get should be(1)
}
"broadcast message using ?" in {
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int
counter1.addAndGet(msg)
sender() ! "ack"
}
}))
val counter2 = new AtomicInteger
val actor2 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
}))
val routedActor = system.actorOf(Props[TestActor].withRouter(BroadcastRouter(routees = List(actor1, actor2))))
routedActor ? 1
routedActor ! "end"
Await.ready(doneLatch, remainingOrDefault)
counter1.get should be(1)
counter2.get should be(1)
}
}
"Scatter-gather router" must {
"be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(routees = List(newActor(0)), within = 1 seconds)))
routedActor.isTerminated should be(false)
}
"deliver a broadcast message using the !" in {
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
}))
val counter2 = new AtomicInteger
val actor2 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
}))
val routedActor = system.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 1 seconds)))
routedActor ! Broadcast(1)
routedActor ! Broadcast("end")
Await.ready(doneLatch, TestLatch.DefaultTimeout)
counter1.get should be(1)
counter2.get should be(1)
}
"return response, even if one of the actors has stopped" in {
val shutdownLatch = new TestLatch(1)
val actor1 = newActor(1, Some(shutdownLatch))
val actor2 = newActor(14, Some(shutdownLatch))
val routedActor = system.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 3 seconds)))
routedActor ! Broadcast(Stop(Some(1)))
Await.ready(shutdownLatch, TestLatch.DefaultTimeout)
Await.result(routedActor ? Broadcast(0), timeout.duration) should be(14)
}
final case class Stop(id: Option[Int] = None)
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor {
def receive = {
case Stop(None) context.stop(self)
case Stop(Some(_id)) if (_id == id) context.stop(self)
case _id: Int if (_id == id)
case x {
Thread sleep 100 * id
sender() ! id
}
}
override def postStop = {
shudownLatch foreach (_.countDown())
}
}), "Actor:" + id)
}
"router FromConfig" must {
"throw suitable exception when not configured" in {
val e = intercept[ConfigurationException] {
system.actorOf(Props[TestActor].withRouter(FromConfig), "routerNotDefined")
}
e.getMessage should include("routerNotDefined")
}
"allow external configuration" in {
val sys = ActorSystem("FromConfig", ConfigFactory
.parseString("akka.actor.deployment./routed.router=round-robin")
.withFallback(system.settings.config))
try {
sys.actorOf(Props.empty.withRouter(FromConfig), "routed")
} finally {
shutdown(sys)
}
}
}
}

View file

@ -239,10 +239,10 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
"allow external configuration" in {
val sys = ActorSystem("FromConfig", ConfigFactory
.parseString("akka.actor.deployment./routed.router=round-robin")
.parseString("akka.actor.deployment./routed.router=round-robin-pool")
.withFallback(system.settings.config))
try {
sys.actorOf(FromConfig.props(), "routed")
sys.actorOf(FromConfig.props(routeeProps = Props[TestActor]), "routed")
} finally {
shutdown(sys)
}

View file

@ -195,11 +195,7 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
resizerEnabled.withFallback(deployment)
else deployment
val fqn = routerTypeMapping.getOrElse(routerType,
if (deployment.getStringList("routees.paths").isEmpty())
routerTypeMapping.getOrElse(routerType + "-pool", routerType)
else
routerTypeMapping.getOrElse(routerType + "-group", routerType))
val fqn = routerTypeMapping.getOrElse(routerType, routerType)
def throwCannotInstantiateRouter(args: Seq[(Class[_], AnyRef)], cause: Throwable) =
throw new IllegalArgumentException(

View file

@ -6,7 +6,6 @@ package akka.io
import scala.util.control.NonFatal
import akka.actor._
import akka.routing.RandomRouter
import akka.io.SelectionHandler.WorkerForCommand
import akka.event.Logging

View file

@ -18,7 +18,7 @@ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.util.Helpers.Requiring
import akka.util.SerializedSuspendableExecutionContext
import akka.actor._
import akka.routing.RandomRouter
import akka.routing.RandomPool
import akka.event.Logging
abstract class SelectionHandlerSettings(config: Config) {
@ -80,7 +80,7 @@ private[io] object SelectionHandler {
override def supervisorStrategy = connectionSupervisorStrategy
val selectorPool = context.actorOf(
props = RandomRouter(nrOfSelectors).props(Props(classOf[SelectionHandler], selectorSettings)).withDeploy(Deploy.local),
props = RandomPool(nrOfSelectors).props(Props(classOf[SelectionHandler], selectorSettings)).withDeploy(Deploy.local),
name = "selectors")
final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry Props]): Receive = {

View file

@ -98,19 +98,6 @@ object ConsistentHashingRouter {
mapper.hashKey(message)
}
/**
* Creates a new ConsistentHashingRouter, routing to the specified routees
*/
@deprecated("Use ConsistentHashingGroup", "2.3")
def apply(routees: immutable.Iterable[ActorRef]): ConsistentHashingRouter =
new ConsistentHashingRouter(routees = routees map (_.path.toString))
/**
* Java API to create router with the supplied 'routees' actors.
*/
@deprecated("Use ConsistentHashingGroup", "2.3")
def create(routees: java.lang.Iterable[ActorRef]): ConsistentHashingRouter = apply(immutableSeq(routees))
}
object ConsistentHashingRoutingLogic {
@ -330,10 +317,9 @@ final case class ConsistentHashingPool(
* Uses the the `hashMapping` defined in code, since that can't be defined in configuration.
*/
override def withFallback(other: RouterConfig): RouterConfig = other match {
case _: FromConfig | _: NoRouter this.overrideUnsetConfig(other)
case otherRouter: ConsistentHashingPool (copy(hashMapping = otherRouter.hashMapping)).overrideUnsetConfig(other)
case otherRouter: ConsistentHashingRouter (copy(hashMapping = otherRouter.hashMapping)).overrideUnsetConfig(other)
case _ throw new IllegalArgumentException("Expected ConsistentHashingPool, got [%s]".format(other))
case _: FromConfig | _: NoRouter this.overrideUnsetConfig(other)
case otherRouter: ConsistentHashingPool (copy(hashMapping = otherRouter.hashMapping)).overrideUnsetConfig(other)
case _ throw new IllegalArgumentException("Expected ConsistentHashingPool, got [%s]".format(other))
}
}
@ -429,143 +415,3 @@ private[akka] final case class ConsistentRoutee(routee: Routee, selfAddress: Add
}
}
}
/**
* A Router that uses consistent hashing to select a connection based on the
* sent message.
*
* There is 3 ways to define what data to use for the consistent hash key.
*
* 1. You can define `hashMapping` / `withHashMapper`
* of the router to map incoming messages to their consistent hash key.
* This makes the decision transparent for the sender.
*
* 2. The messages may implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]].
* The key is part of the message and it's convenient to define it together
* with the message definition.
*
* 3. The messages can be be wrapped in a [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]]
* to define what data to use for the consistent hash key. The sender knows
* the key to use.
*
* These ways to define the consistent hash key can be use together and at
* the same time for one router. The `hashMapping` is tried first.
*
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
* sense as this means that the router should both create new actors and use the 'routees'
* actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
* @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistentHash]]
* @param hashMapping partial function from message to the data to
* use for the consistent hash key
*/
@SerialVersionUID(1L)
@deprecated("Use ConsistentHashingPool or ConsistentHashingGroup", "2.3")
final case class ConsistentHashingRouter(
nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
val virtualNodesFactor: Int = 0,
val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping)
extends DeprecatedRouterConfig with PoolOverrideUnsetConfig[ConsistentHashingRouter] {
/**
* Java API: Constructor that sets nrOfInstances to be created.
*/
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Java API: Constructor that sets the routees to be used.
*
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
/**
* Java API: Constructor that sets the resizer to be used.
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
override def paths: immutable.Iterable[String] = routees
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String): ConsistentHashingRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy)
/**
* Java API for setting the resizer to be used.
*/
def withResizer(resizer: Resizer): ConsistentHashingRouter = copy(resizer = Some(resizer))
/**
* Java API for setting the number of virtual nodes per node, used in [[akka.routing.ConsistentHash]]
*/
def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes)
/**
* Java API for setting the mapping from message to the data to use for the consistent hash key.
*/
def withHashMapper(mapping: ConsistentHashingRouter.ConsistentHashMapper) =
copy(hashMapping = ConsistentHashingRouter.hashMappingAdapter(mapping))
/**
* Uses the resizer and/or the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
* Uses the the `hashMapping` defined in code, since that can't be defined in configuration.
*/
override def withFallback(other: RouterConfig): RouterConfig = other match {
case _: FromConfig | _: NoRouter this.overrideUnsetConfig(other)
case otherRouter: ConsistentHashingRouter (copy(hashMapping = otherRouter.hashMapping)).overrideUnsetConfig(other)
case _ throw new IllegalArgumentException("Expected ConsistentHashingRouter, got [%s]".format(other))
}
override def createRouter(system: ActorSystem): Router =
new Router(ConsistentHashingRoutingLogic(system, virtualNodesFactor, hashMapping))
}
/**
* INTERNAL API
* Important to use ActorRef with full address, with host and port, in the hash ring,
* so that same ring is produced on different nodes.
* The ConsistentHash uses toString of the ring nodes, and the ActorRef itself
* isn't a good representation, because LocalActorRef doesn't include the
* host and port.
*/
@deprecated("Replaced by ConsistentRoutee", "2.3")
private[akka] final case class ConsistentActorRef(actorRef: ActorRef, selfAddress: Address) {
override def toString: String = {
actorRef.path.address match {
case Address(_, _, None, None) actorRef.path.toStringWithAddress(selfAddress)
case a actorRef.path.toString
}
}
}

View file

@ -1,545 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import language.implicitConversions
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor._
import akka.ConfigurationException
import akka.dispatch.{ Envelope, Dispatchers }
import akka.pattern.pipe
import akka.japi.Util.immutableSeq
import com.typesafe.config.Config
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.TimeUnit
import akka.event.Logging.Warning
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.annotation.tailrec
import akka.event.Logging.Warning
import akka.dispatch.{ MailboxType, MessageDispatcher }
/**
* Sending this message to a router will make it send back its currently used routees.
* A RouterRoutees message is sent asynchronously to the "requester" containing information
* about what routees the router is routing over.
*/
@deprecated("Use GetRoutees", "2.3")
@SerialVersionUID(1L) abstract class CurrentRoutees extends RouterManagementMesssage
@deprecated("Use GetRoutees", "2.3")
@SerialVersionUID(1L) case object CurrentRoutees extends CurrentRoutees {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Message used to carry information about what routees the router is currently using.
*/
@deprecated("Use GetRoutees", "2.3")
@SerialVersionUID(1L)
final case class RouterRoutees(routees: immutable.IndexedSeq[ActorRef]) {
/**
* Java API
*/
def getRoutees: java.util.List[ActorRef] = {
import scala.collection.JavaConverters._
routees.asJava
}
}
@deprecated("Use Pool or Group", "2.3")
trait DeprecatedRouterConfig extends Group with Pool
@deprecated("Use RoundRobinPool or RoundRobinGroup", "2.3")
object RoundRobinRouter {
/**
* Creates a new RoundRobinRouter, routing to the specified routees
*/
def apply(routees: immutable.Iterable[ActorRef]): RoundRobinRouter =
new RoundRobinRouter(routees = routees map (_.path.toString))
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef]): RoundRobinRouter =
apply(immutableSeq(routees))
}
/**
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
@deprecated("Use RoundRobinPool or RoundRobinGroup", "2.3")
final case class RoundRobinRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy)
extends DeprecatedRouterConfig with PoolOverrideUnsetConfig[RoundRobinRouter] {
/**
* Java API: Constructor that sets nrOfInstances to be created.
*/
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Java API: Constructor that sets the routees to be used.
*
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
/**
* Java API: Constructor that sets the resizer to be used.
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
override def paths: immutable.Iterable[String] = routees
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String): RoundRobinRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): RoundRobinRouter = copy(supervisorStrategy = strategy)
/**
* Java API for setting the resizer to be used.
*/
def withResizer(resizer: Resizer): RoundRobinRouter = copy(resizer = Some(resizer))
/**
* Uses the resizer and/or the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
*/
override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other)
override def createRouter(system: ActorSystem): Router = new Router(RoundRobinRoutingLogic())
}
@deprecated("Use RandomPool or RandomGroup", "2.3")
object RandomRouter {
/**
* Creates a new RandomRouter, routing to the specified routees
*/
def apply(routees: immutable.Iterable[ActorRef]): RandomRouter = new RandomRouter(routees = routees map (_.path.toString))
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef]): RandomRouter =
apply(immutableSeq(routees))
}
/**
* A Router that randomly selects one of the target connections to send a message to.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
@deprecated("Use RandomPool or RandomGroup", "2.3")
final case class RandomRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy)
extends DeprecatedRouterConfig with PoolOverrideUnsetConfig[RandomRouter] {
/**
* Java API: Constructor that sets nrOfInstances to be created.
*/
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Java API: Constructor that sets the routees to be used.
*
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
/**
* Java API: Constructor that sets the resizer to be used.
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
override def paths: immutable.Iterable[String] = routees
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String): RandomRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): RandomRouter = copy(supervisorStrategy = strategy)
/**
* Java API for setting the resizer to be used.
*/
def withResizer(resizer: Resizer): RandomRouter = copy(resizer = Some(resizer))
/**
* Uses the resizer and/or the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
*/
override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other)
override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic())
}
@deprecated("Use SmallestMailboxPool", "2.3")
object SmallestMailboxRouter {
/**
* Creates a new SmallestMailboxRouter, routing to the specified routees
*/
def apply(routees: immutable.Iterable[ActorRef]): SmallestMailboxRouter =
new SmallestMailboxRouter(routees = routees map (_.path.toString))
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef]): SmallestMailboxRouter =
apply(immutableSeq(routees))
}
/**
* A Router that tries to send to the non-suspended routee with fewest messages in mailbox.
* The selection is done in this order:
* <ul>
* <li>pick any idle routee (not processing message) with empty mailbox</li>
* <li>pick any routee with empty mailbox</li>
* <li>pick routee with fewest pending messages in mailbox</li>
* <li>pick any remote routee, remote actors are consider lowest priority,
* since their mailbox size is unknown</li>
* </ul>
*
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
@deprecated("Use SmallestMailboxPool", "2.3")
final case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy)
extends DeprecatedRouterConfig with PoolOverrideUnsetConfig[SmallestMailboxRouter] {
/**
* Java API: Constructor that sets nrOfInstances to be created.
*/
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Java API: Constructor that sets the routees to be used.
*
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
/**
* Java API: Constructor that sets the resizer to be used.
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
override def paths: immutable.Iterable[String] = routees
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String): SmallestMailboxRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): SmallestMailboxRouter = copy(supervisorStrategy = strategy)
/**
* Java API for setting the resizer to be used.
*/
def withResizer(resizer: Resizer): SmallestMailboxRouter = copy(resizer = Some(resizer))
/**
* Uses the resizer and/or the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
*/
override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other)
override def createRouter(system: ActorSystem): Router = new Router(SmallestMailboxRoutingLogic())
}
@deprecated("Use BroadcastPool or BroadcastGroup", "2.3")
object BroadcastRouter {
/**
* Creates a new BroadcastRouter, routing to the specified routees
*/
def apply(routees: immutable.Iterable[ActorRef]): BroadcastRouter = new BroadcastRouter(routees = routees map (_.path.toString))
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef]): BroadcastRouter =
apply(immutableSeq(routees))
}
/**
* A Router that uses broadcasts a message to all its connections.
* <br>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
@deprecated("Use BroadcastPool or BroadcastGroup", "2.3")
final case class BroadcastRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy)
extends DeprecatedRouterConfig with PoolOverrideUnsetConfig[BroadcastRouter] {
/**
* Java API: Constructor that sets nrOfInstances to be created.
*/
def this(nr: Int) = this(nrOfInstances = nr)
/**
* Java API: Constructor that sets the routees to be used.
*
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String]) = this(routees = immutableSeq(routeePaths))
/**
* Java API: Constructor that sets the resizer to be used.
*/
def this(resizer: Resizer) = this(resizer = Some(resizer))
override def paths: immutable.Iterable[String] = routees
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String): BroadcastRouter = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): BroadcastRouter = copy(supervisorStrategy = strategy)
/**
* Java API for setting the resizer to be used.
*/
def withResizer(resizer: Resizer): BroadcastRouter = copy(resizer = Some(resizer))
/**
* Uses the resizer and/or the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
*/
override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other)
override def createRouter(system: ActorSystem): Router = new Router(BroadcastRoutingLogic())
}
@deprecated("Use ScatterGatherFirstCompletedPool or ScatterGatherFirstCompletedGroup", "2.3")
object ScatterGatherFirstCompletedRouter {
/**
* Creates a new ScatterGatherFirstCompletedRouter, routing to the specified routees, timing out after the specified Duration
*/
def apply(routees: immutable.Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter =
new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString), within = within)
/**
* Java API to create router with the supplied 'routees' actors.
*/
def create(routees: java.lang.Iterable[ActorRef], within: FiniteDuration): ScatterGatherFirstCompletedRouter =
apply(immutableSeq(routees), within)
}
/**
* Simple router that broadcasts the message to all routees, and replies with the first response.
* <br/>
* You have to defin the 'within: Duration' parameter (f.e: within = 10 seconds).
* <br/>
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical sense as this means
* that the router should both create new actors and use the 'routees' actor(s).
* In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br/>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
@deprecated("Use ScatterGatherFirstCompletedPool or ScatterGatherFirstCompletedGroup", "2.3")
final case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil, within: FiniteDuration,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy)
extends DeprecatedRouterConfig with PoolOverrideUnsetConfig[ScatterGatherFirstCompletedRouter] {
if (within <= Duration.Zero) throw new IllegalArgumentException(
"[within: Duration] can not be zero or negative, was [" + within + "]")
/**
* Java API: Constructor that sets nrOfInstances to be created.
*/
def this(nr: Int, w: FiniteDuration) = this(nrOfInstances = nr, within = w)
/**
* Java API: Constructor that sets the routees to be used.
*
* @param routeePaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(routeePaths: java.lang.Iterable[String], w: FiniteDuration) = this(routees = immutableSeq(routeePaths), within = w)
/**
* Java API: Constructor that sets the resizer to be used.
*/
def this(resizer: Resizer, w: FiniteDuration) = this(resizer = Some(resizer), within = w)
override def paths: immutable.Iterable[String] = routees
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String) = copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): ScatterGatherFirstCompletedRouter = copy(supervisorStrategy = strategy)
/**
* Java API for setting the resizer to be used.
*/
def withResizer(resizer: Resizer): ScatterGatherFirstCompletedRouter = copy(resizer = Some(resizer))
/**
* Uses the resizer and/or the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
*/
override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other)
override def createRouter(system: ActorSystem): Router = new Router(SmallestMailboxRoutingLogic())
}

View file

@ -105,12 +105,6 @@ private[akka] class RoutedActorCell(
// create the initial routees before scheduling the Router actor
_router = routerConfig.createRouter(system)
routerConfig match {
case old: DeprecatedRouterConfig
if (old.nrOfInstances > 0)
addRoutees(Vector.fill(old.nrOfInstances)(old.newRoutee(routeeProps, this)))
val paths = old.paths
if (paths.nonEmpty)
addRoutees(paths.map(p old.routeeFor(p, this))(collection.breakOut))
case pool: Pool
if (pool.nrOfInstances > 0)
addRoutees(Vector.fill(pool.nrOfInstances)(pool.newRoutee(routeeProps, this)))
@ -136,9 +130,6 @@ private[akka] class RoutedActorCell(
/**
* Route the message via the router to the selected destination.
*
* When [[akka.routing.CurrentRoutees]] is sent to the RoutedActorRef it
* replies with [[akka.routing.RouterRoutees]].
*/
override def sendMessage(envelope: Envelope): Unit = {
if (routerConfig.isManagementMessage(envelope.message))
@ -167,8 +158,6 @@ private[akka] class RouterActor extends Actor {
def receive = {
case GetRoutees
sender() ! Routees(cell.router.routees)
case CurrentRoutees
context.actorOf(Props(classOf[CollectRouteeRefs], cell.router.routees, sender()))
case AddRoutee(routee)
cell.addRoutee(routee)
case RemoveRoutee(routee)
@ -187,34 +176,6 @@ private[akka] class RouterActor extends Actor {
}
}
/**
* INTERNAL API
* Backwards compatibility glue to support CurrentRoutees/RouterRoutees containing refs of
* the routees. This class is not needed when CurrentRoutees/RouterRoutees are removed.
*/
private[akka] class CollectRouteeRefs(routees: immutable.IndexedSeq[Routee], replyTo: ActorRef) extends Actor {
var collected = Vector.empty[ActorRef]
var count = 0
routees.foreach(_.send(Identify(None), self))
import context.dispatcher
context.system.scheduler.scheduleOnce(10.seconds, self, ReceiveTimeout)
def receive = {
case ActorIdentity(_, refOption)
refOption foreach { ref collected = collected :+ ref }
count += 1
if (count == routees.size) done()
case ReceiveTimeout done()
}
def done(): Unit = {
replyTo ! RouterRoutees(collected)
context.stop(self)
}
}
/**
* INTERNAL API
*/

View file

@ -10,7 +10,7 @@ import scala.concurrent.{ Promise, Await, Future }
import scala.collection.immutable
import akka.camel.TestSupport.NonSharedCamelSystem
import akka.actor.{ ActorRef, Props, Actor }
import akka.routing.BroadcastRouter
import akka.routing.BroadcastGroup
import scala.concurrent.duration._
import akka.testkit._
import akka.util.Timeout
@ -85,7 +85,7 @@ class ConsumerBroadcast(promise: Promise[(Future[List[List[ActorRef]]], Future[L
var allActivationFutures = List[Future[List[ActorRef]]]()
var allDeactivationFutures = List[Future[List[ActorRef]]]()
val routees = (1 to number).map { i
val routeePaths = (1 to number).map { i
val activationListPromise = Promise[List[ActorRef]]()
val deactivationListPromise = Promise[List[ActorRef]]()
val activationListFuture = activationListPromise.future
@ -93,11 +93,12 @@ class ConsumerBroadcast(promise: Promise[(Future[List[List[ActorRef]]], Future[L
allActivationFutures = allActivationFutures :+ activationListFuture
allDeactivationFutures = allDeactivationFutures :+ deactivationListFuture
context.actorOf(Props(classOf[Registrar], i, number, activationListPromise, deactivationListPromise), "registrar-" + i)
val routee = context.actorOf(Props(classOf[Registrar], i, number, activationListPromise, deactivationListPromise), "registrar-" + i)
routee.path.toString
}
promise.success(Future.sequence(allActivationFutures) -> Future.sequence(allDeactivationFutures))
broadcaster = Some(context.actorOf(Props.empty withRouter (BroadcastRouter(routees)), "registrarRouter"))
broadcaster = Some(context.actorOf(BroadcastGroup(routeePaths).props(), "registrarRouter"))
case reg: Any
broadcaster.foreach(_.forward(reg))
}

View file

@ -211,9 +211,6 @@ akka {
# Useful for master-worker scenario where all routees are remote.
allow-local-routees = on
# Deprecated in 2.3, use routees.paths instead
routees-path = ""
# Use members with specified role, or all members if undefined or empty.
use-role = ""

View file

@ -22,7 +22,6 @@ import akka.remote.RemoteDeployer
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RouterConfig
import akka.routing.DefaultResizer
import akka.cluster.routing.AdaptiveLoadBalancingRouter
import akka.cluster.routing.MixMetricsSelector
import akka.cluster.routing.HeapMetricsSelector
import akka.cluster.routing.SystemLoadAverageMetricsSelector
@ -36,7 +35,6 @@ import akka.routing.Group
import akka.cluster.routing.ClusterRouterPool
import akka.cluster.routing.ClusterRouterGroup
import com.typesafe.config.ConfigFactory
import akka.routing.DeprecatedRouterConfig
import akka.cluster.routing.ClusterRouterPoolSettings
import akka.cluster.routing.ClusterRouterGroupSettings
@ -90,13 +88,7 @@ private[akka] class ClusterActorRefProvider(
private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends RemoteDeployer(_settings, _pm) {
override def parseConfig(path: String, config: Config): Option[Deploy] = {
// For backwards compatibility we must transform 'cluster.routees-path' to 'routees.paths'
val config2 =
if (config.hasPath("cluster.routees-path"))
config.withFallback(ConfigFactory.parseString(s"""routees.paths=["${config.getString("cluster.routees-path")}"]"""))
else config
super.parseConfig(path, config2) match {
super.parseConfig(path, config) match {
case d @ Some(deploy)
if (deploy.config.getBoolean("cluster.enabled")) {
if (deploy.scope != NoScopeGiven)
@ -105,13 +97,6 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami
throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig))
deploy.routerConfig match {
case r: DeprecatedRouterConfig
if (config.hasPath("cluster.routees-path"))
Some(deploy.copy(
routerConfig = ClusterRouterGroup(r, ClusterRouterGroupSettings.fromConfig(deploy.config)), scope = ClusterScope))
else
Some(deploy.copy(
routerConfig = ClusterRouterPool(r, ClusterRouterPoolSettings.fromConfig(deploy.config)), scope = ClusterScope))
case r: Pool
Some(deploy.copy(
routerConfig = ClusterRouterPool(r, ClusterRouterPoolSettings.fromConfig(deploy.config)), scope = ClusterScope))

View file

@ -178,9 +178,6 @@ final case class AdaptiveLoadBalancingPool(
case otherRouter: AdaptiveLoadBalancingPool
if (otherRouter.supervisorStrategy eq Pool.defaultSupervisorStrategy) this
else this.withSupervisorStrategy(otherRouter.supervisorStrategy)
case otherRouter: AdaptiveLoadBalancingRouter
if (otherRouter.supervisorStrategy eq Pool.defaultSupervisorStrategy) this
else this.withSupervisorStrategy(otherRouter.supervisorStrategy)
case _ throw new IllegalArgumentException("Expected AdaptiveLoadBalancingPool, got [%s]".format(other))
}
@ -512,111 +509,3 @@ private[akka] class AdaptiveLoadBalancingMetricsListener(routingLogic: AdaptiveL
}
/**
* A Router that performs load balancing of messages to cluster nodes based on
* cluster metric data.
*
* It uses random selection of routees based on probabilities derived from
* the remaining capacity of corresponding node.
*
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
* sense as this means that the router should both create new actors and use the 'routees'
* actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
* <br>
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
* be ignored if the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param metricsSelector decides what probability to use for selecting a routee, based
* on remaining capacity as indicated by the node metrics
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
@SerialVersionUID(1L)
@deprecated("Use AdaptiveLoadBalancingPool or AdaptiveLoadBalancingGroup", "2.3")
final case class AdaptiveLoadBalancingRouter(
metricsSelector: MetricsSelector = MixMetricsSelector,
nrOfInstances: Int = 0, routees: immutable.Iterable[String] = Nil,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy)
extends DeprecatedRouterConfig with PoolOverrideUnsetConfig[AdaptiveLoadBalancingRouter] {
/**
* Java API: Constructor that sets nrOfInstances to be created.
*
* @param selector the selector is responsible for producing weighted mix of routees from the node metrics
* @param nr number of routees to create
*/
def this(selector: MetricsSelector, nr: Int) = this(metricsSelector = selector, nrOfInstances = nr)
/**
* Java API: Constructor that sets the routees to be used.
*
* @param selector the selector is responsible for producing weighted mix of routees from the node metrics
* @param routeesPaths string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
def this(selector: MetricsSelector, routeesPaths: java.lang.Iterable[String]) =
this(metricsSelector = selector, routees = immutableSeq(routeesPaths))
/**
* Java API: Constructor that sets the resizer to be used.
*
* @param selector the selector is responsible for producing weighted mix of routees from the node metrics
*/
def this(selector: MetricsSelector, resizer: Resizer) =
this(metricsSelector = selector, resizer = Some(resizer))
override def paths: immutable.Iterable[String] = routees
/**
* Java API for setting routerDispatcher
*/
def withDispatcher(dispatcherId: String): AdaptiveLoadBalancingRouter =
copy(routerDispatcher = dispatcherId)
/**
* Java API for setting the supervisor strategy to be used for the head
* Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): AdaptiveLoadBalancingRouter =
copy(supervisorStrategy = strategy)
/**
* Java API for setting the resizer to be used.
*/
def withResizer(resizer: Resizer): AdaptiveLoadBalancingRouter = copy(resizer = Some(resizer))
/**
* Uses the resizer and/or the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
*/
override def withFallback(other: RouterConfig): RouterConfig = other match {
case _: FromConfig | _: NoRouter | _: AdaptiveLoadBalancingRouter this.overrideUnsetConfig(other)
case _ throw new IllegalArgumentException("Expected AdaptiveLoadBalancingRouter, got [%s]".format(other))
}
override def createRouter(system: ActorSystem): Router =
new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector))
override def routingLogicController(routingLogic: RoutingLogic): Option[Props] =
Some(Props(classOf[AdaptiveLoadBalancingMetricsListener],
routingLogic.asInstanceOf[AdaptiveLoadBalancingRoutingLogic]))
}

View file

@ -39,7 +39,6 @@ import akka.actor.ActorSystem
import akka.routing.RoutingLogic
import akka.actor.RelativeActorPath
import com.typesafe.config.Config
import akka.routing.DeprecatedRouterConfig
import akka.japi.Util.immutableSeq
object ClusterRouterGroupSettings {
@ -61,27 +60,12 @@ final case class ClusterRouterGroupSettings(
allowLocalRoutees: Boolean,
useRole: Option[String]) extends ClusterRouterSettingsBase {
@deprecated("Use constructor with routeesPaths Seq", "2.3")
def this(
totalInstances: Int,
routeesPath: String,
allowLocalRoutees: Boolean,
useRole: Option[String]) =
this(totalInstances, List(routeesPath), allowLocalRoutees, useRole)
/**
* Java API
*/
def this(totalInstances: Int, routeesPaths: java.lang.Iterable[String], allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole))
/**
* Java API
*/
@deprecated("Use constructor with routeesPaths Iterable", "2.3")
def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, routeesPath, allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole))
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0")
if ((routeesPaths eq null) || routeesPaths.isEmpty || routeesPaths.head == "")
throw new IllegalArgumentException("routeesPaths must be defined")
@ -150,7 +134,7 @@ private[akka] trait ClusterRouterSettingsBase {
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
* Delegates other duties to the local [[akka.routing.RouterConfig]],
* which makes it possible to mix this with the built-in routers such as
* [[akka.routing.RoundRobinRouter]] or custom routers.
* [[akka.routing.RoundRobinGroup]] or custom routers.
*/
@SerialVersionUID(1L)
final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase {
@ -167,8 +151,6 @@ final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSe
"ClusterRouterGroup is not allowed to wrap a ClusterRouterGroup")
case ClusterRouterGroup(local, _)
copy(local = this.local.withFallback(local).asInstanceOf[Group])
case ClusterRouterConfig(local, _)
copy(local = this.local.withFallback(local).asInstanceOf[Group])
case _
copy(local = this.local.withFallback(other).asInstanceOf[Group])
}
@ -179,7 +161,7 @@ final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSe
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
* Delegates other duties to the local [[akka.routing.RouterConfig]],
* which makes it possible to mix this with the built-in routers such as
* [[akka.routing.RoundRobinRouter]] or custom routers.
* [[akka.routing.RoundRobinGroup]] or custom routers.
*/
@SerialVersionUID(1L)
final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSettings) extends Pool with ClusterRouterConfigBase {
@ -217,8 +199,6 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti
"ClusterRouterPool is not allowed to wrap a ClusterRouterPool")
case ClusterRouterPool(otherLocal, _)
copy(local = this.local.withFallback(otherLocal).asInstanceOf[Pool])
case ClusterRouterConfig(otherLocal, _)
copy(local = this.local.withFallback(otherLocal).asInstanceOf[Pool])
case _
copy(local = this.local.withFallback(other).asInstanceOf[Pool])
}
@ -302,7 +282,7 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett
val group = cell.routerConfig match {
case x: Group x
case other
throw ActorInitializationException("ClusterRouterGroupActor can only be used with Nozle, not " + other.getClass)
throw ActorInitializationException("ClusterRouterGroupActor can only be used with group, not " + other.getClass)
}
override def receive = clusterReceive orElse super.receive

View file

@ -1,167 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import scala.collection.immutable
import akka.routing.RouterConfig
import akka.routing.Router
import akka.actor.Props
import akka.actor.ActorContext
import akka.routing.Routee
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Address
import akka.actor.ActorCell
import akka.actor.Deploy
import com.typesafe.config.ConfigFactory
import akka.routing.ActorRefRoutee
import akka.remote.RemoteScope
import akka.actor.Actor
import akka.actor.SupervisorStrategy
import akka.routing.Resizer
import akka.routing.RouterConfig
import akka.routing.Pool
import akka.routing.Group
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RouterActor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.ActorRef
import akka.cluster.Member
import scala.annotation.tailrec
import akka.actor.RootActorPath
import akka.cluster.MemberStatus
import akka.routing.ActorSelectionRoutee
import akka.actor.ActorInitializationException
import akka.routing.RouterPoolActor
import akka.actor.ActorSystem
import akka.actor.ActorSystem
import akka.routing.RoutingLogic
import akka.actor.RelativeActorPath
import com.typesafe.config.Config
import akka.routing.DeprecatedRouterConfig
@deprecated("Use ClusterRouterPoolSettings or ClusterRouterGroupSettings", "2.3")
object ClusterRouterSettings {
/**
* Settings for create and deploy of the routees
*/
def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings =
new ClusterRouterSettings(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees, useRole)
/**
* Settings for remote deployment of the routees, allowed to use routees on own node
*/
def apply(totalInstances: Int, maxInstancesPerNode: Int, useRole: Option[String]): ClusterRouterSettings =
apply(totalInstances, maxInstancesPerNode, allowLocalRoutees = true, useRole)
/**
* Settings for lookup of the routees
*/
def apply(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings =
new ClusterRouterSettings(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees, useRole)
/**
* Settings for lookup of the routees, allowed to use routees on own node
*/
def apply(totalInstances: Int, routeesPath: String, useRole: Option[String]): ClusterRouterSettings =
apply(totalInstances, routeesPath, allowLocalRoutees = true, useRole)
def useRoleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
}
/**
* `totalInstances` of cluster router must be > 0
* `maxInstancesPerNode` of cluster router must be > 0
* `maxInstancesPerNode` of cluster router must be 1 when routeesPath is defined
*/
@SerialVersionUID(1L)
@deprecated("Use ClusterRouterPoolSettings or ClusterRouterGroupSettings", "2.3")
final case class ClusterRouterSettings private[akka] (
totalInstances: Int,
maxInstancesPerNode: Int,
routeesPath: String,
allowLocalRoutees: Boolean,
useRole: Option[String]) extends ClusterRouterSettingsBase {
/**
* Java API: Settings for create and deploy of the routees
*/
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees,
ClusterRouterSettings.useRoleOption(useRole))
/**
* Java API: Settings for lookup of the routees
*/
def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees,
ClusterRouterSettings.useRoleOption(useRole))
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0")
if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be > 0")
if (isRouteesPathDefined && maxInstancesPerNode != 1)
throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be 1 when routeesPath is defined")
routeesPath match {
case RelativeActorPath(elements) // good
case _
throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath)
}
def isRouteesPathDefined: Boolean = (routeesPath ne null) && routeesPath != ""
}
@deprecated("Use ClusterRouterPool or ClusterRouterGroup", "2.3")
@SerialVersionUID(1L)
final case class ClusterRouterConfig(local: DeprecatedRouterConfig, settings: ClusterRouterSettings) extends DeprecatedRouterConfig with ClusterRouterConfigBase {
require(local.resizer.isEmpty, "Resizer can't be used together with cluster router")
@transient private val childNameCounter = new AtomicInteger
/**
* INTERNAL API
*/
override private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee = {
val name = "c" + childNameCounter.incrementAndGet
val ref = context.asInstanceOf[ActorCell].attachChild(routeeProps, name, systemService = false)
ActorRefRoutee(ref)
}
override def nrOfInstances: Int = if (settings.allowLocalRoutees) settings.maxInstancesPerNode else 0
override def paths: immutable.Iterable[String] =
if (settings.allowLocalRoutees && settings.routeesPath.nonEmpty) List(settings.routeesPath) else Nil
override def resizer: Option[Resizer] = local.resizer
/**
* INTERNAL API
*/
override private[akka] def createRouterActor(): RouterActor =
if (settings.routeesPath.isEmpty)
new ClusterRouterPoolActor(local.supervisorStrategy, ClusterRouterPoolSettings(settings.totalInstances,
settings.maxInstancesPerNode, settings.allowLocalRoutees, settings.useRole))
else
new ClusterRouterGroupActor(ClusterRouterGroupSettings(settings.totalInstances, List(settings.routeesPath),
settings.allowLocalRoutees, settings.useRole))
override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy
override def withFallback(other: RouterConfig): RouterConfig = other match {
case ClusterRouterConfig(_: ClusterRouterConfig, _) throw new IllegalStateException(
"ClusterRouterConfig is not allowed to wrap a ClusterRouterConfig")
case ClusterRouterConfig(local, _)
copy(local = this.local.withFallback(local).asInstanceOf[DeprecatedRouterConfig])
case _
copy(local = this.local.withFallback(other).asInstanceOf[DeprecatedRouterConfig])
}
}

View file

@ -1,212 +0,0 @@
/*
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.oldrouting
import language.postfixOps
import java.lang.management.ManagementFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.NodeMetrics
import akka.pattern.ask
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import akka.routing.CurrentRoutees
import akka.routing.FromConfig
import akka.routing.RouterRoutees
import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
import akka.cluster.routing._
object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
class Routee extends Actor {
def receive = {
case _ sender() ! Reply(Cluster(context.system).selfAddress)
}
}
class Memory extends Actor with ActorLogging {
var usedMemory: Array[Array[Int]] = _
def receive = {
case AllocateMemory
val heap = ManagementFactory.getMemoryMXBean.getHeapMemoryUsage
// getMax can be undefined (-1)
val max = math.max(heap.getMax, heap.getCommitted)
val used = heap.getUsed
log.debug("used heap before: [{}] bytes, of max [{}]", used, heap.getMax)
// allocate 70% of free space
val allocateBytes = (0.7 * (max - used)).toInt
val numberOfArrays = allocateBytes / 1024
usedMemory = Array.ofDim(numberOfArrays, 248) // each 248 element Int array will use ~ 1 kB
log.debug("used heap after: [{}] bytes", ManagementFactory.getMemoryMXBean.getHeapMemoryUsage.getUsed)
sender() ! "done"
}
}
case object AllocateMemory
final case class Reply(address: Address)
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
akka.cluster.metrics.collect-interval = 1s
akka.cluster.metrics.gossip-interval = 1s
akka.cluster.metrics.moving-average-half-life = 2s
akka.actor.deployment {
/router3 = {
router = adaptive
metrics-selector = cpu
nr-of-instances = 9
}
/router4 = {
router = adaptive
metrics-selector = "akka.cluster.routing.TestCustomMetricsSelector"
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 2
}
}
}
""")).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class TestCustomMetricsSelector(config: Config) extends MetricsSelector {
override def weights(nodeMetrics: Set[NodeMetrics]): Map[Address, Int] = Map.empty
}
class AdaptiveLoadBalancingRouterMultiJvmNode1 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode2 extends AdaptiveLoadBalancingRouterSpec
class AdaptiveLoadBalancingRouterMultiJvmNode3 extends AdaptiveLoadBalancingRouterSpec
abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoadBalancingRouterMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import AdaptiveLoadBalancingRouterMultiJvmSpec._
def currentRoutees(router: ActorRef) =
Await.result(router ? CurrentRoutees, timeout.duration).asInstanceOf[RouterRoutees].routees
def receiveReplies(expectedReplies: Int): Map[Address, Int] = {
val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
(receiveWhile(5 seconds, messages = expectedReplies) {
case Reply(address) address
}).foldLeft(zero) {
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
}
}
/**
* Fills in self address for local ActorRef
*/
def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) cluster.selfAddress
case a a
}
def startRouter(name: String): ActorRef = {
val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig(
local = AdaptiveLoadBalancingRouter(HeapMetricsSelector),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1, useRole = None))), name)
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router).size should be(roles.size) }
currentRoutees(router).map(fullAddress).toSet should be(roles.map(address).toSet)
router
}
"A cluster with a AdaptiveLoadBalancingRouter" must {
"start cluster nodes" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
enterBarrier("after-1")
}
"use all nodes in the cluster when not overloaded" taggedAs LongRunningTest in {
runOn(first) {
val router1 = startRouter("router1")
// collect some metrics before we start
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
val iterationCount = 100
1 to iterationCount foreach { _
router1 ! "hit"
// wait a while between each message, since metrics is collected periodically
Thread.sleep(10)
}
val replies = receiveReplies(iterationCount)
replies(first) should be > (0)
replies(second) should be > (0)
replies(third) should be > (0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-2")
}
"prefer node with more free heap capacity" taggedAs LongRunningTest in {
System.gc()
enterBarrier("gc")
runOn(second) {
within(20.seconds) {
system.actorOf(Props[Memory], "memory") ! AllocateMemory
expectMsg("done")
}
}
enterBarrier("heap-allocated")
runOn(first) {
val router2 = startRouter("router2")
// collect some metrics before we start
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
val iterationCount = 3000
1 to iterationCount foreach { _
router2 ! "hit"
}
val replies = receiveReplies(iterationCount)
replies(third) should be > (replies(second))
replies.values.sum should be(iterationCount)
}
enterBarrier("after-3")
}
"create routees from configuration" taggedAs LongRunningTest in {
runOn(first) {
val router3 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router3")
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router3).size should be(9) }
currentRoutees(router3).map(fullAddress).toSet should be(Set(address(first)))
}
enterBarrier("after-4")
}
"create routees from cluster.enabled configuration" taggedAs LongRunningTest in {
runOn(first) {
val router4 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router4")
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router4).size should be(6) }
currentRoutees(router4).map(fullAddress).toSet should be(Set(
address(first), address(second), address(third)))
}
enterBarrier("after-5")
}
}
}

View file

@ -1,175 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.oldrouting
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.cluster.MultiNodeClusterSpec
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.ConsistentHashingRouter
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import akka.routing.CurrentRoutees
import akka.routing.FromConfig
import akka.routing.RouterRoutees
import akka.testkit._
import akka.cluster.routing._
object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig {
class Echo extends Actor {
def receive = {
case _ sender() ! self
}
}
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
common-router-settings = {
router = consistent-hashing
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 2
}
}
akka.actor.deployment {
/router1 = ${common-router-settings}
/router3 = ${common-router-settings}
/router4 = ${common-router-settings}
}
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class ClusterConsistentHashingRouterMultiJvmNode1 extends ClusterConsistentHashingRouterSpec
class ClusterConsistentHashingRouterMultiJvmNode2 extends ClusterConsistentHashingRouterSpec
class ClusterConsistentHashingRouterMultiJvmNode3 extends ClusterConsistentHashingRouterSpec
abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterConsistentHashingRouterMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import ClusterConsistentHashingRouterMultiJvmSpec._
lazy val router1 = system.actorOf(Props[Echo].withRouter(FromConfig()), "router1")
def currentRoutees(router: ActorRef) =
Await.result(router ? CurrentRoutees, timeout.duration).asInstanceOf[RouterRoutees].routees
/**
* Fills in self address for local ActorRef
*/
private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) cluster.selfAddress
case a a
}
"A cluster router with a consistent hashing router" must {
"start cluster with 2 nodes" taggedAs LongRunningTest in {
awaitClusterUp(first, second)
enterBarrier("after-1")
}
"create routees from configuration" in {
runOn(first) {
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router1).size should be(4) }
currentRoutees(router1).map(fullAddress).toSet should be(Set(address(first), address(second)))
}
enterBarrier("after-2")
}
"select destination based on hashKey" in {
runOn(first) {
router1 ! ConsistentHashableEnvelope(message = "A", hashKey = "a")
val destinationA = expectMsgType[ActorRef]
router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a")
expectMsg(destinationA)
}
enterBarrier("after-2")
}
"deploy routees to new member nodes in the cluster" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
runOn(first) {
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router1).size should be(6) }
currentRoutees(router1).map(fullAddress).toSet should be(roles.map(address).toSet)
}
enterBarrier("after-3")
}
"deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) {
val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2, useRole = None))), "router2")
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router2).size should be(6) }
currentRoutees(router2).map(fullAddress).toSet should be(roles.map(address).toSet)
}
enterBarrier("after-4")
}
"handle combination of configured router and programatically defined hashMapping" taggedAs LongRunningTest in {
runOn(first) {
def hashMapping: ConsistentHashMapping = {
case s: String s
}
val router3 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter(hashMapping = hashMapping)), "router3")
assertHashMapping(router3)
}
enterBarrier("after-5")
}
"handle combination of configured router and programatically defined hashMapping and ClusterRouterConfig" taggedAs LongRunningTest in {
runOn(first) {
def hashMapping: ConsistentHashMapping = {
case s: String s
}
val router4 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(
local = ConsistentHashingRouter(hashMapping = hashMapping),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1, useRole = None))), "router4")
assertHashMapping(router4)
}
enterBarrier("after-6")
}
def assertHashMapping(router: ActorRef): Unit = {
// it may take some time until router receives cluster member events
awaitAssert { currentRoutees(router).size should be(6) }
currentRoutees(router).map(fullAddress).toSet should be(roles.map(address).toSet)
router ! "a"
val destinationA = expectMsgType[ActorRef]
router ! "a"
expectMsg(destinationA)
}
}
}

View file

@ -1,366 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.oldrouting
import language.postfixOps
import scala.concurrent.Await
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.MultiNodeClusterSpec
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.CurrentRoutees
import akka.routing.RoundRobinRouter
import akka.routing.RoutedActorRef
import akka.routing.RouterRoutees
import akka.testkit._
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.cluster.routing._
object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor(routeeType: RouteeType) extends Actor {
def this() = this(DeployRoutee)
def receive = {
case "hit" sender() ! Reply(routeeType, self)
}
}
final case class Reply(routeeType: RouteeType, ref: ActorRef)
sealed trait RouteeType extends Serializable
object DeployRoutee extends RouteeType
object LookupRoutee extends RouteeType
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.actor.deployment {
/router1 {
router = round-robin
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 2
}
}
/router3 {
router = round-robin
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 1
allow-local-routees = off
}
}
/router4 {
router = round-robin
nr-of-instances = 10
cluster {
enabled = on
routees-path = "/user/myservice"
}
}
/router5 {
router = round-robin
nr-of-instances = 10
cluster {
enabled = on
use-role = a
}
}
}
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]"""))
nodeConfig(third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]"""))
testTransport(on = true)
}
class ClusterRoundRobinRoutedActorMultiJvmNode1 extends ClusterRoundRobinRoutedActorSpec
class ClusterRoundRobinRoutedActorMultiJvmNode2 extends ClusterRoundRobinRoutedActorSpec
class ClusterRoundRobinRoutedActorMultiJvmNode3 extends ClusterRoundRobinRoutedActorSpec
class ClusterRoundRobinRoutedActorMultiJvmNode4 extends ClusterRoundRobinRoutedActorSpec
abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRoundRobinRoutedActorMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import ClusterRoundRobinRoutedActorMultiJvmSpec._
lazy val router1 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router1")
lazy val router2 = system.actorOf(Props[SomeActor].withRouter(ClusterRouterConfig(RoundRobinRouter(),
ClusterRouterSettings(totalInstances = 3, maxInstancesPerNode = 1, useRole = None))), "router2")
lazy val router3 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router3")
lazy val router4 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router4")
lazy val router5 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router5")
def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = {
val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
(receiveWhile(5 seconds, messages = expectedReplies) {
case Reply(`routeeType`, ref) fullAddress(ref)
}).foldLeft(zero) {
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
}
}
/**
* Fills in self address for local ActorRef
*/
private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) cluster.selfAddress
case a a
}
def currentRoutees(router: ActorRef) =
Await.result(router ? CurrentRoutees, timeout.duration).asInstanceOf[RouterRoutees].routees
"A cluster router with a RoundRobin router" must {
"start cluster with 2 nodes" taggedAs LongRunningTest in {
awaitClusterUp(first, second)
enterBarrier("after-1")
}
"deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) {
router1.isInstanceOf[RoutedActorRef] should be(true)
// max-nr-of-instances-per-node=2 times 2 nodes
awaitAssert(currentRoutees(router1).size should be(4))
val iterationCount = 10
for (i 0 until iterationCount) {
router1 ! "hit"
}
val replies = receiveReplies(DeployRoutee, iterationCount)
replies(first) should be > (0)
replies(second) should be > (0)
replies(third) should be(0)
replies(fourth) should be(0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-2")
}
"lookup routees on the member nodes in the cluster" taggedAs LongRunningTest in {
// cluster consists of first and second
system.actorOf(Props(classOf[SomeActor], LookupRoutee), "myservice")
enterBarrier("myservice-started")
runOn(first) {
// 2 nodes, 1 routee on each node
awaitAssert(currentRoutees(router4).size should be(2))
val iterationCount = 10
for (i 0 until iterationCount) {
router4 ! "hit"
}
val replies = receiveReplies(LookupRoutee, iterationCount)
replies(first) should be > (0)
replies(second) should be > (0)
replies(third) should be(0)
replies(fourth) should be(0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-3")
}
"deploy routees to new nodes in the cluster" taggedAs LongRunningTest in {
// add third and fourth
awaitClusterUp(first, second, third, fourth)
runOn(first) {
// max-nr-of-instances-per-node=2 times 4 nodes
awaitAssert(currentRoutees(router1).size should be(8))
val iterationCount = 10
for (i 0 until iterationCount) {
router1 ! "hit"
}
val replies = receiveReplies(DeployRoutee, iterationCount)
replies.values.foreach { _ should be > (0) }
replies.values.sum should be(iterationCount)
}
enterBarrier("after-4")
}
"lookup routees on new nodes in the cluster" taggedAs LongRunningTest in {
// cluster consists of first, second, third and fourth
runOn(first) {
// 4 nodes, 1 routee on each node
awaitAssert(currentRoutees(router4).size should be(4))
val iterationCount = 10
for (i 0 until iterationCount) {
router4 ! "hit"
}
val replies = receiveReplies(LookupRoutee, iterationCount)
replies.values.foreach { _ should be > (0) }
replies.values.sum should be(iterationCount)
}
enterBarrier("after-5")
}
"deploy routees to only remote nodes when allow-local-routees = off" taggedAs LongRunningTest in within(15.seconds) {
runOn(first) {
// max-nr-of-instances-per-node=1 times 3 nodes
awaitAssert(currentRoutees(router3).size should be(3))
val iterationCount = 10
for (i 0 until iterationCount) {
router3 ! "hit"
}
val replies = receiveReplies(DeployRoutee, iterationCount)
replies(first) should be(0)
replies(second) should be > (0)
replies(third) should be > (0)
replies(fourth) should be > (0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-6")
}
"deploy routees to specified node role" taggedAs LongRunningTest in {
runOn(first) {
awaitAssert(currentRoutees(router5).size should be(2))
val iterationCount = 10
for (i 0 until iterationCount) {
router5 ! "hit"
}
val replies = receiveReplies(DeployRoutee, iterationCount)
replies(first) should be > (0)
replies(second) should be > (0)
replies(third) should be(0)
replies(fourth) should be(0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-7")
}
"deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) {
router2.isInstanceOf[RoutedActorRef] should be(true)
// totalInstances = 3, maxInstancesPerNode = 1
awaitAssert(currentRoutees(router2).size should be(3))
val iterationCount = 10
for (i 0 until iterationCount) {
router2 ! "hit"
}
val replies = receiveReplies(DeployRoutee, iterationCount)
// note that router2 has totalInstances = 3, maxInstancesPerNode = 1
val routees = currentRoutees(router2)
val routeeAddresses = routees map fullAddress
routeeAddresses.size should be(3)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-8")
}
"remove routees for unreachable nodes, and add when reachable again" taggedAs LongRunningTest in within(30.seconds) {
// myservice is already running
def routees = currentRoutees(router4)
def routeeAddresses = (routees map fullAddress).toSet
runOn(first) {
// 4 nodes, 1 routee on each node
awaitAssert(currentRoutees(router4).size should be(4))
testConductor.blackhole(first, second, Direction.Both).await
awaitAssert(routees.size should be(3))
routeeAddresses should not contain (address(second))
testConductor.passThrough(first, second, Direction.Both).await
awaitAssert(routees.size should be(4))
routeeAddresses should contain(address(second))
}
enterBarrier("after-9")
}
"deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in {
muteMarkingAsUnreachable()
runOn(first) {
def routees = currentRoutees(router2)
def routeeAddresses = (routees map fullAddress).toSet
routees foreach watch
val notUsedAddress = ((roles map address).toSet -- routeeAddresses).head
val downAddress = routeeAddresses.find(_ != address(first)).get
val downRoutee = routees.find(_.path.address == downAddress).get
cluster.down(downAddress)
expectMsgType[Terminated](15.seconds).actor should be(downRoutee)
awaitAssert {
routeeAddresses should contain(notUsedAddress)
routeeAddresses should not contain (downAddress)
}
val iterationCount = 10
for (i 0 until iterationCount) {
router2 ! "hit"
}
val replies = receiveReplies(DeployRoutee, iterationCount)
routeeAddresses.size should be(3)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-10")
}
}
}

View file

@ -18,7 +18,6 @@ import akka.pattern.ask
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig }
import akka.routing.GetRoutees
import akka.routing.FromConfig
import akka.routing.RouterRoutees
import akka.testkit.{ LongRunningTest, DefaultTimeout, ImplicitSender }
import akka.routing.ActorRefRoutee
import akka.routing.Routees
@ -62,12 +61,12 @@ object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig {
akka.cluster.metrics.moving-average-half-life = 2s
akka.actor.deployment {
/router3 = {
router = adaptive
router = adaptive-pool
metrics-selector = cpu
nr-of-instances = 9
}
/router4 = {
router = adaptive
router = adaptive-pool
metrics-selector = "akka.cluster.routing.TestCustomMetricsSelector"
nr-of-instances = 10
cluster {

View file

@ -19,7 +19,6 @@ import akka.routing.ConsistentHashingRouter.ConsistentHashMapping
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import akka.routing.GetRoutees
import akka.routing.FromConfig
import akka.routing.RouterRoutees
import akka.testkit._
import akka.routing.ActorRefRoutee
import akka.routing.ConsistentHashingPool
@ -40,7 +39,7 @@ object ClusterConsistentHashingRouterMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
common-router-settings = {
router = consistent-hashing
router = consistent-hashing-pool
nr-of-instances = 10
cluster {
enabled = on

View file

@ -20,7 +20,6 @@ import akka.testkit._
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.routing.FromConfig
import akka.routing.RoundRobinPool
import akka.routing.RouterRoutees
import akka.routing.ActorRefRoutee
import akka.routing.ActorSelectionRoutee
import akka.routing.RoutedActorRef
@ -52,7 +51,7 @@ object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig {
withFallback(ConfigFactory.parseString("""
akka.actor.deployment {
/router1 {
router = round-robin
router = round-robin-pool
nr-of-instances = 10
cluster {
enabled = on
@ -60,7 +59,7 @@ object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig {
}
}
/router3 {
router = round-robin
router = round-robin-pool
nr-of-instances = 10
cluster {
enabled = on
@ -69,13 +68,13 @@ object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig {
}
}
/router4 {
router = round-robin
router = round-robin-group
nr-of-instances = 10
routees.paths = ["/user/myserviceA", "/user/myserviceB"]
cluster.enabled = on
}
/router5 {
router = round-robin
router = round-robin-pool
nr-of-instances = 10
cluster {
enabled = on
@ -173,7 +172,9 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult
runOn(first) {
// 2 nodes, 2 routees on each node
awaitAssert(currentRoutees(router4).size should be(4))
within(10.seconds) {
awaitAssert(currentRoutees(router4).size should be(4))
}
val iterationCount = 10
for (i 0 until iterationCount) {

View file

@ -17,7 +17,7 @@ object ClusterDeployerSpec {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.actor.deployment {
/user/service1 {
router = round-robin
router = round-robin-pool
nr-of-instances = 20
cluster.enabled = on
cluster.max-nr-of-instances-per-node = 3
@ -26,22 +26,12 @@ object ClusterDeployerSpec {
/user/service2 {
dispatcher = mydispatcher
mailbox = mymailbox
router = round-robin
router = round-robin-group
nr-of-instances = 20
routees.paths = ["/user/myservice"]
cluster.enabled = on
cluster.allow-local-routees = off
}
# deprecated cluster.routees-path
/user/service3 {
dispatcher = mydispatcher
mailbox = mymailbox
router = round-robin
nr-of-instances = 20
cluster.enabled = on
cluster.allow-local-routees = off
cluster.routees-path = "/user/myservice"
}
}
akka.remote.netty.tcp.port = 0
""", ConfigParseOptions.defaults)
@ -89,22 +79,6 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
"mymailbox")))
}
"be able to parse 'akka.actor.deployment._' with deprecated 'cluster.routees-path'" in {
val service = "/user/service3"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1))
deployment should not be (None)
deployment should be(Some(
Deploy(
service,
deployment.get.config,
ClusterRouterGroup(RoundRobinGroup(List("/user/myservice")), ClusterRouterGroupSettings(
totalInstances = 20, routeesPaths = List("/user/myservice"), allowLocalRoutees = false, useRole = None)),
ClusterScope,
"mydispatcher",
"mymailbox")))
}
"have correct router mappings" in {
val mapping = system.asInstanceOf[ActorSystemImpl].provider.deployer.routerTypeMapping
mapping("adaptive-pool") should be(classOf[akka.cluster.routing.AdaptiveLoadBalancingPool].getName)

View file

@ -1,52 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.oldrouting
import akka.testkit._
import akka.actor._
import akka.routing.RoundRobinRouter
import akka.actor.OneForOneStrategy
import akka.cluster.routing._
object ClusterRouterSupervisorSpec {
class KillableActor(testActor: ActorRef) extends Actor {
def receive = {
case "go away"
throw new IllegalArgumentException("Goodbye then!")
}
}
}
class ClusterRouterSupervisorSpec extends AkkaSpec("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.tcp.port = 0
""") {
import ClusterRouterSupervisorSpec._
"Cluster aware routers" must {
"use provided supervisor strategy" in {
val router = system.actorOf(Props(classOf[KillableActor], testActor).withRouter(
ClusterRouterConfig(RoundRobinRouter(supervisorStrategy = OneForOneStrategy() {
case _
testActor ! "supervised"
SupervisorStrategy.Stop
}), ClusterRouterSettings(
totalInstances = 1,
maxInstancesPerNode = 1,
allowLocalRoutees = true,
useRole = None))), name = "therouter")
router ! "go away"
expectMsg("supervised")
}
}
}

View file

@ -49,24 +49,6 @@ class ClusterRouterSupervisorSpec extends AkkaSpec("""
expectMsg("supervised")
}
"use provided supervisor strategy of deprecated router" in {
val router = system.actorOf(
ClusterRouterPool(RoundRobinPool(nrOfInstances = 1, supervisorStrategy =
OneForOneStrategy(loggingEnabled = false) {
case _
testActor ! "supervised"
SupervisorStrategy.Stop
}), ClusterRouterPoolSettings(
totalInstances = 1,
maxInstancesPerNode = 1,
allowLocalRoutees = true,
useRole = None)).
props(Props(classOf[KillableActor], testActor)), name = "theoldrouter")
router ! "go away"
expectMsg("supervised")
}
}
}

View file

@ -25,7 +25,16 @@ Removed Deprecated Features
The following, previously deprecated, features have been removed:
* akka-dataflow
* akka-transactor
* durable mailboxes (akka-mailboxes-common, akka-file-mailbox)
* Cluster.publishCurrentClusterState
* akka.cluster.auto-down, replaced by akka.cluster.auto-down-unreachable-after in Akka 2.3
* Old routers and configuration.
Note that in router configuration you must now specify if it is a ``pool`` or a ``group``
in the way that was introduced in Akka 2.3.

View file

@ -1,95 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.oldrouting
import language.postfixOps
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.Address
import scala.concurrent.Await
import akka.pattern.ask
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.routing.Broadcast
import akka.routing.RandomRouter
import akka.routing.RoutedActorRef
import akka.testkit._
import scala.concurrent.duration._
object RandomRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor extends Actor {
def receive = {
case "hit" sender() ! self
}
}
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false))
deployOnAll("""
/service-hello.router = "random"
/service-hello.nr-of-instances = 3
/service-hello.target.nodes = ["@first@", "@second@", "@third@"]
""")
}
class RandomRoutedRemoteActorMultiJvmNode1 extends RandomRoutedRemoteActorSpec
class RandomRoutedRemoteActorMultiJvmNode2 extends RandomRoutedRemoteActorSpec
class RandomRoutedRemoteActorMultiJvmNode3 extends RandomRoutedRemoteActorSpec
class RandomRoutedRemoteActorMultiJvmNode4 extends RandomRoutedRemoteActorSpec
class RandomRoutedRemoteActorSpec extends MultiNodeSpec(RandomRoutedRemoteActorMultiJvmSpec)
with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
import RandomRoutedRemoteActorMultiJvmSpec._
def initialParticipants = 4
"A new remote actor configured with a Random router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in {
runOn(first, second, third) {
enterBarrier("start", "broadcast-end", "end", "done")
}
runOn(fourth) {
enterBarrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] should be(true)
val connectionCount = 3
val iterationCount = 100
for (i 0 until iterationCount; k 0 until connectionCount) {
actor ! "hit"
}
val replies: Map[Address, Int] = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
case ref: ActorRef ref.path.address
}).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) {
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
}
enterBarrier("broadcast-end")
actor ! Broadcast(PoisonPill)
enterBarrier("end")
// since it's random we can't be too strict in the assert
replies.values count (_ > 0) should be > (connectionCount - 2)
replies.get(node(fourth).address) should be(None)
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
enterBarrier("done")
}
}
}
}

View file

@ -1,146 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.oldrouting
import language.postfixOps
import scala.collection.immutable
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.Address
import scala.concurrent.Await
import akka.pattern.ask
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.routing.Broadcast
import akka.routing.CurrentRoutees
import akka.routing.RouterRoutees
import akka.routing.RoundRobinRouter
import akka.routing.RoutedActorRef
import akka.routing.Resizer
import akka.testkit._
import scala.concurrent.duration._
import akka.routing.Routee
object RoundRobinRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor extends Actor {
def receive = {
case "hit" sender() ! self
}
}
class TestResizer extends Resizer {
def isTimeForResize(messageCounter: Long): Boolean = messageCounter <= 10
def resize(currentRoutees: immutable.IndexedSeq[Routee]): Int = 1
}
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false))
deployOnAll("""
/service-hello.router = "round-robin"
/service-hello.nr-of-instances = 3
/service-hello.target.nodes = ["@first@", "@second@", "@third@"]
/service-hello2.router = "round-robin"
/service-hello2.nr-of-instances = 0
/service-hello2.target.nodes = ["@first@", "@second@", "@third@"]
""")
}
class RoundRobinRoutedRemoteActorMultiJvmNode1 extends RoundRobinRoutedRemoteActorSpec
class RoundRobinRoutedRemoteActorMultiJvmNode2 extends RoundRobinRoutedRemoteActorSpec
class RoundRobinRoutedRemoteActorMultiJvmNode3 extends RoundRobinRoutedRemoteActorSpec
class RoundRobinRoutedRemoteActorMultiJvmNode4 extends RoundRobinRoutedRemoteActorSpec
class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemoteActorMultiJvmSpec)
with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
import RoundRobinRoutedRemoteActorMultiJvmSpec._
def initialParticipants = 4
"A new remote actor configured with a RoundRobin router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in {
runOn(first, second, third) {
enterBarrier("start", "broadcast-end", "end", "done")
}
runOn(fourth) {
enterBarrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] should be(true)
val connectionCount = 3
val iterationCount = 10
for (i 0 until iterationCount; k 0 until connectionCount) {
actor ! "hit"
}
val replies: Map[Address, Int] = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
case ref: ActorRef ref.path.address
}).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) {
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
}
enterBarrier("broadcast-end")
actor ! Broadcast(PoisonPill)
enterBarrier("end")
replies.values foreach { _ should be(iterationCount) }
replies.get(node(fourth).address) should be(None)
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
enterBarrier("done")
}
}
}
"A new remote actor configured with a RoundRobin router and Resizer" must {
"be locally instantiated on a remote node after several resize rounds" taggedAs LongRunningTest in within(5 seconds) {
runOn(first, second, third) {
enterBarrier("start", "broadcast-end", "end", "done")
}
runOn(fourth) {
enterBarrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter(
resizer = Some(new TestResizer))), "service-hello2")
actor.isInstanceOf[RoutedActorRef] should be(true)
actor ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size should be(1)
val repliesFrom: Set[ActorRef] =
(for (n 2 to 8) yield {
actor ! "hit"
awaitCond(Await.result(actor ? CurrentRoutees, timeout.duration).asInstanceOf[RouterRoutees].routees.size == n)
expectMsgType[ActorRef]
}).toSet
enterBarrier("broadcast-end")
actor ! Broadcast(PoisonPill)
enterBarrier("end")
repliesFrom.size should be(7)
val repliesFromAddresses = repliesFrom.map(_.path.address)
repliesFromAddresses should be(Set(node(first), node(second), node(third)).map(_.address))
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
enterBarrier("done")
}
}
}
}

View file

@ -1,99 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.oldrouting
import language.postfixOps
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import scala.concurrent.Await
import akka.pattern.ask
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec }
import akka.routing.Broadcast
import akka.routing.ScatterGatherFirstCompletedRouter
import akka.routing.RoutedActorRef
import akka.testkit._
import akka.testkit.TestEvent._
import scala.concurrent.duration._
import akka.actor.PoisonPill
import akka.actor.Address
object ScatterGatherRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
class SomeActor extends Actor {
def receive = {
case "hit" sender() ! self
}
}
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false))
deployOnAll("""
/service-hello.router = "scatter-gather"
/service-hello.nr-of-instances = 3
/service-hello.target.nodes = ["@first@", "@second@", "@third@"]
""")
}
class ScatterGatherRoutedRemoteActorMultiJvmNode1 extends ScatterGatherRoutedRemoteActorSpec
class ScatterGatherRoutedRemoteActorMultiJvmNode2 extends ScatterGatherRoutedRemoteActorSpec
class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends ScatterGatherRoutedRemoteActorSpec
class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends ScatterGatherRoutedRemoteActorSpec
class ScatterGatherRoutedRemoteActorSpec extends MultiNodeSpec(ScatterGatherRoutedRemoteActorMultiJvmSpec)
with STMultiNodeSpec with ImplicitSender with DefaultTimeout {
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
def initialParticipants = roles.size
"A new remote actor configured with a ScatterGatherFirstCompleted router" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in {
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*")))
runOn(first, second, third) {
enterBarrier("start", "broadcast-end", "end", "done")
}
runOn(fourth) {
enterBarrier("start")
val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello")
actor.isInstanceOf[RoutedActorRef] should be(true)
val connectionCount = 3
val iterationCount = 10
for (i 0 until iterationCount; k 0 until connectionCount) {
actor ! "hit"
}
val replies: Map[Address, Int] = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) {
case ref: ActorRef ref.path.address
}).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) {
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
}
enterBarrier("broadcast-end")
actor ! Broadcast(PoisonPill)
enterBarrier("end")
replies.values.sum should be(connectionCount * iterationCount)
replies.get(node(fourth).address) should be(None)
// shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node
system.stop(actor)
enterBarrier("done")
}
enterBarrier("done")
}
}
}

View file

@ -27,7 +27,7 @@ import akka.japi.Util.immutableSeq
* [[akka.routing.RouterConfig]] implementation for remote deployment on defined
* target nodes. Delegates other duties to the local [[akka.routing.Pool]],
* which makes it possible to mix this with the built-in routers such as
* [[akka.routing.RoundRobinRouter]] or custom routers.
* [[akka.routing.RoundRobinGroup]] or custom routers.
*/
@SerialVersionUID(1L)
final case class RemoteRouterConfig(local: Pool, nodes: Iterable[Address]) extends Pool {

View file

@ -1,232 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.oldrouting
import akka.testkit._
import akka.routing._
import akka.actor._
import akka.remote.routing._
import com.typesafe.config._
import akka.remote.RemoteScope
object RemoteRouterSpec {
class Echo extends Actor {
def receive = {
case _ sender() ! self
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteRouterSpec extends AkkaSpec("""
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.tcp {
hostname = localhost
port = 0
}
akka.actor.deployment {
/remote-override {
router = round-robin
nr-of-instances = 4
}
}""") {
import RemoteRouterSpec._
val port = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
val sysName = system.name
val conf = ConfigFactory.parseString(
s"""
akka {
actor.deployment {
/blub {
router = round-robin
nr-of-instances = 2
target.nodes = ["akka.tcp://${sysName}@localhost:${port}"]
}
/elastic-blub {
router = round-robin
resizer {
lower-bound = 2
upper-bound = 3
}
target.nodes = ["akka.tcp://${sysName}@localhost:${port}"]
}
/remote-blub {
remote = "akka.tcp://${sysName}@localhost:${port}"
router = round-robin
nr-of-instances = 2
}
/local-blub {
remote = "akka://MasterRemoteRouterSpec"
router = round-robin
nr-of-instances = 2
target.nodes = ["akka.tcp://${sysName}@localhost:${port}"]
}
/local-blub2 {
router = round-robin
nr-of-instances = 4
target.nodes = ["akka.tcp://${sysName}@localhost:${port}"]
}
}
}""").withFallback(system.settings.config)
val masterSystem = ActorSystem("Master" + sysName, conf)
override def afterTermination() {
shutdown(masterSystem)
}
"A Remote Router" must {
"deploy its children on remote host driven by configuration" in {
val probe = TestProbe()(masterSystem)
val router = masterSystem.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)), "blub")
val replies = for (i 1 to 5) yield {
router.tell("", probe.ref)
probe.expectMsgType[ActorRef].path
}
val children = replies.toSet
children should have size 2
children.map(_.parent) should have size 1
children foreach (_.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}"))
masterSystem.stop(router)
}
"deploy its children on remote host driven by programatic definition" in {
val probe = TestProbe()(masterSystem)
val router = masterSystem.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2),
Seq(Address("akka.tcp", sysName, "localhost", port)))), "blub2")
val replies = for (i 1 to 5) yield {
router.tell("", probe.ref)
probe.expectMsgType[ActorRef].path
}
val children = replies.toSet
children should have size 2
children.map(_.parent) should have size 1
children foreach (_.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}"))
masterSystem.stop(router)
}
"deploy dynamic resizable number of children on remote host driven by configuration" in {
val probe = TestProbe()(masterSystem)
val router = masterSystem.actorOf(Props[Echo].withRouter(FromConfig), "elastic-blub")
val replies = for (i 1 to 5000) yield {
router.tell("", probe.ref)
probe.expectMsgType[ActorRef].path
}
val children = replies.toSet
children.size should be >= 2
children.map(_.parent) should have size 1
children foreach (_.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}"))
masterSystem.stop(router)
}
"deploy remote routers based on configuration" in {
val probe = TestProbe()(masterSystem)
val router = masterSystem.actorOf(Props[Echo].withRouter(FromConfig), "remote-blub")
router.path.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}")
val replies = for (i 1 to 5) yield {
router.tell("", probe.ref)
probe.expectMsgType[ActorRef].path
}
val children = replies.toSet
children should have size 2
val parents = children.map(_.parent)
parents should have size 1
parents.head should be(router.path)
children foreach (_.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}"))
masterSystem.stop(router)
}
"deploy remote routers based on explicit deployment" in {
val probe = TestProbe()(masterSystem)
val router = masterSystem.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"akka.tcp://${sysName}@localhost:${port}")))), "remote-blub2")
router.path.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}")
val replies = for (i 1 to 5) yield {
router.tell("", probe.ref)
probe.expectMsgType[ActorRef].path
}
val children = replies.toSet
children should have size 2
val parents = children.map(_.parent)
parents should have size 1
parents.head should be(router.path)
children foreach (_.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}"))
masterSystem.stop(router)
}
"let remote deployment be overridden by local configuration" in {
val probe = TestProbe()(masterSystem)
val router = masterSystem.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"akka.tcp://${sysName}@localhost:${port}")))), "local-blub")
router.path.address.toString should be("akka://MasterRemoteRouterSpec")
val replies = for (i 1 to 5) yield {
router.tell("", probe.ref)
probe.expectMsgType[ActorRef].path
}
val children = replies.toSet
children should have size 2
val parents = children.map(_.parent)
parents should have size 1
parents.head.address should be(Address("akka.tcp", sysName, "localhost", port))
children foreach (_.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}"))
masterSystem.stop(router)
}
"let remote deployment router be overridden by local configuration" in {
val probe = TestProbe()(masterSystem)
val router = masterSystem.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"akka.tcp://${sysName}@localhost:${port}")))), "local-blub2")
router.path.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}")
val replies = for (i 1 to 5) yield {
router.tell("", probe.ref)
probe.expectMsgType[ActorRef].path
}
val children = replies.toSet
children should have size 4
val parents = children.map(_.parent)
parents should have size 1
parents.head should be(router.path)
children foreach (_.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}"))
masterSystem.stop(router)
}
"let remote deployment be overridden by remote configuration" in {
val probe = TestProbe()(masterSystem)
val router = masterSystem.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(s"akka.tcp://${sysName}@localhost:${port}")))), "remote-override")
router.path.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}")
val replies = for (i 1 to 5) yield {
router.tell("", probe.ref)
probe.expectMsgType[ActorRef].path
}
val children = replies.toSet
children should have size 4
val parents = children.map(_.parent)
parents should have size 1
parents.head should be(router.path)
children foreach (_.address.toString should be(s"akka.tcp://${sysName}@localhost:${port}"))
masterSystem.stop(router)
}
"set supplied supervisorStrategy" in {
val probe = TestProbe()(masterSystem)
val escalator = OneForOneStrategy() {
case e probe.ref ! e; SupervisorStrategy.Escalate
}
val router = masterSystem.actorOf(Props.empty.withRouter(new RemoteRouterConfig(
RoundRobinRouter(1, supervisorStrategy = escalator),
Seq(Address("akka.tcp", sysName, "localhost", port)))), "blub3")
router.tell(CurrentRoutees, probe.ref)
EventFilter[ActorKilledException](occurrences = 1).intercept {
probe.expectMsgType[RouterRoutees].routees.head ! Kill
}(masterSystem)
probe.expectMsgType[ActorKilledException]
}
}
}

View file

@ -11,7 +11,7 @@ import com.typesafe.config.ConfigFactory
import akka.testkit.AkkaSpec
import akka.actor.{ Actor, Address, Props, Deploy, OneForOneStrategy, SupervisorStrategy }
import akka.remote.{ DaemonMsgCreate, RemoteScope }
import akka.routing.{ RoundRobinRouter, FromConfig }
import akka.routing.{ RoundRobinPool, FromConfig }
import scala.concurrent.duration._
object DaemonMsgCreateSerializerSpec {
@ -64,7 +64,7 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec {
val deploy1 = Deploy(
path = "path1",
config = ConfigFactory.parseString("a=1"),
routerConfig = RoundRobinRouter(nrOfInstances = 5, supervisorStrategy = supervisorStrategy),
routerConfig = RoundRobinPool(nrOfInstances = 5, supervisorStrategy = supervisorStrategy),
scope = RemoteScope(Address("akka", "Test", "host1", 1921)),
dispatcher = "mydispatcher")
val deploy2 = Deploy(