diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 1e449e467a..e38d2aada6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -6,6 +6,7 @@ package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers +import akka.util.duration._ import DeploymentConfig._ class DeployerSpec extends WordSpec with MustMatchers { @@ -19,9 +20,9 @@ class DeployerSpec extends WordSpec with MustMatchers { Deploy( "service-ping", None, - LeastCPU, + RoundRobin, NrOfInstances(3), - BannagePeriodFailureDetector(10), + BannagePeriodFailureDetector(10 seconds), RemoteScope(List( RemoteAddress("wallace", 2552), RemoteAddress("gromit", 2552)))))) // ClusterScope( diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 00a3366e7f..2c93b74165 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -25,7 +25,7 @@ class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { None, RoundRobin, NrOfInstances(5), - RemoveConnectionOnFirstFailureLocalFailureDetector, + NoOpFailureDetector, LocalScope)) val helloLatch = new CountDownLatch(5) @@ -61,7 +61,7 @@ class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { None, RoundRobin, NrOfInstances(10), - RemoveConnectionOnFirstFailureLocalFailureDetector, + NoOpFailureDetector, LocalScope)) val connectionCount = 10 @@ -106,7 +106,7 @@ class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { None, RoundRobin, NrOfInstances(5), - RemoveConnectionOnFirstFailureLocalFailureDetector, + NoOpFailureDetector, LocalScope)) val helloLatch = new CountDownLatch(5) @@ -141,7 +141,7 @@ class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { None, Random, NrOfInstances(7), - RemoveConnectionOnFirstFailureLocalFailureDetector, + NoOpFailureDetector, LocalScope)) val stopLatch = new CountDownLatch(7) @@ -175,7 +175,7 @@ class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { None, Random, NrOfInstances(10), - RemoveConnectionOnFirstFailureLocalFailureDetector, + NoOpFailureDetector, LocalScope)) val connectionCount = 10 @@ -220,7 +220,7 @@ class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { None, Random, NrOfInstances(6), - RemoveConnectionOnFirstFailureLocalFailureDetector, + NoOpFailureDetector, LocalScope)) val helloLatch = new CountDownLatch(6) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index ad2600b47a..f7edfe78ea 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -3,12 +3,14 @@ package akka.routing import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import akka.routing._ +import akka.config.ConfigurationException import java.util.concurrent.atomic.AtomicInteger import akka.actor.Actor._ import akka.actor.{ ActorRef, Actor } import collection.mutable.LinkedList import akka.routing.Routing.Broadcast import java.util.concurrent.{ CountDownLatch, TimeUnit } +import akka.testkit._ object RoutingSpec { @@ -28,18 +30,18 @@ class RoutingSpec extends WordSpec with MustMatchers { "be started when constructed" in { val actor1 = Actor.actorOf[TestActor] - val props = RoutedProps(() ⇒ new DirectRouter, List(actor1)) + val props = RoutedProps().withDirectRouter.withLocalConnections(List(actor1)) val actor = Routing.actorOf(props, "foo") actor.isShutdown must be(false) } - "throw IllegalArgumentException at construction when no connections" in { + "throw ConfigurationException at construction when no connections" in { try { - val props = RoutedProps(() ⇒ new DirectRouter, List()) + val props = RoutedProps().withDirectRouter Routing.actorOf(props, "foo") fail() } catch { - case e: IllegalArgumentException ⇒ + case e: ConfigurationException ⇒ } } @@ -54,7 +56,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }) - val props = RoutedProps(() ⇒ new DirectRouter, List(connection1)) + val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1)) val routedActor = Routing.actorOf(props, "foo") routedActor ! "hello" routedActor ! "end" @@ -75,7 +77,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }) - val props = RoutedProps(() ⇒ new DirectRouter, List(connection1)) + val props = RoutedProps().withDirectRouter.withLocalConnections(List(connection1)) val actor = Routing.actorOf(props, "foo") actor ! Broadcast(1) @@ -92,18 +94,18 @@ class RoutingSpec extends WordSpec with MustMatchers { "be started when constructed" in { val actor1 = Actor.actorOf[TestActor] - val props = RoutedProps(() ⇒ new RoundRobinRouter, List(actor1)) + val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(actor1)) val actor = Routing.actorOf(props, "foo") actor.isShutdown must be(false) } - "throw IllegalArgumentException at construction when no connections" in { + "throw ConfigurationException at construction when no connections" in { try { - val props = RoutedProps(() ⇒ new RoundRobinRouter, List()) + val props = RoutedProps().withRoundRobinRouter Routing.actorOf(props, "foo") fail() } catch { - case e: IllegalArgumentException ⇒ + case e: ConfigurationException ⇒ } } @@ -132,7 +134,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } //create the routed actor. - val props = RoutedProps(() ⇒ new RoundRobinRouter, connections) + val props = RoutedProps().withRoundRobinRouter.withLocalConnections(connections) val actor = Routing.actorOf(props, "foo") //send messages to the actor. @@ -171,7 +173,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }) - val props = RoutedProps(() ⇒ new RoundRobinRouter, List(connection1, connection2)) + val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1, connection2)) val actor = Routing.actorOf(props, "foo") actor ! Broadcast(1) @@ -194,7 +196,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }) - val props = RoutedProps(() ⇒ new RoundRobinRouter, List(connection1)) + val props = RoutedProps().withRoundRobinRouter.withLocalConnections(List(connection1)) val actor = Routing.actorOf(props, "foo") try { @@ -216,18 +218,18 @@ class RoutingSpec extends WordSpec with MustMatchers { val actor1 = Actor.actorOf[TestActor] - val props = RoutedProps(() ⇒ new RandomRouter, List(actor1)) + val props = RoutedProps().withRandomRouter.withLocalConnections(List(actor1)) val actor = Routing.actorOf(props, "foo") actor.isShutdown must be(false) } - "throw IllegalArgumentException at construction when no connections" in { + "throw ConfigurationException at construction when no connections" in { try { - val props = RoutedProps(() ⇒ new RandomRouter, List()) + val props = RoutedProps().withRandomRouter Routing.actorOf(props, "foo") fail() } catch { - case e: IllegalArgumentException ⇒ + case e: ConfigurationException ⇒ } } @@ -254,7 +256,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }) - val props = RoutedProps(() ⇒ new RandomRouter, List(connection1, connection2)) + val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1, connection2)) val actor = Routing.actorOf(props, "foo") actor ! Broadcast(1) @@ -277,7 +279,7 @@ class RoutingSpec extends WordSpec with MustMatchers { } }) - val props = RoutedProps(() ⇒ new RandomRouter, List(connection1)) + val props = RoutedProps().withRandomRouter.withLocalConnections(List(connection1)) val actor = Routing.actorOf(props, "foo") try { @@ -292,4 +294,179 @@ class RoutingSpec extends WordSpec with MustMatchers { counter1.get must be(0) } } + + "Scatter-gather router" must { + + "return response, even if one of the connections has stopped" in { + + val shutdownLatch = new TestLatch(1) + + val props = RoutedProps() + .withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props, "foo") + + actor ! Broadcast(Stop(Some(0))) + + shutdownLatch.await + + (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) + } + + "throw an exception, if all the connections have stopped" in { + + val shutdownLatch = new TestLatch(2) + + val props = RoutedProps() + .withLocalConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props, "foo") + + actor ! Broadcast(Stop()) + + shutdownLatch.await + + (intercept[RoutingException] { + actor ? Broadcast(0) + }) must not be (null) + + } + + "return the first response from connections, when all of them replied" in { + + val props = RoutedProps() + .withLocalConnections(List(newActor(0), newActor(1))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props, "foo") + + (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) + + } + + "return the first response from connections, when some of them failed to reply" in { + val props = RoutedProps() + .withLocalConnections(List(newActor(0), newActor(1))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props, "foo") + + (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) + } + + "be started when constructed" in { + val props = RoutedProps() + .withLocalConnections(List(newActor(0))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + val actor = Routing.actorOf(props, "foo") + + actor.isShutdown must be(false) + + } + + "throw ConfigurationException at construction when no connections" in { + val props = RoutedProps() + .withLocalConnections(List()) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + try { + Routing.actorOf(props, "foo") + fail() + } catch { + case e: ConfigurationException ⇒ + } + } + + "deliver one-way messages in a round robin fashion" in { + val connectionCount = 10 + val iterationCount = 10 + val doneLatch = new TestLatch(connectionCount) + + var connections = new LinkedList[ActorRef] + var counters = new LinkedList[AtomicInteger] + for (i ← 0 until connectionCount) { + counters = counters :+ new AtomicInteger() + + val connection = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counters.get(i).get.addAndGet(msg) + } + }) + connections = connections :+ connection + } + + val props = RoutedProps() + .withLocalConnections(connections) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props, "foo") + + for (i ← 0 until iterationCount) { + for (k ← 0 until connectionCount) { + actor ! (k + 1) + } + } + + actor ! Broadcast("end") + + doneLatch.await + + for (i ← 0 until connectionCount) { + val counter = counters.get(i).get + counter.get must be((iterationCount * (i + 1))) + } + } + + "deliver a broadcast message using the !" in { + val doneLatch = new TestLatch(2) + + val counter1 = new AtomicInteger + val connection1 = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter1.addAndGet(msg) + } + }) + + val counter2 = new AtomicInteger + val connection2 = actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter2.addAndGet(msg) + } + }) + + val props = RoutedProps.apply() + .withLocalConnections(List(connection1, connection2)) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props, "foo") + + actor ! Broadcast(1) + actor ! Broadcast("end") + + doneLatch.await + + counter1.get must be(1) + counter2.get must be(1) + } + + case class Stop(id: Option[Int] = None) + + def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = actorOf(new Actor { + def receive = { + case Stop(None) ⇒ self.stop() + case Stop(Some(_id)) if (_id == id) ⇒ self.stop() + case _id: Int if (_id == id) ⇒ + case _ ⇒ Thread sleep 100 * id; tryReply(id) + } + + override def postStop = { + shudownLatch foreach (_.countDown()) + } + }) + } } diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala deleted file mode 100644 index 0d7b3f60d5..0000000000 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala +++ /dev/null @@ -1,192 +0,0 @@ -package akka.ticket - -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import akka.routing._ -import akka.actor.Actor._ -import akka.actor.{ ActorRef, Actor } -import java.util.concurrent.atomic.AtomicInteger -import collection.mutable.LinkedList -import akka.routing.Routing.Broadcast -import akka.testkit._ - -class Ticket1111Spec extends WordSpec with MustMatchers { - - "Scatter-gather router" must { - - "return response, even if one of the connections has stopped" in { - - val shutdownLatch = new TestLatch(1) - - val props = RoutedProps() - .withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - - val actor = Routing.actorOf(props, "foo") - - actor ! Broadcast(Stop(Some(0))) - - shutdownLatch.await - - (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) - } - - "throw an exception, if all the connections have stopped" in { - - val shutdownLatch = new TestLatch(2) - - val props = RoutedProps() - .withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - - val actor = Routing.actorOf(props, "foo") - - actor ! Broadcast(Stop()) - - shutdownLatch.await - - (intercept[RoutingException] { - actor ? Broadcast(0) - }) must not be (null) - - } - - "return the first response from connections, when all of them replied" in { - - val props = RoutedProps() - .withConnections(List(newActor(0), newActor(1))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - - val actor = Routing.actorOf(props, "foo") - - (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) - - } - - "return the first response from connections, when some of them failed to reply" in { - val props = RoutedProps() - .withConnections(List(newActor(0), newActor(1))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - - val actor = Routing.actorOf(props, "foo") - - (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) - - } - - "be started when constructed" in { - val props = RoutedProps() - .withConnections(List(newActor(0))) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf(props, "foo") - - actor.isShutdown must be(false) - - } - - "throw IllegalArgumentException at construction when no connections" in { - val props = RoutedProps() - .withConnections(List()) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - - try { - Routing.actorOf(props, "foo") - fail() - } catch { - case e: IllegalArgumentException ⇒ - } - } - - "deliver one-way messages in a round robin fashion" in { - val connectionCount = 10 - val iterationCount = 10 - val doneLatch = new TestLatch(connectionCount) - - var connections = new LinkedList[ActorRef] - var counters = new LinkedList[AtomicInteger] - for (i ← 0 until connectionCount) { - counters = counters :+ new AtomicInteger() - - val connection = actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counters.get(i).get.addAndGet(msg) - } - }) - connections = connections :+ connection - } - - val props = RoutedProps() - .withConnections(connections) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - - val actor = Routing.actorOf(props, "foo") - - for (i ← 0 until iterationCount) { - for (k ← 0 until connectionCount) { - actor ! (k + 1) - } - } - - actor ! Broadcast("end") - - doneLatch.await - - for (i ← 0 until connectionCount) { - val counter = counters.get(i).get - counter.get must be((iterationCount * (i + 1))) - } - } - - "deliver a broadcast message using the !" in { - val doneLatch = new TestLatch(2) - - val counter1 = new AtomicInteger - val connection1 = actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter1.addAndGet(msg) - } - }) - - val counter2 = new AtomicInteger - val connection2 = actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter2.addAndGet(msg) - } - }) - - val props = RoutedProps.apply() - .withConnections(List(connection1, connection2)) - .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - - val actor = Routing.actorOf(props, "foo") - - actor ! Broadcast(1) - actor ! Broadcast("end") - - doneLatch.await - - counter1.get must be(1) - counter2.get must be(1) - } - - case class Stop(id: Option[Int] = None) - - def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = actorOf(new Actor { - def receive = { - case Stop(None) ⇒ self.stop() - case Stop(Some(_id)) if (_id == id) ⇒ self.stop() - case _id: Int if (_id == id) ⇒ - case _ ⇒ Thread sleep 100 * id; tryReply(id) - } - - override def postStop = { - shudownLatch foreach (_.countDown()) - } - }) - - } - -} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index ff23e47a6b..47fd36ccf7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -4,7 +4,6 @@ package akka.actor -import DeploymentConfig._ import akka.event.EventHandler import akka.AkkaException import akka.routing._ @@ -150,22 +149,22 @@ class LocalActorRefProvider extends ActorRefProvider { Deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor // create a local actor - case None | Some(Deploy(_, _, Direct, _, _, LocalScope)) ⇒ + case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒ Some(new LocalActorRef(props, address, systemService)) // create a local actor // create a routed actor ref - case deploy @ Some(Deploy(_, _, router, nrOfInstances, _, LocalScope)) ⇒ - val routerType = DeploymentConfig.routerTypeFor(router) - - val routerFactory: () ⇒ Router = routerType match { + case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { case RouterType.Direct ⇒ () ⇒ new DirectRouter case RouterType.Random ⇒ () ⇒ new RandomRouter case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter + case RouterType.ScatterGather ⇒ () ⇒ new ScatterGatherFirstCompletedRouter case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") case RouterType.Custom ⇒ sys.error("Router Custom not supported yet") } + val connections: Iterable[ActorRef] = if (nrOfInstances.factor > 0) Vector.fill(nrOfInstances.factor)(new LocalActorRef(props, new UUID().toString, systemService)) @@ -173,7 +172,7 @@ class LocalActorRefProvider extends ActorRefProvider { Some(Routing.actorOf(RoutedProps( routerFactory = routerFactory, - connections = connections))) + connectionManager = new LocalConnectionManager(connections)))) case _ ⇒ None // non-local actor - pass it on } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 8e9d460e1c..6bbae690b1 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -10,6 +10,7 @@ import java.util.concurrent.ConcurrentHashMap import akka.event.EventHandler import akka.actor.DeploymentConfig._ +import akka.util.Duration import akka.util.ReflectiveAccess._ import akka.AkkaException import akka.config.{ Configuration, ConfigurationException, Config } @@ -122,7 +123,7 @@ object Deployer extends ActorDeployer { val addressPath = "akka.actor.deployment." + address configuration.getSection(addressPath) match { case None ⇒ - Some(Deploy(address, None, Direct, NrOfInstances(1), RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) + Some(Deploy(address, None, Direct, NrOfInstances(1), NoOpFailureDetector, LocalScope)) case Some(addressConfig) ⇒ @@ -133,6 +134,7 @@ object Deployer extends ActorDeployer { case "direct" ⇒ Direct case "round-robin" ⇒ RoundRobin case "random" ⇒ Random + case "scatter-gather" ⇒ ScatterGather case "least-cpu" ⇒ LeastCPU case "least-ram" ⇒ LeastRAM case "least-messages" ⇒ LeastMessages @@ -140,7 +142,7 @@ object Deployer extends ActorDeployer { createInstance[AnyRef](customRouterClassName, emptyParams, emptyArguments).fold( e ⇒ throw new ConfigurationException( "Config option [" + addressPath + ".router] needs to be one of " + - "[\"direct\", \"round-robin\", \"random\", \"least-cpu\", \"least-ram\", \"least-messages\" or the fully qualified name of Router class]", e), + "[\"direct\", \"round-robin\", \"random\", \"scatter-gather\", \"least-cpu\", \"least-ram\", \"least-messages\" or the fully qualified name of Router class]", e), CustomRouter(_)) } @@ -169,7 +171,7 @@ object Deployer extends ActorDeployer { } // -------------------------------- - // akka.actor.deployment.
.failure-detector.xxx + // akka.actor.deployment.
.failure-detector. // -------------------------------- val failureDetectorOption: Option[FailureDetector] = addressConfig.getSection("failure-detector") match { case Some(failureDetectorConfig) ⇒ @@ -177,22 +179,27 @@ object Deployer extends ActorDeployer { case Nil ⇒ None case detector :: Nil ⇒ detector match { - case "remove-connection-on-first-local-failure" ⇒ - Some(RemoveConnectionOnFirstFailureLocalFailureDetector) + case "no-op" ⇒ + Some(NoOpFailureDetector) case "remove-connection-on-first-failure" ⇒ Some(RemoveConnectionOnFirstFailureFailureDetector) case "bannage-period" ⇒ + throw new ConfigurationException( + "Configuration for [" + addressPath + ".failure-detector.bannage-period] must have a 'time-to-ban' option defined") + + case "bannage-period.time-to-ban" ⇒ failureDetectorConfig.getSection("bannage-period") map { section ⇒ - BannagePeriodFailureDetector(section.getInt("time-to-ban", 10)) + val timeToBan = Duration(section.getInt("time-to-ban", 60), Config.TIME_UNIT) + BannagePeriodFailureDetector(timeToBan) } case "custom" ⇒ failureDetectorConfig.getSection("custom") map { section ⇒ val implementationClass = section.getString("class").getOrElse(throw new ConfigurationException( "Configuration for [" + addressPath + - "failure-detector.custom] must have a 'class' element with the fully qualified name of the failure detector class")) + ".failure-detector.custom] must have a 'class' element with the fully qualified name of the failure detector class")) CustomFailureDetector(implementationClass) } @@ -201,11 +208,11 @@ object Deployer extends ActorDeployer { case detectors ⇒ throw new ConfigurationException( "Configuration for [" + addressPath + - "failure-detector] can not have multiple sections - found [" + detectors.mkString(", ") + "]") + ".failure-detector] can not have multiple sections - found [" + detectors.mkString(", ") + "]") } case None ⇒ None } - val failureDetector = failureDetectorOption getOrElse { BannagePeriodFailureDetector(10) } // fall back to default failure detector + val failureDetector = failureDetectorOption getOrElse { NoOpFailureDetector } // fall back to default failure detector // -------------------------------- // akka.actor.deployment.
.create-as @@ -262,7 +269,7 @@ object Deployer extends ActorDeployer { // -------------------------------- addressConfig.getSection("cluster") match { case None ⇒ - Some(Deploy(address, recipe, router, nrOfInstances, RemoveConnectionOnFirstFailureLocalFailureDetector, LocalScope)) // deploy locally + Some(Deploy(address, recipe, router, nrOfInstances, NoOpFailureDetector, LocalScope)) // deploy locally case Some(clusterConfig) ⇒ diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index b7b7ffa6e8..8418bab58a 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -5,7 +5,9 @@ package akka.actor import akka.config.Config +import akka.util.Duration import akka.routing.{ RouterType, FailureDetectorType } +import akka.routing.FailureDetectorType._ /** * Module holding the programmatic deployment configuration classes. @@ -24,7 +26,7 @@ object DeploymentConfig { recipe: Option[ActorRecipe], routing: Routing = Direct, nrOfInstances: NrOfInstances = ZeroNrOfInstances, - failureDetector: FailureDetector = RemoveConnectionOnFirstFailureLocalFailureDetector, + failureDetector: FailureDetector = NoOpFailureDetector, scope: Scope = LocalScope) { Address.validate(address) } @@ -44,6 +46,7 @@ object DeploymentConfig { case class Direct() extends Routing case class RoundRobin() extends Routing case class Random() extends Routing + case class ScatterGather() extends Routing case class LeastCPU() extends Routing case class LeastRAM() extends Routing case class LeastMessages() extends Routing @@ -52,6 +55,7 @@ object DeploymentConfig { case object Direct extends Routing case object RoundRobin extends Routing case object Random extends Routing + case object ScatterGather extends Routing case object LeastCPU extends Routing case object LeastRAM extends Routing case object LeastMessages extends Routing @@ -60,15 +64,15 @@ object DeploymentConfig { // --- FailureDetector // -------------------------------- sealed trait FailureDetector - case class BannagePeriodFailureDetector(timeToBan: Long) extends FailureDetector + case class BannagePeriodFailureDetector(timeToBan: Duration) extends FailureDetector case class CustomFailureDetector(className: String) extends FailureDetector // For Java API - case class RemoveConnectionOnFirstFailureLocalFailureDetector() extends FailureDetector + case class NoOpFailureDetector() extends FailureDetector case class RemoveConnectionOnFirstFailureFailureDetector() extends FailureDetector // For Scala API - case object RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector + case object NoOpFailureDetector extends FailureDetector case object RemoveConnectionOnFirstFailureFailureDetector extends FailureDetector // -------------------------------- @@ -180,13 +184,13 @@ object DeploymentConfig { def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == Config.nodename) def failureDetectorTypeFor(failureDetector: FailureDetector): FailureDetectorType = failureDetector match { - case BannagePeriodFailureDetector(timeToBan) ⇒ FailureDetectorType.BannagePeriodFailureDetector(timeToBan) - case RemoveConnectionOnFirstFailureLocalFailureDetector ⇒ FailureDetectorType.RemoveConnectionOnFirstFailureLocalFailureDetector - case RemoveConnectionOnFirstFailureLocalFailureDetector() ⇒ FailureDetectorType.RemoveConnectionOnFirstFailureLocalFailureDetector - case RemoveConnectionOnFirstFailureFailureDetector ⇒ FailureDetectorType.RemoveConnectionOnFirstFailureFailureDetector + case NoOpFailureDetector ⇒ FailureDetectorType.NoOpFailureDetector + case NoOpFailureDetector() ⇒ FailureDetectorType.NoOpFailureDetector + case BannagePeriodFailureDetector(timeToBan) ⇒ FailureDetectorType.BannagePeriodFailureDetector(timeToBan) + case RemoveConnectionOnFirstFailureFailureDetector ⇒ FailureDetectorType.RemoveConnectionOnFirstFailureFailureDetector case RemoveConnectionOnFirstFailureFailureDetector() ⇒ FailureDetectorType.RemoveConnectionOnFirstFailureFailureDetector - case CustomFailureDetector(implClass) ⇒ FailureDetectorType.CustomFailureDetector(implClass) - case unknown ⇒ throw new UnsupportedOperationException("Unknown FailureDetector [" + unknown + "]") + case CustomFailureDetector(implClass) ⇒ FailureDetectorType.CustomFailureDetector(implClass) + case unknown ⇒ throw new UnsupportedOperationException("Unknown FailureDetector [" + unknown + "]") } def routerTypeFor(routing: Routing): RouterType = routing match { @@ -196,6 +200,8 @@ object DeploymentConfig { case RoundRobin() ⇒ RouterType.RoundRobin case Random ⇒ RouterType.Random case Random() ⇒ RouterType.Random + case ScatterGather ⇒ RouterType.ScatterGather + case ScatterGather() ⇒ RouterType.ScatterGather case LeastCPU ⇒ RouterType.LeastCPU case LeastCPU() ⇒ RouterType.LeastCPU case LeastRAM ⇒ RouterType.LeastRAM diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala new file mode 100644 index 0000000000..2d6d8c549e --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.routing + +import akka.actor._ + +import scala.annotation.tailrec + +import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } +import java.net.InetSocketAddress + +/** + * An Iterable that also contains a version. + */ +trait VersionedIterable[A] { + val version: Long + + def iterable: Iterable[A] + + def apply(): Iterable[A] = iterable +} + +/** + * Manages connections (ActorRefs) for a router. + * + * @author Jonas Bonér + */ +trait ConnectionManager { + /** + * A version that is useful to see if there is any change in the connections. If there is a change, a router is + * able to update its internal datastructures. + */ + def version: Long + + /** + * Returns the number of 'available' connections. Value could be stale as soon as received, and this method can't be combined (easily) + * with an atomic read of and size and version. + */ + def size: Int + + /** + * Shuts the connection manager down, which stops all managed actors + */ + def shutdown() + + /** + * Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is + * the time element, also the version is included to be able to read the data (the connections) and the version + * in an atomic manner. + * + * This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable) + * view of some set of connections. + */ + def connections: VersionedIterable[ActorRef] + + /** + * Removes a connection from the connection manager. + * + * @param ref the dead + */ + def remove(deadRef: ActorRef) + + /** + * Creates a new connection (ActorRef) if it didn't exist. Atomically. + */ + def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef + + /** + * Fails over connections from one address to another. + */ + def failOver(from: InetSocketAddress, to: InetSocketAddress) +} + +/** + * Manages local connections for a router, e.g. local actors. + */ +class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends ConnectionManager { + + case class State(version: Long, connections: Iterable[ActorRef]) extends VersionedIterable[ActorRef] { + def iterable = connections + } + + private val state: AtomicReference[State] = new AtomicReference[State](newState()) + + private def newState() = State(Long.MinValue, initialConnections) + + def version: Long = state.get.version + + def size: Int = state.get.connections.size + + def connections = state.get + + def shutdown() { + state.get.connections foreach (_.stop()) + } + + @tailrec + final def remove(ref: ActorRef) = { + val oldState = state.get + + //remote the ref from the connections. + var newList = oldState.connections.filter(currentActorRef ⇒ currentActorRef ne ref) + + if (newList.size != oldState.connections.size) { + //one or more occurrences of the actorRef were removed, so we need to update the state. + + val newState = State(oldState.version + 1, newList) + //if we are not able to update the state, we just try again. + if (!state.compareAndSet(oldState, newState)) remove(ref) + } + } + + def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here + + def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { + throw new UnsupportedOperationException("Not supported") + } +} diff --git a/akka-actor/src/main/scala/akka/routing/FailureDetector.scala b/akka-actor/src/main/scala/akka/routing/FailureDetector.scala deleted file mode 100644 index bccbd33b0b..0000000000 --- a/akka-actor/src/main/scala/akka/routing/FailureDetector.scala +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.routing - -import akka.AkkaException -import akka.actor._ -import akka.event.EventHandler -import akka.config.ConfigurationException -import akka.actor.UntypedChannel._ -import akka.dispatch.{ Future, Futures } -import akka.util.ReflectiveAccess - -import java.net.InetSocketAddress -import java.lang.reflect.InvocationTargetException -import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } - -import scala.annotation.tailrec - -sealed trait FailureDetectorType - -/** - * Used for declarative configuration of failure detection. - * - * @author Jonas Bonér - */ -object FailureDetectorType { - case object RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetectorType - case object RemoveConnectionOnFirstFailureFailureDetector extends FailureDetectorType - case class BannagePeriodFailureDetector(timeToBan: Long) extends FailureDetectorType - case class CustomFailureDetector(className: String) extends FailureDetectorType -} - -/** - * Misc helper and factory methods for failure detection. - */ -object FailureDetector { - - def createCustomFailureDetector( - implClass: String, - connections: Map[InetSocketAddress, ActorRef]): FailureDetector = { - - ReflectiveAccess.createInstance( - implClass, - Array[Class[_]](classOf[Map[InetSocketAddress, ActorRef]]), - Array[AnyRef](connections)) match { - case Right(actor) ⇒ actor - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new ConfigurationException( - "Could not instantiate custom FailureDetector of [" + - implClass + "] due to: " + - cause, cause) - } - } -} - -/** - * The FailureDetector acts like a middleman between the Router and - * the actor reference that does the routing and can dectect and act upon failure. - * - * Through the FailureDetector: - *
    - *
  1. - * the actor ref can signal that something has changed in the known set of connections. The Router can see - * when a changed happened (by checking the version) and update its internal datastructures. - *
  2. - *
  3. - * the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying. - *
  4. - *
- * - * @author Jonas Bonér - */ -trait FailureDetector { - - /** - * Returns true if the 'connection' is considered available. - */ - def isAvailable(connection: InetSocketAddress): Boolean - - /** - * Records a successful connection. - */ - def recordSuccess(connection: InetSocketAddress, timestamp: Long) - - /** - * Records a failed connection. - */ - def recordFailure(connection: InetSocketAddress, timestamp: Long) - - /** - * A version that is useful to see if there is any change in the connections. If there is a change, a router is - * able to update its internal datastructures. - */ - def version: Long - - /** - * Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily) - * with an atomic read of and size and version. - */ - def size: Int - - /** - * Stops all managed actors - */ - def stopAll() - - /** - * Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is - * the time element, also the version is included to be able to read the data (the connections) and the version - * in an atomic manner. - * - * This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable) - * view of some set of connections. - */ - def versionedIterable: VersionedIterable[ActorRef] - - /** - * A callback that can be used to indicate that a connected actorRef was dead. - *

- * Implementations should make sure that this method can be called without the actorRef being part of the - * current set of connections. The most logical way to deal with this situation, is just to ignore it. One of the - * reasons this can happen is that multiple thread could at the 'same' moment discover for the same ActorRef that - * not working. - * - * It could be that even after a remove has been called for a specific ActorRef, that the ActorRef - * is still being used. A good behaving Router will eventually discard this reference, but no guarantees are - * made how long this takes. - * - * @param ref the dead - */ - def remove(deadRef: ActorRef) - - /** - * TODO: document - */ - def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef - - /** - * Fails over connections from one address to another. - */ - def failOver(from: InetSocketAddress, to: InetSocketAddress) -} diff --git a/akka-actor/src/main/scala/akka/routing/RoutedProps.scala b/akka-actor/src/main/scala/akka/routing/RoutedProps.scala index a04a0d9ef3..3f74b0742d 100644 --- a/akka-actor/src/main/scala/akka/routing/RoutedProps.scala +++ b/akka-actor/src/main/scala/akka/routing/RoutedProps.scala @@ -5,11 +5,26 @@ package akka.routing import akka.actor._ -import akka.util.ReflectiveAccess +import akka.util.{ ReflectiveAccess, Duration } import java.net.InetSocketAddress -import scala.collection.JavaConversions.iterableAsScalaIterable +import scala.collection.JavaConversions.{ iterableAsScalaIterable, mapAsScalaMap } + +sealed trait FailureDetectorType + +/** + * Used for declarative configuration of failure detection. + * + * @author Jonas Bonér + */ +object FailureDetectorType { + // TODO shorten names to NoOp, BannagePeriod etc. + case object NoOpFailureDetector extends FailureDetectorType + case object RemoveConnectionOnFirstFailureFailureDetector extends FailureDetectorType + case class BannagePeriodFailureDetector(timeToBan: Duration) extends FailureDetectorType + case class CustomFailureDetector(className: String) extends FailureDetectorType +} sealed trait RouterType @@ -32,6 +47,11 @@ object RouterType { */ object RoundRobin extends RouterType + /** + * A RouterType that selects the connection by using scatter gather. + */ + object ScatterGather extends RouterType + /** * A RouterType that selects the connection based on the least amount of cpu usage */ @@ -56,21 +76,6 @@ object RouterType { } -object RoutedProps { - - final val defaultTimeout = Actor.TIMEOUT - final val defaultRouterFactory = () ⇒ new RoundRobinRouter - final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled - final val defaultFailureDetectorFactory = (connections: Map[InetSocketAddress, ActorRef]) ⇒ new RemoveConnectionOnFirstFailureLocalFailureDetector(connections.values) - - /** - * The default RoutedProps instance, uses the settings from the RoutedProps object starting with default* - */ - final val default = new RoutedProps - - def apply(): RoutedProps = default -} - /** * Contains the configuration to create local and clustered routed actor references. * @@ -85,12 +90,11 @@ object RoutedProps { */ case class RoutedProps( routerFactory: () ⇒ Router, - connections: Iterable[ActorRef], - failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) ⇒ FailureDetector = RoutedProps.defaultFailureDetectorFactory, + connectionManager: ConnectionManager, timeout: Timeout = RoutedProps.defaultTimeout, localOnly: Boolean = RoutedProps.defaultLocalOnly) { - def this() = this(RoutedProps.defaultRouterFactory, List()) + def this() = this(RoutedProps.defaultRouterFactory, new LocalConnectionManager(List())) /** * Returns a new RoutedProps configured with a random router. @@ -149,28 +153,35 @@ case class RoutedProps( * * Scala API. */ - def withConnections(c: Iterable[ActorRef]): RoutedProps = copy(connections = c) + def withLocalConnections(c: Iterable[ActorRef]): RoutedProps = copy(connectionManager = new LocalConnectionManager(c)) /** * Sets the connections to use. * * Java API. */ - def withConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = copy(connections = iterableAsScalaIterable(c)) + def withLocalConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = copy(connectionManager = new LocalConnectionManager(iterableAsScalaIterable(c))) /** - * Returns a new RoutedProps configured with a FailureDetector factory. + * Sets the connections to use. * * Scala API. */ - def withFailureDetector(failureDetectorFactory: (Map[InetSocketAddress, ActorRef]) ⇒ FailureDetector): RoutedProps = - copy(failureDetectorFactory = failureDetectorFactory) + // def withRemoteConnections(c: Map[InetSocketAddress, ActorRef]): RoutedProps = copy(connectionManager = new RemoteConnectionManager(c)) /** - * Returns a new RoutedProps configured with a FailureDetector factory. + * Sets the connections to use. * * Java API. */ - def withFailureDetector(failureDetectorFactory: akka.japi.Function[Map[InetSocketAddress, ActorRef], FailureDetector]): RoutedProps = - copy(failureDetectorFactory = (connections: Map[InetSocketAddress, ActorRef]) ⇒ failureDetectorFactory.apply(connections)) + // def withRemoteConnections(c: java.util.collection.Map[InetSocketAddress, ActorRef]): RoutedProps = copy(connectionManager = new RemoteConnectionManager(mapAsScalaMap(c))) } + +object RoutedProps { + final val defaultTimeout = Actor.TIMEOUT + final val defaultRouterFactory = () ⇒ new RoundRobinRouter + final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled + + def apply() = new RoutedProps() +} + diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 61acc0240a..2fbb92631c 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -36,7 +36,7 @@ trait Router { * JMM Guarantees: * This method guarantees that all changes made in this method, are visible before one of the routing methods is called. */ - def init(connections: FailureDetector) + def init(connectionManager: ConnectionManager) /** * Routes the message to one of the connections. @@ -54,78 +54,11 @@ trait Router { def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] } -/** - * An Iterable that also contains a version. - */ -trait VersionedIterable[A] { - val version: Long - - def iterable: Iterable[A] - - def apply(): Iterable[A] = iterable -} - /** * An {@link AkkaException} thrown when something goes wrong while routing a message */ class RoutingException(message: String) extends AkkaException(message) -/** - * Default "local" failure detector. This failure detector removes an actor from the - * router if an exception occured in the router's thread (e.g. when trying to add - * the message to the receiver's mailbox). - */ -class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector { - - case class State(version: Long, iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef] - - private val state = new AtomicReference[State] - - def this(connectionIterable: Iterable[ActorRef]) = { - this() - state.set(State(Long.MinValue, connectionIterable)) - } - - def isAvailable(connection: InetSocketAddress): Boolean = - state.get.iterable.find(c ⇒ connection == c).isDefined - - def recordSuccess(connection: InetSocketAddress, timestamp: Long) {} - - def recordFailure(connection: InetSocketAddress, timestamp: Long) {} - - def version: Long = state.get.version - - def size: Int = state.get.iterable.size - - def versionedIterable = state.get - - def stopAll() { - state.get.iterable foreach (_.stop()) - } - - @tailrec - final def remove(ref: ActorRef) = { - val oldState = state.get - - //remote the ref from the connections. - var newList = oldState.iterable.filter(currentActorRef ⇒ currentActorRef ne ref) - - if (newList.size != oldState.iterable.size) { - //one or more occurrences of the actorRef were removed, so we need to update the state. - - val newState = State(oldState.version + 1, newList) - //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) remove(ref) - } - } - - def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here - - def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { - throw new UnsupportedOperationException("Not supported") - } -} - /** * A Helper class to create actor references that use routing. */ @@ -143,58 +76,12 @@ object Routing { //TODO If address matches an already created actor (Ahead-of-time deployed) return that actor //TODO If address exists in config, it will override the specified Props (should we attempt to merge?) //TODO If the actor deployed uses a different config, then ignore or throw exception? - + if (props.connectionManager.size == 0) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router") val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled val localOnly = props.localOnly - if (clusteringEnabled && !props.localOnly) - ReflectiveAccess.ClusterModule.newClusteredActorRef(props) - else { - if (props.connections.isEmpty) //FIXME Shouldn't this be checked when instance is created so that it works with linking instead of barfing? - throw new IllegalArgumentException("A routed actorRef can't have an empty connection set") - - new RoutedActorRef(props, address) - } - } - - /** - * Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors. - * - * @param actorAddress the address of the ActorRef. - * @param connections an Iterable pointing to all connected actor references. - * @param routerType the type of routing that should be used. - * @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation - * how many connections it can handle. - */ - @deprecated("Use 'Routing.actorOf(props: RoutedProps)' instead.", "2.0") - def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = { - val router = routerType match { - case RouterType.Direct if connections.size > 1 ⇒ - throw new IllegalArgumentException("A direct router can't have more than 1 connection") - - case RouterType.Direct ⇒ - new DirectRouter - - case RouterType.Random ⇒ - new RandomRouter - - case RouterType.RoundRobin ⇒ - new RoundRobinRouter - - case r ⇒ - throw new IllegalArgumentException("Unsupported routerType " + r) - } - - if (connections.size == 0) - throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required") - - new RoutedActorRef( - new RoutedProps( - () ⇒ router, - connections, - RoutedProps.defaultFailureDetectorFactory, - RoutedProps.defaultTimeout, true), - actorAddress) + if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props) + else new RoutedActorRef(props, address) } } @@ -243,7 +130,7 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: St } } - router.init(new RemoveConnectionOnFirstFailureLocalFailureDetector(routedProps.connections)) + router.init(routedProps.connectionManager) } /** @@ -255,21 +142,21 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: St trait BasicRouter extends Router { @volatile - protected var connections: FailureDetector = _ + protected var connectionManager: ConnectionManager = _ - def init(connections: FailureDetector) = { - this.connections = connections + def init(connectionManager: ConnectionManager) = { + this.connectionManager = connectionManager } def route(message: Any)(implicit sender: Option[ActorRef]) = message match { case Routing.Broadcast(message) ⇒ //it is a broadcast message, we are going to send to message to all connections. - connections.versionedIterable.iterable foreach { connection ⇒ + connectionManager.connections.iterable foreach { connection ⇒ try { connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' } catch { case e: Exception ⇒ - connections.remove(connection) + connectionManager.remove(connection) throw e } } @@ -281,7 +168,7 @@ trait BasicRouter extends Router { connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' } catch { case e: Exception ⇒ - connections.remove(connection) + connectionManager.remove(connection) throw e } case None ⇒ @@ -301,7 +188,7 @@ trait BasicRouter extends Router { connection.?(message, timeout)(sender).asInstanceOf[Future[T]] } catch { case e: Exception ⇒ - connections.remove(connection) + connectionManager.remove(connection) throw e } case None ⇒ @@ -328,33 +215,32 @@ class DirectRouter extends BasicRouter { private val state = new AtomicReference[DirectRouterState] lazy val next: Option[ActorRef] = { - val currentState = getState - if (currentState.ref == null) None else Some(currentState.ref) + val current = currentState + if (current.ref == null) None else Some(current.ref) } - // FIXME rename all 'getState' methods to 'currentState', non-scala @tailrec - private def getState: DirectRouterState = { - val currentState = state.get + private def currentState: DirectRouterState = { + val current = state.get - if (currentState != null && connections.version == currentState.version) { + if (current != null && connectionManager.version == current.version) { //we are lucky since nothing has changed in the connections. - currentState + current } else { //there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating. - val versionedIterable = connections.versionedIterable + val connections = connectionManager.connections - val connectionCount = versionedIterable.iterable.size + val connectionCount = connections.iterable.size if (connectionCount > 1) throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount)) - val newState = new DirectRouterState(versionedIterable.iterable.head, versionedIterable.version) - if (state.compareAndSet(currentState, newState)) + val newState = new DirectRouterState(connections.iterable.head, connections.version) + if (state.compareAndSet(current, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use newState else //we failed to update the state, lets try again... better luck next time. - getState + currentState // recur } } @@ -373,28 +259,28 @@ class RandomRouter extends BasicRouter { //FIXME: threadlocal random? private val random = new java.util.Random(System.nanoTime) - def next: Option[ActorRef] = getState.array match { + def next: Option[ActorRef] = currentState.array match { case a if a.isEmpty ⇒ None case a ⇒ Some(a(random.nextInt(a.length))) } @tailrec - private def getState: RandomRouterState = { - val currentState = state.get + private def currentState: RandomRouterState = { + val current = state.get - if (currentState != null && currentState.version == connections.version) { + if (current != null && current.version == connectionManager.version) { //we are lucky, since there has not been any change in the connections. So therefor we can use the existing state. - currentState + current } else { //there has been a change in connections, or it was the first try, so we need to update the internal state - val versionedIterable = connections.versionedIterable - val newState = new RandomRouterState(versionedIterable.iterable.toIndexedSeq, versionedIterable.version) - if (state.compareAndSet(currentState, newState)) + val connections = connectionManager.connections + val newState = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version) + if (state.compareAndSet(current, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use newState else //we failed to update the state, lets try again... better luck next time. - getState + currentState } } @@ -410,25 +296,25 @@ class RoundRobinRouter extends BasicRouter { private val state = new AtomicReference[RoundRobinState] - def next: Option[ActorRef] = getState.next + def next: Option[ActorRef] = currentState.next @tailrec - private def getState: RoundRobinState = { - val currentState = state.get + private def currentState: RoundRobinState = { + val current = state.get - if (currentState != null && currentState.version == connections.version) { + if (current != null && current.version == connectionManager.version) { //we are lucky, since there has not been any change in the connections. So therefor we can use the existing state. - currentState + current } else { //there has been a change in connections, or it was the first try, so we need to update the internal state - val versionedIterable = connections.versionedIterable - val newState = new RoundRobinState(versionedIterable.iterable.toIndexedSeq[ActorRef], versionedIterable.version) - if (state.compareAndSet(currentState, newState)) + val connections = connectionManager.connections + val newState = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version) + if (state.compareAndSet(current, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use newState else //we failed to update the state, lets try again... better luck next time. - getState + currentState } } @@ -462,19 +348,20 @@ class RoundRobinRouter extends BasicRouter { trait ScatterGatherRouter extends BasicRouter with Serializable { /** - * Aggregates the responses into a single Future + * Aggregates the responses into a single Future. + * * @param results Futures of the responses from connections */ protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = { - val responses = connections.versionedIterable.iterable.flatMap { actor ⇒ + val responses = connectionManager.connections.iterable.flatMap { actor ⇒ try { if (actor.isShutdown) throw new ActorInitializationException("For compatability - check death first") Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]]) } catch { case e: Exception ⇒ - connections.remove(actor) + connectionManager.remove(actor) None } } diff --git a/akka-docs/disabled/examples/Pi.scala b/akka-docs/disabled/examples/Pi.scala index 7a08a449da..d0869426fe 100644 --- a/akka-docs/disabled/examples/Pi.scala +++ b/akka-docs/disabled/examples/Pi.scala @@ -65,7 +65,7 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) // wrap them with a load-balancing router - val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi") + val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") loadBalancerActor(CyclicIterator(workers)) //#create-workers diff --git a/akka-docs/intro/code/tutorials/first/Pi.scala b/akka-docs/intro/code/tutorials/first/Pi.scala index 09d67e955e..b75813841b 100644 --- a/akka-docs/intro/code/tutorials/first/Pi.scala +++ b/akka-docs/intro/code/tutorials/first/Pi.scala @@ -69,7 +69,7 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) // wrap them with a load-balancing router - val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi") + val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") //#create-workers //#master-receive diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala new file mode 100644 index 0000000000..918e4b1ef2 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala @@ -0,0 +1,230 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.remote + +import akka.AkkaException +import akka.actor._ +import akka.event.EventHandler +import akka.config.ConfigurationException +import akka.actor.UntypedChannel._ +import akka.dispatch.{ Future, Futures } +import akka.util.ReflectiveAccess +import akka.util.Duration + +import java.net.InetSocketAddress +import java.lang.reflect.InvocationTargetException +import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } + +import scala.collection.immutable.Map +import scala.collection.mutable +import scala.annotation.tailrec + +/** + * The failure detector uses different heuristics (depending on implementation) to try to detect and manage + * failed connections. + * + * @author Jonas Bonér + */ +trait FailureDetector extends NetworkEventStream.Listener { + + def newTimestamp: Long = System.currentTimeMillis + + /** + * Returns true if the 'connection' is considered available. + */ + def isAvailable(connection: InetSocketAddress): Boolean + + /** + * Records a successful connection. + */ + def recordSuccess(connection: InetSocketAddress, timestamp: Long) + + /** + * Records a failed connection. + */ + def recordFailure(connection: InetSocketAddress, timestamp: Long) +} + +/** + * Misc helper and factory methods for failure detection. + */ +object FailureDetector { + + def createCustomFailureDetector(implClass: String): FailureDetector = { + + ReflectiveAccess.createInstance( + implClass, + Array[Class[_]](), + Array[AnyRef]()) match { + case Right(actor) ⇒ actor + case Left(exception) ⇒ + val cause = exception match { + case i: InvocationTargetException ⇒ i.getTargetException + case _ ⇒ exception + } + throw new ConfigurationException( + "Could not instantiate custom FailureDetector of [" + + implClass + "] due to: " + + cause, cause) + } + } +} + +/** + * No-op failure detector. Does not do anything. + */ +class NoOpFailureDetector extends FailureDetector { + + def isAvailable(connection: InetSocketAddress): Boolean = true + + def recordSuccess(connection: InetSocketAddress, timestamp: Long) {} + + def recordFailure(connection: InetSocketAddress, timestamp: Long) {} + + def notify(event: RemoteLifeCycleEvent) {} +} + +/** + * Simple failure detector that removes the failing connection permanently on first error. + */ +class RemoveConnectionOnFirstFailureFailureDetector extends FailureDetector { + + protected case class State(version: Long, banned: Set[InetSocketAddress]) + + protected val state: AtomicReference[State] = new AtomicReference[State](newState()) + + protected def newState() = State(Long.MinValue, Set.empty[InetSocketAddress]) + + def isAvailable(connectionAddress: InetSocketAddress): Boolean = state.get.banned.contains(connectionAddress) + + final def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {} + + @tailrec + final def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) { + val oldState = state.get + if (!oldState.banned.contains(connectionAddress)) { + val newBannedConnections = oldState.banned + connectionAddress + val newState = oldState copy (version = oldState.version + 1, banned = newBannedConnections) + if (!state.compareAndSet(oldState, newState)) recordFailure(connectionAddress, timestamp) + } + } + + // NetworkEventStream.Listener callback + def notify(event: RemoteLifeCycleEvent) = event match { + case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case RemoteClientError(cause, client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case RemoteClientDisconnected(client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case RemoteClientShutdown(client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case _ ⇒ {} + } +} + +/** + * Failure detector that bans the failing connection for 'timeToBan: Duration' and will try to use the connection + * again after the ban period have expired. + * + * @author Jonas Bonér + */ +class BannagePeriodFailureDetector(timeToBan: Duration) extends FailureDetector with NetworkEventStream.Listener { + + // FIXME considering adding a Scheduler event to notify the BannagePeriodFailureDetector unban the banned connection after the timeToBan have exprired + + protected case class State(version: Long, banned: Map[InetSocketAddress, BannedConnection]) + + protected val state: AtomicReference[State] = new AtomicReference[State](newState()) + + case class BannedConnection(bannedSince: Long, address: InetSocketAddress) + + val timeToBanInMillis = timeToBan.toMillis + + protected def newState() = State(Long.MinValue, Map.empty[InetSocketAddress, BannedConnection]) + + private def bannedConnections = state.get.banned + + def isAvailable(connectionAddress: InetSocketAddress): Boolean = bannedConnections.get(connectionAddress).isEmpty + + @tailrec + final def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) { + val oldState = state.get + val bannedConnection = oldState.banned.get(connectionAddress) + + if (bannedConnection.isDefined) { // is it banned or not? + val BannedConnection(bannedSince, banned) = bannedConnection.get + val currentlyBannedFor = newTimestamp - bannedSince + + if (currentlyBannedFor > timeToBanInMillis) { + val newBannedConnections = oldState.banned - connectionAddress + + val newState = oldState copy (version = oldState.version + 1, banned = newBannedConnections) + + if (!state.compareAndSet(oldState, newState)) recordSuccess(connectionAddress, timestamp) + } + } + } + + @tailrec + final def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) { + val oldState = state.get + val connection = oldState.banned.get(connectionAddress) + + if (connection.isEmpty) { // is it already banned or not? + val bannedConnection = BannedConnection(timestamp, connectionAddress) + val newBannedConnections = oldState.banned + (connectionAddress -> bannedConnection) + + val newState = oldState copy (version = oldState.version + 1, banned = newBannedConnections) + + if (!state.compareAndSet(oldState, newState)) recordFailure(connectionAddress, timestamp) + } + } + + // NetworkEventStream.Listener callback + def notify(event: RemoteLifeCycleEvent) = event match { + case RemoteClientStarted(client, connectionAddress) ⇒ + recordSuccess(connectionAddress, newTimestamp) + + case RemoteClientConnected(client, connectionAddress) ⇒ + recordSuccess(connectionAddress, newTimestamp) + + case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case RemoteClientError(cause, client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case RemoteClientDisconnected(client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case RemoteClientShutdown(client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case _ ⇒ {} + } +} + +/** + * Failure detector that uses the Circuit Breaker pattern to detect and recover from failing connections. + * + * class CircuitBreakerNetworkEventStream.Listener(initialConnections: Map[InetSocketAddress, ActorRef]) + * extends RemoteConnectionManager(initialConnections) { + * + * def newState() = State(Long.MinValue, initialConnections, None) + * + * def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined + * + * def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {} + * + * def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {} + * + * // FIXME implement CircuitBreakerNetworkEventStream.Listener + * } + */ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index c51fbabc91..f5cb3ba18b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -6,9 +6,8 @@ package akka.remote import akka.actor._ import akka.routing._ -import DeploymentConfig._ -import Actor._ -import Status._ +import akka.actor.Actor._ +import akka.actor.Status._ import akka.event.EventHandler import akka.util.duration._ import akka.config.ConfigurationException @@ -33,8 +32,7 @@ class RemoteActorRefProvider extends ActorRefProvider { import akka.dispatch.Promise private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] - - private val failureDetector = new BannagePeriodFailureDetector(timeToBan = 60 seconds) // FIXME make timeToBan configurable + private val remoteDaemonConnectionManager = new RemoteConnectionManager(failureDetector = new BannagePeriodFailureDetector(60 seconds)) // FIXME make timeout configurable def actorOf(props: Props, address: String): Option[ActorRef] = { Address.validate(address) @@ -45,7 +43,14 @@ class RemoteActorRefProvider extends ActorRefProvider { if (oldFuture eq null) { // we won the race -- create the actor and resolve the future val actor = try { Deployer.lookupDeploymentFor(address) match { - case Some(Deploy(_, _, router, nrOfInstances, _, RemoteScope(remoteAddresses))) ⇒ + case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ + + val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { + case FailureDetectorType.NoOpFailureDetector ⇒ new NoOpFailureDetector + case FailureDetectorType.RemoveConnectionOnFirstFailureFailureDetector ⇒ new RemoveConnectionOnFirstFailureFailureDetector + case FailureDetectorType.BannagePeriodFailureDetector(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan) + case FailureDetectorType.CustomFailureDetector(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) + } val thisHostname = Remote.address.getHostName val thisPort = Remote.address.getPort @@ -60,8 +65,7 @@ class RemoteActorRefProvider extends ActorRefProvider { } else { // we are on the single "reference" node uses the remote actors on the replica nodes - val routerType = DeploymentConfig.routerTypeFor(router) - val routerFactory: () ⇒ Router = routerType match { + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { case RouterType.Direct ⇒ if (remoteAddresses.size != 1) throw new ConfigurationException( "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" @@ -80,23 +84,31 @@ class RemoteActorRefProvider extends ActorRefProvider { .format(address, remoteAddresses.mkString(", "))) () ⇒ new RoundRobinRouter + case RouterType.ScatterGather ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new ScatterGatherFirstCompletedRouter + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") case RouterType.Custom ⇒ sys.error("Router Custom not supported yet") } - def provisionActorToNode(remoteAddress: RemoteAddress): RemoteActorRef = { + var connections = Map.empty[InetSocketAddress, ActorRef] + remoteAddresses foreach { remoteAddress: DeploymentConfig.RemoteAddress ⇒ val inetSocketAddress = new InetSocketAddress(remoteAddress.hostname, remoteAddress.port) - useActorOnNode(inetSocketAddress, address, props.creator) - RemoteActorRef(inetSocketAddress, address, Actor.TIMEOUT, None) + connections += (inetSocketAddress -> RemoteActorRef(inetSocketAddress, address, Actor.TIMEOUT, None)) } - val connections: Iterable[ActorRef] = remoteAddresses map { provisionActorToNode(_) } + val connectionManager = new RemoteConnectionManager(connections, failureDetector) + + connections.keys foreach { useActorOnNode(_, address, props.creator) } Some(Routing.actorOf(RoutedProps( routerFactory = routerFactory, - connections = connections))) + connectionManager = connectionManager))) } case deploy ⇒ None // non-remote actor @@ -149,7 +161,7 @@ class RemoteActorRefProvider extends ActorRefProvider { Remote.remoteDaemonServiceName, remoteAddress.getHostName, remoteAddress.getPort) // try to get the connection for the remote address, if not already there then create it - val connection = failureDetector.putIfAbsent(remoteAddress, connectionFactory) + val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory) sendCommandToRemoteNode(connection, command, withACK = true) // ensure we get an ACK on the USE command } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala new file mode 100644 index 0000000000..3f552a0ef9 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.remote + +import akka.actor._ +import akka.actor.Actor._ +import akka.routing._ +import akka.event.EventHandler + +import scala.collection.immutable.Map +import scala.annotation.tailrec + +import java.net.InetSocketAddress +import java.util.concurrent.atomic.AtomicReference + +/** + * Remote connection manager, manages remote connections, e.g. RemoteActorRef's. + * + * @author Jonas Bonér + */ +class RemoteConnectionManager( + initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef], + failureDetector: FailureDetector = new NoOpFailureDetector) + extends ConnectionManager { + + case class State(version: Long, connections: Map[InetSocketAddress, ActorRef]) + extends VersionedIterable[ActorRef] { + def iterable: Iterable[ActorRef] = connections.values + } + + private val state: AtomicReference[State] = new AtomicReference[State](newState()) + + // register all initial connections - e.g listen to events from them + initialConnections.keys foreach (NetworkEventStream.register(failureDetector, _)) + + /** + * This method is using the FailureDetector to filter out connections that are considered not available. + */ + private def filterAvailableConnections(current: State): State = { + val availableConnections = current.connections filter { entry ⇒ failureDetector.isAvailable(entry._1) } + current copy (version = current.version, connections = availableConnections) + } + + private def newState() = State(Long.MinValue, initialConnections) + + def version: Long = state.get.version + + def connections = filterAvailableConnections(state.get) + + def size: Int = connections.connections.size + + def shutdown() { + state.get.iterable foreach (_.stop()) // shut down all remote connections + } + + @tailrec + final def failOver(from: InetSocketAddress, to: InetSocketAddress) { + EventHandler.debug(this, "Failing over connection from [%s] to [%s]".format(from, to)) + + val oldState = state.get + var changed = false + + val newMap = oldState.connections map { + case (`from`, actorRef) ⇒ + changed = true + //actorRef.stop() + (to, newConnection(actorRef.address, to)) + case other ⇒ other + } + + if (changed) { + //there was a state change, so we are now going to update the state. + val newState = oldState copy (version = oldState.version + 1, connections = newMap) + + //if we are not able to update, the state, we are going to try again. + if (!state.compareAndSet(oldState, newState)) { + failOver(from, to) // recur + } + } + } + + @tailrec + final def remove(faultyConnection: ActorRef) { + + val oldState = state.get() + var changed = false + + var faultyAddress: InetSocketAddress = null + var newConnections = Map.empty[InetSocketAddress, ActorRef] + + oldState.connections.keys foreach { address ⇒ + val actorRef: ActorRef = oldState.connections.get(address).get + if (actorRef ne faultyConnection) { + newConnections = newConnections + ((address, actorRef)) + } else { + faultyAddress = address + changed = true + } + } + + if (changed) { + //one or more occurrances of the actorRef were removed, so we need to update the state. + val newState = oldState copy (version = oldState.version + 1, connections = newConnections) + + //if we are not able to update the state, we just try again. + if (!state.compareAndSet(oldState, newState)) { + remove(faultyConnection) // recur + } else { + EventHandler.debug(this, "Removing connection [%s]".format(faultyAddress)) + NetworkEventStream.unregister(failureDetector, faultyAddress) // unregister the connections - e.g stop listen to events from it + } + } + } + + @tailrec + final def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { + + val oldState = state.get() + val oldConnections = oldState.connections + + oldConnections.get(address) match { + case Some(connection) ⇒ connection // we already had the connection, return it + case None ⇒ // we need to create it + val newConnection = newConnectionFactory() + val newConnections = oldConnections + (address -> newConnection) + + //one or more occurrances of the actorRef were removed, so we need to update the state. + val newState = oldState copy (version = oldState.version + 1, connections = newConnections) + + //if we are not able to update the state, we just try again. + if (!state.compareAndSet(oldState, newState)) { + // we failed, need compensating action + newConnection.stop() // stop the new connection actor and try again + putIfAbsent(address, newConnectionFactory) // recur + } else { + // we succeeded + EventHandler.debug(this, "Adding connection [%s]".format(address)) + NetworkEventStream.register(failureDetector, address) // register the connection - e.g listen to events from it + newConnection // return new connection actor + } + } + } + + private[remote] def newConnection(actorAddress: String, inetSocketAddress: InetSocketAddress) = { + RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None) + } +} diff --git a/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala deleted file mode 100644 index 02601be601..0000000000 --- a/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala +++ /dev/null @@ -1,382 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.remote - -import akka.actor._ -import Actor._ -import akka.routing._ -import akka.dispatch.PinnedDispatcher -import akka.event.EventHandler -import akka.util.{ ListenerManagement, Duration } - -import scala.collection.immutable.Map -import scala.collection.mutable -import scala.annotation.tailrec - -import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicReference -import System.{ currentTimeMillis ⇒ newTimestamp } - -/** - * Base class for remote failure detection management. - * - * @author Jonas Bonér - */ -abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) - extends FailureDetector - with NetworkEventStream.Listener { - - type T <: AnyRef - - protected case class State( - version: Long, - connections: Map[InetSocketAddress, ActorRef], - meta: T = null.asInstanceOf[T]) - extends VersionedIterable[ActorRef] { - def iterable: Iterable[ActorRef] = connections.values - } - - protected val state: AtomicReference[State] = new AtomicReference[State](newState()) - - // register all initial connections - e.g listen to events from them - initialConnections.keys foreach (NetworkEventStream.register(this, _)) - - /** - * State factory. To be defined by subclass that wants to add extra info in the 'meta: T' field. - */ - protected def newState(): State - - /** - * Returns true if the 'connection' is considered available. - * - * To be implemented by subclass. - */ - def isAvailable(connectionAddress: InetSocketAddress): Boolean - - /** - * Records a successful connection. - * - * To be implemented by subclass. - */ - def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) - - /** - * Records a failed connection. - * - * To be implemented by subclass. - */ - def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) - - def version: Long = state.get.version - - def versionedIterable = state.get - - def size: Int = state.get.connections.size - - def connections: Map[InetSocketAddress, ActorRef] = state.get.connections - - def stopAll() { - state.get.iterable foreach (_.stop()) // shut down all remote connections - } - - @tailrec - final def failOver(from: InetSocketAddress, to: InetSocketAddress) { - EventHandler.debug(this, "RemoteFailureDetector failover from [%s] to [%s]".format(from, to)) - - val oldState = state.get - var changed = false - - val newMap = oldState.connections map { - case (`from`, actorRef) ⇒ - changed = true - //actorRef.stop() - (to, newConnection(actorRef.address, to)) - case other ⇒ other - } - - if (changed) { - //there was a state change, so we are now going to update the state. - val newState = oldState copy (version = oldState.version + 1, connections = newMap) - - //if we are not able to update, the state, we are going to try again. - if (!state.compareAndSet(oldState, newState)) { - failOver(from, to) // recur - } - } - } - - @tailrec - final def remove(faultyConnection: ActorRef) { - - val oldState = state.get() - var changed = false - - var faultyAddress: InetSocketAddress = null - var newConnections = Map.empty[InetSocketAddress, ActorRef] - - oldState.connections.keys foreach { address ⇒ - val actorRef: ActorRef = oldState.connections.get(address).get - if (actorRef ne faultyConnection) { - newConnections = newConnections + ((address, actorRef)) - } else { - faultyAddress = address - changed = true - } - } - - if (changed) { - //one or more occurrances of the actorRef were removed, so we need to update the state. - val newState = oldState copy (version = oldState.version + 1, connections = newConnections) - - //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) { - remove(faultyConnection) // recur - } else { - EventHandler.debug(this, "Removing connection [%s]".format(faultyAddress)) - NetworkEventStream.unregister(this, faultyAddress) // unregister the connections - e.g stop listen to events from it - } - } - } - - @tailrec - final def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { - - val oldState = state.get() - val oldConnections = oldState.connections - - oldConnections.get(address) match { - case Some(connection) ⇒ connection // we already had the connection, return it - case None ⇒ // we need to create it - val newConnection = newConnectionFactory() - val newConnections = oldConnections + (address -> newConnection) - - //one or more occurrances of the actorRef were removed, so we need to update the state. - val newState = oldState copy (version = oldState.version + 1, connections = newConnections) - - //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) { - // we failed, need compensating action - newConnection.stop() // stop the new connection actor and try again - putIfAbsent(address, newConnectionFactory) // recur - } else { - // we succeeded - EventHandler.debug(this, "Adding connection [%s]".format(address)) - NetworkEventStream.register(this, address) // register the connection - e.g listen to events from it - newConnection // return new connection actor - } - } - } - - private[remote] def newConnection(actorAddress: String, inetSocketAddress: InetSocketAddress) = { - RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None) - } -} - -/** - * Simple failure detector that removes the failing connection permanently on first error. - */ -class RemoveConnectionOnFirstFailureRemoteFailureDetector( - initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef]) - extends RemoteFailureDetectorBase(initialConnections) { - - protected def newState() = State(Long.MinValue, initialConnections) - - def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined - - def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {} - - def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {} - - def notify(event: RemoteLifeCycleEvent) = event match { - case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ - removeConnection(connectionAddress) - - case RemoteClientError(cause, client, connectionAddress) ⇒ - removeConnection(connectionAddress) - - case RemoteClientDisconnected(client, connectionAddress) ⇒ - removeConnection(connectionAddress) - - case RemoteClientShutdown(client, connectionAddress) ⇒ - removeConnection(connectionAddress) - - case _ ⇒ {} - } - - private def removeConnection(connectionAddress: InetSocketAddress) = - connections.get(connectionAddress) foreach { conn ⇒ remove(conn) } -} - -/** - * Failure detector that bans the failing connection for 'timeToBan: Duration' and will try to use the connection - * again after the ban period have expired. - * - * @author Jonas Bonér - */ -class BannagePeriodFailureDetector( - initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef], - timeToBan: Duration) - extends RemoteFailureDetectorBase(initialConnections) { - - // FIXME considering adding a Scheduler event to notify the BannagePeriodFailureDetector unban the banned connection after the timeToBan have exprired - - type T = Map[InetSocketAddress, BannedConnection] - - case class BannedConnection(bannedSince: Long, connection: ActorRef) - - val timeToBanInMillis = timeToBan.toMillis - - protected def newState() = - State(Long.MinValue, initialConnections, Map.empty[InetSocketAddress, BannedConnection]) - - private def removeConnection(connectionAddress: InetSocketAddress) = - connections.get(connectionAddress) foreach { conn ⇒ remove(conn) } - - // =================================================================================== - // FailureDetector callbacks - // =================================================================================== - - def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined - - @tailrec - final def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) { - val oldState = state.get - val bannedConnection = oldState.meta.get(connectionAddress) - - if (bannedConnection.isDefined) { - val BannedConnection(bannedSince, connection) = bannedConnection.get - val currentlyBannedFor = newTimestamp - bannedSince - - if (currentlyBannedFor > timeToBanInMillis) { - // ban time has expired - add connection to available connections - val newConnections = oldState.connections + (connectionAddress -> connection) - val newBannedConnections = oldState.meta - connectionAddress - - val newState = oldState copy (version = oldState.version + 1, - connections = newConnections, - meta = newBannedConnections) - - if (!state.compareAndSet(oldState, newState)) recordSuccess(connectionAddress, timestamp) - } - } - } - - @tailrec - final def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) { - val oldState = state.get - val connection = oldState.connections.get(connectionAddress) - - if (connection.isDefined) { - val newConnections = oldState.connections - connectionAddress - val bannedConnection = BannedConnection(timestamp, connection.get) - val newBannedConnections = oldState.meta + (connectionAddress -> bannedConnection) - - val newState = oldState copy (version = oldState.version + 1, - connections = newConnections, - meta = newBannedConnections) - - if (!state.compareAndSet(oldState, newState)) recordFailure(connectionAddress, timestamp) - } - } - - // =================================================================================== - // NetworkEventStream.Listener callback - // =================================================================================== - - def notify(event: RemoteLifeCycleEvent) = event match { - case RemoteClientStarted(client, connectionAddress) ⇒ - recordSuccess(connectionAddress, newTimestamp) - - case RemoteClientConnected(client, connectionAddress) ⇒ - recordSuccess(connectionAddress, newTimestamp) - - case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientError(cause, client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientDisconnected(client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case RemoteClientShutdown(client, connectionAddress) ⇒ - recordFailure(connectionAddress, newTimestamp) - - case _ ⇒ {} - } -} - -/** - * Failure detector that uses the Circuit Breaker pattern to detect and recover from failing connections. - * - * class CircuitBreakerNetworkEventStream.Listener(initialConnections: Map[InetSocketAddress, ActorRef]) - * extends RemoteFailureDetectorBase(initialConnections) { - * - * def newState() = State(Long.MinValue, initialConnections, None) - * - * def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined - * - * def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {} - * - * def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {} - * - * // FIXME implement CircuitBreakerNetworkEventStream.Listener - * } - */ - -/** - * Base trait for remote failure event listener. - */ -trait RemoteFailureListener { - - final private[akka] def notify(event: RemoteLifeCycleEvent) = event match { - case RemoteClientStarted(client, connectionAddress) ⇒ - remoteClientStarted(client, connectionAddress) - - case RemoteClientConnected(client, connectionAddress) ⇒ - remoteClientConnected(client, connectionAddress) - - case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ - remoteClientWriteFailed(request, cause, client, connectionAddress) - - case RemoteClientError(cause, client, connectionAddress) ⇒ - remoteClientError(cause, client, connectionAddress) - - case RemoteClientDisconnected(client, connectionAddress) ⇒ - remoteClientDisconnected(client, connectionAddress) - - case RemoteClientShutdown(client, connectionAddress) ⇒ - remoteClientShutdown(client, connectionAddress) - - case RemoteServerWriteFailed(request, cause, server, clientAddress) ⇒ - remoteServerWriteFailed(request, cause, server, clientAddress) - - case RemoteServerError(cause, server) ⇒ - remoteServerError(cause, server) - - case RemoteServerShutdown(server) ⇒ - remoteServerShutdown(server) - } - - def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientWriteFailed( - request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteServerWriteFailed( - request: AnyRef, cause: Throwable, server: RemoteServerModule, clientAddress: Option[InetSocketAddress]) {} - - def remoteServerError(cause: Throwable, server: RemoteServerModule) {} - - def remoteServerShutdown(server: RemoteServerModule) {} -} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 888291e9cc..add91d8a82 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -934,7 +934,7 @@ class RemoteServerHandler( try { actor ! PoisonPill } catch { - case e: Exception ⇒ EventHandler.error(e, this, "Couldn't stop %s".format(actor)) + case e: Exception ⇒ EventHandler.error(e, this, "Couldn't stop [%s]".format(actor)) } } @@ -951,7 +951,7 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { event.getMessage match { case null ⇒ - throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) + throw new IllegalActorStateException("Message in remote MessageEvent is null [" + event + "]") case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ handleRemoteMessageProtocol(remote.getMessage, event.getChannel) @@ -1050,12 +1050,6 @@ class RemoteServerHandler( private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { val uuid = actorInfo.getUuid val address = actorInfo.getAddress - // val address = { - // // strip off clusterActorRefPrefix if needed - // val addr = actorInfo.getAddress - // if (addr.startsWith(Address.clusterActorRefPrefix)) addr.substring(addr.indexOf('.') + 1, addr.length) - // else addr - // } EventHandler.debug(this, "Looking up a remotely available actor for address [%s] on node [%s]" diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index b05e1e800f..70d7b09986 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -13,6 +13,7 @@ import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.routing.RoutedProps; import akka.routing.RouterType; +import akka.routing.LocalConnectionManager; import akka.routing.Routing; import akka.routing.Routing.Broadcast; import scala.collection.JavaConversions; @@ -109,7 +110,7 @@ public class Pi { workers.add(worker); } - router = Routing.actorOf(RoutedProps.apply().withRoundRobinRouter().withConnections(workers), "pi"); + router = Routing.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi"); } // message handler diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index eb5db541c9..98a3c87bd0 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -8,7 +8,7 @@ import akka.actor.{ Actor, PoisonPill } import Actor._ import java.util.concurrent.CountDownLatch import akka.routing.Routing.Broadcast -import akka.routing.{ RoutedProps, Routing } +import akka.routing._ object Pi extends App { @@ -58,7 +58,7 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) // wrap them with a load-balancing router - val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withConnections(workers), "pi") + val router = Routing.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi") // message handler def receive = { diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java index 391feec26f..0c4c6dd0c6 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java @@ -11,6 +11,7 @@ import static java.util.Arrays.asList; import akka.routing.RoutedProps; import akka.routing.Routing; +import akka.routing.LocalConnectionManager; import scala.Option; import akka.actor.ActorRef; import akka.actor.Channel; @@ -103,7 +104,7 @@ public class Pi { workers.add(worker); } - router = Routing.actorOf(RoutedProps.apply().withConnections(workers).withRoundRobinRouter(), "pi"); + router = Routing.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi"); } @Override diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 5fd4559a06..83d0a1d2ff 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -9,7 +9,7 @@ import akka.event.EventHandler import System.{ currentTimeMillis ⇒ now } import akka.routing.Routing.Broadcast import akka.actor.{ Timeout, Channel, Actor, PoisonPill } -import akka.routing.{ RoutedProps, Routing } +import akka.routing._ object Pi extends App { @@ -53,7 +53,9 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) // wrap them with a load-balancing router - val router = Routing.actorOf(RoutedProps().withConnections(workers).withRoundRobinRouter, "pi") + val router = Routing.actorOf(RoutedProps( + routerFactory = () ⇒ new RoundRobinRouter, + connectionManager = new LocalConnectionManager(workers)), "pi") // phase 1, can accept a Calculate message def scatter: Receive = { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index d4fb35bae9..0a5fcb9fa8 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -63,8 +63,8 @@ akka { service-ping { # deployment id pattern - router = "least-cpu" # routing (load-balance) scheme to use - # available: "direct", "round-robin", "random", + router = "round-robin" # routing (load-balance) scheme to use + # available: "direct", "round-robin", "random", "scatter-gather" # "least-cpu", "least-ram", "least-messages" # or: fully qualified class name of the router class # default is "direct"; @@ -76,7 +76,7 @@ akka { # if the "direct" router is used then this element is ignored (always '1') failure-detector { # failure detection scheme to use - bannage-period { # available: remove-connection-on-first-local-failure {} + bannage-period { # available: no-op {} time-to-ban = 10 # remove-connection-on-first-failure {} } # bannage-period { ... }