diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala index a6ba651279..a1d7607b6b 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket1111Spec.scala @@ -18,25 +18,30 @@ class Ticket1111Spec extends WordSpec with MustMatchers { val shutdownLatch = new CountDownLatch(1) - val actor = Routing.actorOf("foo", List(newActor(0, Some(shutdownLatch)), - newActor(1, Some(shutdownLatch))), - new ScatterGatherFirstCompletedRouter()).start() + val props = RoutedProps.apply() + .withDeployId("foo") + .withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props) actor ! Broadcast(Stop(Some(0))) shutdownLatch.await(5, TimeUnit.SECONDS) must be(true) (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) - } "throw an exception, if all the connections have stopped" in { val shutdownLatch = new CountDownLatch(2) - val actor = Routing.actorOf("foo", List(newActor(0, Some(shutdownLatch)), - newActor(1, Some(shutdownLatch))), - new ScatterGatherFirstCompletedRouter()).start() + val props = RoutedProps.apply() + .withDeployId("foo") + .withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch)))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props) actor ! Broadcast(Stop()) @@ -50,35 +55,48 @@ class Ticket1111Spec extends WordSpec with MustMatchers { "return the first response from connections, when all of them replied" in { - val actor = Routing.actorOf("foo", List(newActor(0), newActor(1)), - new ScatterGatherFirstCompletedRouter()).start() + val props = RoutedProps.apply() + .withDeployId("foo") + .withConnections(List(newActor(0), newActor(1))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props) (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) } "return the first response from connections, when some of them failed to reply" in { + val props = RoutedProps.apply() + .withDeployId("foo") + .withConnections(List(newActor(0), newActor(1))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) - val actor = Routing.actorOf("foo", List(newActor(0), newActor(1)), - new ScatterGatherFirstCompletedRouter()).start() + val actor = Routing.actorOf(props) (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) } "be started when constructed" in { - - val actor = Routing.actorOf("foo", List(newActor(0)), - new ScatterGatherFirstCompletedRouter()).start() + val props = RoutedProps.apply() + .withDeployId("foo") + .withConnections(List(newActor(0))) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + val actor = Routing.actorOf(props) actor.isRunning must be(true) } "throw IllegalArgumentException at construction when no connections" in { + val props = RoutedProps.apply() + .withDeployId("foo") + .withConnections(List()) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + try { - Routing.actorOf("foo", List(), - new ScatterGatherFirstCompletedRouter()).start() + Routing.actorOf(props) fail() } catch { case e: IllegalArgumentException ⇒ @@ -104,7 +122,12 @@ class Ticket1111Spec extends WordSpec with MustMatchers { connections = connections :+ connection } - val actor = Routing.actorOf("foo", connections, new ScatterGatherFirstCompletedRouter()).start() + val props = RoutedProps.apply() + .withDeployId("foo") + .withConnections(connections) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props) for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { @@ -141,7 +164,12 @@ class Ticket1111Spec extends WordSpec with MustMatchers { } }).start() - val actor = Routing.actorOf("foo", List(connection1, connection2), new ScatterGatherFirstCompletedRouter()).start() + val props = RoutedProps.apply() + .withDeployId("foo") + .withConnections(List(connection1, connection2)) + .withRouter(() ⇒ new ScatterGatherFirstCompletedRouter()) + + val actor = Routing.actorOf(props) actor ! Broadcast(1) actor ! Broadcast("end") diff --git a/akka-actor/src/main/java/akka/routing/RouterFactory.java b/akka-actor/src/main/java/akka/routing/RouterFactory.java new file mode 100644 index 0000000000..ad4d719930 --- /dev/null +++ b/akka-actor/src/main/java/akka/routing/RouterFactory.java @@ -0,0 +1,15 @@ +package akka.routing; + +/** + * A Factory responsible for creating {@link Router} instances. It makes Java compatability possible for users that + * want to provide their own router instance. + */ +public interface RouterFactory { + + /** + * Creates a new Router instance. + * + * @return the newly created Router instance. + */ + Router newRouter(); +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 57c837e0c7..c7cfe98655 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -17,7 +17,6 @@ import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.{ Map ⇒ JMap } -import scala.reflect.BeanProperty import scala.collection.immutable.Stack import scala.annotation.tailrec import java.lang.{ UnsupportedOperationException, IllegalStateException } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 7bc99c6d2e..638b4a7a0f 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -7,18 +7,14 @@ package akka.routing import annotation.tailrec import akka.AkkaException -import akka.dispatch.Future import akka.actor._ -import akka.dispatch.Futures import akka.event.EventHandler import akka.actor.UntypedChannel._ import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } - -/** - * An {@link AkkaException} thrown when something goes wrong while routing a message - */ -class RoutingException(message: String) extends AkkaException(message) +import akka.dispatch.{ Future, Futures } +import akka.util.ReflectiveAccess +import collection.JavaConversions.iterableAsScalaIterable sealed trait RouterType @@ -65,6 +61,116 @@ object RouterType { } +object RoutedProps { + + final val defaultTimeout = Actor.TIMEOUT + final val defaultRouterFactory = () ⇒ new RoundRobinRouter + final val defaultDeployId = "" + final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled + + /** + * The default RoutedProps instance, uses the settings from the RoutedProps object starting with default* + */ + final val default = new RoutedProps() + + def apply(): RoutedProps = default +} + +/** + * Contains the configuration to create local and clustered routed actor references. + * + * Routed ActorRef configuration object, this is thread safe and fully sharable. + * + * Because the Routers are stateful, a new Router instance needs to be created for every ActorRef that relies on routing + * (currently the ClusterActorRef and the RoutedActorRef). That is why a Router factory is used (a function that returns + * a new Router instance) instead of a single Router instance. This makes sharing the same RoutedProps between multiple + * threads safe. + * + * This configuration object makes it possible to either + */ +case class RoutedProps(routerFactory: () ⇒ Router, deployId: String, connections: Iterable[ActorRef], timeout: Timeout, localOnly: Boolean) { + + def this() = this( + routerFactory = RoutedProps.defaultRouterFactory, + deployId = RoutedProps.defaultDeployId, + connections = List(), + timeout = RoutedProps.defaultTimeout, + localOnly = RoutedProps.defaultLocalOnly) + + /** + * Returns a new RoutedProps with the specified deployId set + * + * Java and Scala API + */ + def withDeployId(id: String): RoutedProps = copy(deployId = if (id eq null) "" else id) + + /** + * Returns a new RoutedProps configured with a random router. + * + * Java and Scala API. + */ + def withRandomRouter(): RoutedProps = copy(routerFactory = () ⇒ new RandomRouter()) + + /** + * Returns a new RoutedProps configured with a round robin router. + * + * Java and Scala API. + */ + def withRoundRobinRouter(): RoutedProps = copy(routerFactory = () ⇒ new RoundRobinRouter()) + + /** + * Returns a new RoutedProps configured with a direct router. + * + * Java and Scala API. + */ + def withDirectRouter(): RoutedProps = copy(routerFactory = () ⇒ new DirectRouter()) + + /** + * Makes it possible to change the default behavior in a clustered environment that a clustered actor ref is created. + * In some cases you just want to have local actor references, even though the Cluster Module is up and running. + * + * Java and Scala API. + */ + def withLocalOnly(l: Boolean = true) = copy(localOnly = l) + + /** + * Sets the Router factory method to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new + * Router instance is needed for every 'routed' ActorRef. That is why a 'factory' function is used to create new + * instances. + * + * Scala API. + */ + def withRouter(f: () ⇒ Router): RoutedProps = copy(routerFactory = f) + + /** + * Sets the RouterFactory to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new + * Router instance is needed for every 'routed' ActorRef. That is why a RouterFactory interface is used to create new + * instances. + * + * Java API. + */ + def withRouter(f: RouterFactory): RoutedProps = copy(routerFactory = () ⇒ f.newRouter()) + + /** + * + */ + def withTimeout(t: Timeout): RoutedProps = copy(timeout = t) + + /** + * Sets the connections to use. + * + * Scala API. + */ + def withConnections(c: Iterable[ActorRef]): RoutedProps = copy(connections = c) + + /** + * Sets the connections to use. + * + * Java API. + */ + def withConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = copy(connections = iterableAsScalaIterable(c)) +} + /** * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the * {@link RouterConnections} and each Router should be linked to only one {@link RouterConnections}. @@ -102,7 +208,24 @@ trait Router { } /** + * An {@link AkkaException} thrown when something goes wrong while routing a message + */ +class RoutingException(message: String) extends AkkaException(message) + +/** + * The RouterConnection acts like a middleman between the Router and the actor reference that does the routing. + * Through the RouterConnection: + *
    + *
  1. + * the actor ref can signal that something has changed in the known set of connections. The Router can see + * when a changed happened (by checking the version) and update its internal datastructures. + *
  2. + *
  3. + * the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying. + *
  4. + *
* + * It is very likely that the implementation of the RouterConnection will be part of the ActorRef itself. */ trait RouterConnections { @@ -113,17 +236,20 @@ trait RouterConnections { def version: Long /** - * Returns the number of connections. + * Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily) + * with an atomic read of and size and version. */ def size: Int /** - * Returns a tuple containing the version and Iterable of all connected ActorRefs this Router uses to send messages to. + * Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is + * the time element, also the version is included to be able to read the data (the connections) and the version + * in an atomic manner. * - * This iterator should be 'persistent'. So it can be handed out to other threads so that they are working on - * a stable (immutable) view of some set of connections. + * This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable) + * view of some set of connections. */ - def versionedIterator: (Long, Iterable[ActorRef]) + def versionedIterable: VersionedIterable[ActorRef] /** * A callback that can be used to indicate that a connected actorRef was dead. @@ -135,19 +261,51 @@ trait RouterConnections { * * It could be that even after a signalDeadActor has been called for a specific ActorRef, that the ActorRef * is still being used. A good behaving Router will eventually discard this reference, but no guarantees are - * made how long this takes place. + * made how long this takes. * * @param ref the dead */ def signalDeadActor(deadRef: ActorRef): Unit } +/** + * An Iterable that also contains a version. + */ +case class VersionedIterable[A](version: Long, val iterable: Iterable[A]) + +/** + * A Helper class to create actor references that use routing. + */ object Routing { sealed trait RoutingMessage case class Broadcast(message: Any) extends RoutingMessage + /** + * todo: will very likely be moved to the ActorRef. + */ + def actorOf(props: RoutedProps): ActorRef = { + //TODO Implement support for configuring by deployment ID etc + //TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor + //TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?) + //TODO If the actor deployed uses a different config, then ignore or throw exception? + + val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled + val localOnly = props.localOnly + + if (!localOnly && !clusteringEnabled) + throw new IllegalArgumentException("Can't have clustered actor reference without the ClusterModule being enabled") + else if (clusteringEnabled && !props.localOnly) { + ReflectiveAccess.ClusterModule.newClusteredActorRef(props).start() + } else { + if (props.connections.isEmpty) + throw new IllegalArgumentException("A routed actorRef can't have an empty connection set") + + new RoutedActorRef(props).start() + } + } + /** * Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors. * @@ -157,6 +315,7 @@ object Routing { * @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation * how many connections it can handle. */ + @deprecated def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = { val router = routerType match { case RouterType.Direct if connections.size > 1 ⇒ @@ -171,26 +330,22 @@ object Routing { throw new IllegalArgumentException("Unsupported routerType " + r) } - actorOf(actorAddress, connections, router).start() - } - - def actorOf(actorAddress: String, connections: Iterable[ActorRef], router: Router): ActorRef = - if (connections.isEmpty) + if (connections.size == 0) throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required") - else - new RoutedActorRef(actorAddress, router, connections) - def actorOfWithRoundRobin(actorAddress: String, connections: Iterable[ActorRef]): ActorRef = - actorOf(actorAddress, connections, akka.routing.RouterType.RoundRobin) + val props = new RoutedProps(() ⇒ router, actorAddress, connections, RoutedProps.defaultTimeout, true) + new RoutedActorRef(props).start() + } } /** - * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to - * on (or more) of these actors. + * An Abstract convenience implementation for building an ActorReference that uses a Router. */ -class RoutedActorRef(val address: String, val router: Router, val connectionIterator: Iterable[ActorRef]) extends UnsupportedActorRef { +abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef { - router.init(new RoutedActorRefConnections(connectionIterator)) + val router = props.routerFactory.apply() + + def address = props.deployId override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = { val sender = channel match { @@ -208,6 +363,16 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter } router.route[Any](message, timeout)(sender) } +} + +/** + * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to + * on (or more) of these actors. + */ +private[akka] class RoutedActorRef(val routedProps: RoutedProps) + extends AbstractRoutedActorRef(routedProps) { + + router.init(new RoutedActorRefConnections(routedProps.connections)) def start(): this.type = synchronized[this.type] { if (_status == ActorRefInternals.UNSTARTED) @@ -220,49 +385,40 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter if (_status == ActorRefInternals.RUNNING) { _status = ActorRefInternals.SHUTDOWN postMessageToMailbox(RemoteActorSystemMessage.Stop, None) - - // FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket) - - //inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections } } } private class RoutedActorRefConnections() extends RouterConnections { - private val state = new AtomicReference[State]() + private val state = new AtomicReference[VersionedIterable[ActorRef]]() def this(connectionIterable: Iterable[ActorRef]) = { this() - state.set(new State(Long.MinValue, connectionIterable)) + state.set(new VersionedIterable[ActorRef](Long.MinValue, connectionIterable)) } def version: Long = state.get().version - def size: Int = state.get().connections.size + def size: Int = state.get().iterable.size - def versionedIterator = { - val s = state.get - (s.version, s.connections) - } + def versionedIterable = state.get @tailrec final def signalDeadActor(ref: ActorRef) = { val oldState = state.get() //remote the ref from the connections. - var newList = oldState.connections.filter(currentActorRef ⇒ currentActorRef ne ref) + var newList = oldState.iterable.filter(currentActorRef ⇒ currentActorRef ne ref) - if (newList.size != oldState.connections.size) { + if (newList.size != oldState.iterable.size) { //one or more occurrences of the actorRef were removed, so we need to update the state. - val newState = new State(oldState.version + 1, newList) + val newState = new VersionedIterable[ActorRef](oldState.version + 1, newList) //if we are not able to update the state, we just try again. if (!state.compareAndSet(oldState, newState)) signalDeadActor(ref) } } - - case class State(val version: Long, val connections: Iterable[ActorRef]) } } @@ -285,7 +441,7 @@ trait BasicRouter extends Router { def route(message: Any)(implicit sender: Option[ActorRef]): Unit = message match { case Routing.Broadcast(message) ⇒ //it is a broadcast message, we are going to send to message to all connections. - connections.versionedIterator._2.foreach(actor ⇒ + connections.versionedIterable.iterable.foreach(actor ⇒ try { actor.!(message)(sender) } catch { @@ -338,7 +494,8 @@ trait BasicRouter extends Router { } /** - * A DirectRouter is FIXME + * A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef. + * * * @author Jonas Bonér */ @@ -361,13 +518,15 @@ class DirectRouter extends BasicRouter { } else { //there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating. - val (version, connectionIterable) = connections.versionedIterator + val versionedIterable = connections.versionedIterable - if (connectionIterable.size > 1) - throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionIterable.size)) + val connectionCount = versionedIterable.iterable.size + if (connectionCount > 1) + throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount)) - val newState = new DirectRouterState(connectionIterable.head, version) - if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use + val newState = new DirectRouterState(versionedIterable.iterable.head, versionedIterable.version) + if (state.compareAndSet(currentState, newState)) + //we are lucky since we just updated the state, so we can send it back as the state to use newState else //we failed to update the state, lets try again... better luck next time. getState() @@ -404,10 +563,11 @@ class RandomRouter extends BasicRouter { currentState } else { //there has been a change in connections, or it was the first try, so we need to update the internal state - val (version, connectionIterable) = connections.versionedIterator - val newState = new RandomRouterState(connectionIterable.toIndexedSeq, version) - if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use + val versionedIterable = connections.versionedIterable + val newState = new RandomRouterState(versionedIterable.iterable.toIndexedSeq, versionedIterable.version) + if (state.compareAndSet(currentState, newState)) + //we are lucky since we just updated the state, so we can send it back as the state to use newState else //we failed to update the state, lets try again... better luck next time. getState() @@ -437,10 +597,11 @@ class RoundRobinRouter extends BasicRouter { currentState } else { //there has been a change in connections, or it was the first try, so we need to update the internal state - val (version, connectionIterable) = connections.versionedIterator - val newState = new RoundRobinState(connectionIterable.toIndexedSeq, version) - if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use + val versionedIterable = connections.versionedIterable + val newState = new RoundRobinState(versionedIterable.iterable.toIndexedSeq[ActorRef], versionedIterable.version) + if (state.compareAndSet(currentState, newState)) + //we are lucky since we just updated the state, so we can send it back as the state to use newState else //we failed to update the state, lets try again... better luck next time. getState() @@ -467,10 +628,10 @@ class RoundRobinRouter extends BasicRouter { /* * ScatterGatherRouter broadcasts the message to all connections and gathers results according to the * specified strategy (specific router needs to implement `gather` method). - * Scatter-gather pattern will be applied only to the messages broadcasted using Future + * Scatter-gather pattern will be applied only to the messages broadcasted using Future * (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget - * mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type - * + * mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type + * * FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected. * FIXME: this is also the location where message buffering should be done in case of failure. */ @@ -483,7 +644,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = { - val responses = connections.versionedIterator._2.flatMap { actor ⇒ + val responses = connections.versionedIterable.iterable.flatMap { actor ⇒ try { Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]]) } catch { diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index f7a2f2a249..fb43966ee9 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -13,6 +13,7 @@ import akka.event.EventHandler import akka.cluster.ClusterNode import java.net.InetSocketAddress +import akka.routing.{ RoutedProps, Router } /** * Helper class for reflective access to different modules in order to allow optional loading of modules. @@ -33,6 +34,21 @@ object ReflectiveAccess { object ClusterModule { lazy val isEnabled = Config.isClusterEnabled //&& clusterInstance.isDefined + lazy val clusterRefClass: Class[_] = getClassFor("akka.cluster.ClusterActorRef") match { + case Left(e) ⇒ throw e + case Right(b) ⇒ b + } + + def newClusteredActorRef(props: RoutedProps): ActorRef = { + val params: Array[Class[_]] = Array(classOf[RoutedProps]) + val args: Array[AnyRef] = Array(props) + + createInstance(clusterRefClass, params, args) match { + case Left(e) ⇒ throw e + case Right(b) ⇒ b.asInstanceOf[ActorRef] + } + } + def ensureEnabled() { if (!isEnabled) { val e = new ModuleNotAvailableException( diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index b4af807a69..863f63a87c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -340,7 +340,7 @@ class DefaultClusterNode private[akka] ( private val changeListeners = new CopyOnWriteArrayList[ChangeListener]() // Address -> ClusterActorRef - private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef] + private[akka] val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef] case class VersionedConnectionState(version: Long, connections: Map[String, Tuple2[InetSocketAddress, ActorRef]]) @@ -895,18 +895,7 @@ class DefaultClusterNode private[akka] ( /** * Creates an ActorRef with a Router to a set of clustered actors. */ - def ref(actorAddress: String, router: RouterType): ActorRef = { - val inetSocketAddresses = inetSocketAddressesForActor(actorAddress) - EventHandler.debug(this, - "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]" - .format(actorAddress, router, remoteServerAddress, inetSocketAddresses.map(_._2).mkString("\n\t"))) - - val actorRef = ClusterActorRef.newRef(router, inetSocketAddresses, actorAddress, Actor.TIMEOUT) - inetSocketAddresses foreach { - case (_, inetSocketAddress) ⇒ clusterActorRefs.put(inetSocketAddress, actorRef) - } - actorRef.start() - } + def ref(actorAddress: String, router: RouterType): ActorRef = ClusterActorRef.newRef(router, actorAddress, Actor.TIMEOUT) /** * Returns the UUIDs of all actors checked out on this node. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index 6061b514db..1313e199a6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -26,19 +26,19 @@ import annotation.tailrec */ object ClusterActorRef { - def newRef( - routerType: RouterType, - inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]], - actorAddress: String, - timeout: Long): ClusterActorRef = { - routerType match { - case Direct ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, new DirectRouter()) - case Random ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, new RandomRouter()) - case RoundRobin ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, new RoundRobinRouter()) + def newRef(routerType: RouterType, actorAddress: String, timeout: Long): ClusterActorRef = { + + val routerFactory: () ⇒ Router = routerType match { + case Direct ⇒ () ⇒ new DirectRouter + case Random ⇒ () ⇒ new RandomRouter() + case RoundRobin ⇒ () ⇒ new RoundRobinRouter() case LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") case LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") case LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") } + + val props = RoutedProps.apply().withDeployId(actorAddress).withTimeout(timeout).withRouter(routerFactory) + new ClusterActorRef(props).start() } /** @@ -53,40 +53,27 @@ object ClusterActorRef { * * @author Jonas Bonér */ -class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]], - val address: String, - protected[akka] override val timeout: Long, - val router: Router) - extends UnsupportedActorRef { +private[akka] class ClusterActorRef(props: RoutedProps) extends AbstractRoutedActorRef(props) { ClusterModule.ensureEnabled() - val connections = new ClusterActorRefConnections((Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) { + val addresses = Cluster.node.inetSocketAddressesForActor(address) + EventHandler.debug(this, + "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]" + .format(address, router, Cluster.node.remoteServerAddress, addresses.map(_._2).mkString("\n\t"))) + + addresses foreach { + case (_, address) ⇒ Cluster.node.clusterActorRefs.put(address, this) + } + + val connections = new ClusterActorRefConnections((Map[InetSocketAddress, ActorRef]() /: addresses) { case (map, (uuid, inetSocketAddress)) ⇒ map + (inetSocketAddress -> createRemoteActorRef(address, inetSocketAddress)) - }) + }, props.connections) router.init(connections) def connectionsSize(): Int = connections.size - override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = { - val sender = channel match { - case ref: ActorRef ⇒ Some(ref) - case _ ⇒ None - } - router.route(message)(sender) - } - - override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] = { - val sender = channel match { - case ref: ActorRef ⇒ Some(ref) - case _ ⇒ None - } - router.route[Any](message, timeout.duration.toMillis)(sender) - } - private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = { RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None) } @@ -120,22 +107,19 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine private val state = new AtomicReference[State]() - def this(connectionMap: Map[InetSocketAddress, ActorRef]) = { + def this(clusteredConnections: Map[InetSocketAddress, ActorRef], explicitConnections: Iterable[ActorRef]) = { this() - state.set(new State(Long.MinValue, connectionMap)) + state.set(new State(Long.MinValue, clusteredConnections, explicitConnections)) } def version: Long = state.get().version - def versionedIterator = { - val s = state.get - (s.version, s.connections.values) - } + def versionedIterable = state.get - def size: Int = state.get().connections.size + def size(): Int = state.get().iterable.size def stopAll() { - state.get().connections.values foreach (_.stop()) // shut down all remote connections + state.get().clusteredConnections.values foreach (_.stop()) // shut down all remote connections } @tailrec @@ -143,18 +127,18 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine EventHandler.debug(this, "ClusterActorRef. %s failover from %s to %s".format(address, from, to)) val oldState = state.get - var change = false - val newMap = oldState.connections map { + var changed = false + val newMap = oldState.clusteredConnections map { case (`from`, actorRef) ⇒ - change = true - // actorRef.stop() + changed = true + //actorRef.stop() (to, createRemoteActorRef(actorRef.address, to)) case other ⇒ other } - if (change) { + if (changed) { //there was a state change, so we are now going to update the state. - val newState = new State(oldState.version + 1, newMap) + val newState = new State(oldState.version + 1, newMap, oldState.explicitConnections) //if we are not able to update, the state, we are going to try again. if (!state.compareAndSet(oldState, newState)) failOver(from, to) @@ -167,23 +151,42 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine val oldState = state.get() - //remote the ref from the connections. + var changed = false + + //remote the deadRef from the clustered-connections. var newConnections = Map[InetSocketAddress, ActorRef]() - oldState.connections.keys.foreach( + oldState.clusteredConnections.keys.foreach( address ⇒ { - val actorRef: ActorRef = oldState.connections.get(address).get - if (actorRef ne deadRef) newConnections = newConnections + ((address, actorRef)) + val actorRef: ActorRef = oldState.clusteredConnections.get(address).get + if (actorRef ne deadRef) { + newConnections = newConnections + ((address, actorRef)) + } else { + changed = true + } }) - if (newConnections.size != oldState.connections.size) { + //remove the deadRef also from the explicit connections. + var newExplicitConnections = oldState.explicitConnections.filter( + actorRef ⇒ + if (actorRef == deadRef) { + changed = true + false + } else { + true + }) + + if (changed) { //one or more occurrances of the actorRef were removed, so we need to update the state. - val newState = new State(oldState.version + 1, newConnections) + val newState = new State(oldState.version + 1, newConnections, newExplicitConnections) //if we are not able to update the state, we just try again. if (!state.compareAndSet(oldState, newState)) signalDeadActor(deadRef) } } - case class State(val version: Long, val connections: Map[InetSocketAddress, ActorRef]) + class State(version: Long = Integer.MIN_VALUE, + val clusteredConnections: Map[InetSocketAddress, ActorRef], + val explicitConnections: Iterable[ActorRef]) + extends VersionedIterable[ActorRef](version, explicitConnections ++ clusteredConnections.values) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala deleted file mode 100644 index 29b45fee79..0000000000 --- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala +++ /dev/null @@ -1,6 +0,0 @@ -package akka.cluster - -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - diff --git a/akka-docs/disabled/examples/Pi.scala b/akka-docs/disabled/examples/Pi.scala index 41f8e88b9f..16c781bd4d 100644 --- a/akka-docs/disabled/examples/Pi.scala +++ b/akka-docs/disabled/examples/Pi.scala @@ -1,9 +1,9 @@ //#imports package akka.tutorial.scala.first +import _root_.akka.routing.{RoutedProps, Routing, CyclicIterator} import akka.actor.{Actor, PoisonPill} import Actor._ -import akka.routing.{Routing, CyclicIterator} import Routing._ import System.{currentTimeMillis => now} @@ -65,7 +65,14 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) // wrap them with a load-balancing router - val router = Routing.loadBalancerActor(CyclicIterator(workers)).start() + val router = Routing.actorOf( + RoutedProps.apply + .withRoundRobinRouter + .withConnections(workers) + .withDeployId("pi") + ) + + loadBalancerActor(CyclicIterator(workers)).start() //#create-workers //#master-receive diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala index 39c91a06df..cdcee916f9 100644 --- a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala +++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala @@ -1,5 +1,6 @@ package sample.camel +import _root_.akka.routing.{ RoutedProps, Routing } import collection.mutable.Set import java.util.concurrent.CountDownLatch @@ -11,8 +12,6 @@ import akka.actor.Actor._ import akka.actor.{ ActorRegistry, ActorRef, Actor } import akka.camel._ import akka.camel.CamelServiceManager._ -import akka.routing.Routing - /** * @author Martin Krasser */ @@ -50,7 +49,8 @@ object HttpConcurrencyTestStress { startCamelService val workers = for (i ← 1 to 8) yield actorOf[HttpServerWorker].start - val balancer = Routing.actorOfWithRoundRobin("loadbalancer", workers) + val balancer = Routing.actorOf( + RoutedProps.apply.withRoundRobinRouter.withConnections(workers).withDeployId("loadbalancer")) //service.get.awaitEndpointActivation(1) { // actorOf(new HttpServerActor(balancer)).start //} diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index b0f973d985..ce1c0dd1f7 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -11,6 +11,7 @@ import static java.util.Arrays.asList; import akka.actor.ActorRef; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; +import akka.routing.RoutedProps; import akka.routing.RouterType; import akka.routing.Routing; import akka.routing.Routing.Broadcast; @@ -108,7 +109,12 @@ public class Pi { workers.add(worker); } - router = Routing.actorOfWithRoundRobin("pi", JavaConversions.collectionAsScalaIterable(workers)); + router = Routing.actorOf( + RoutedProps.apply() + .withRoundRobinRouter() + .withConnections(workers) + .withDeployId("pi") + ); } // message handler diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index b97e22f5a0..2603d4e5a3 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -8,7 +8,7 @@ import akka.actor.{ Actor, PoisonPill } import Actor._ import java.util.concurrent.CountDownLatch import akka.routing.Routing.Broadcast -import akka.routing.Routing +import akka.routing.{ RoutedProps, Routing } object Pi extends App { @@ -18,8 +18,11 @@ object Pi extends App { // ===== Messages ===== // ==================== sealed trait PiMessage + case object Calculate extends PiMessage + case class Work(start: Int, nrOfElements: Int) extends PiMessage + case class Result(value: Double) extends PiMessage // ================== @@ -55,7 +58,11 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) // wrap them with a load-balancing router - val router = Routing.actorOfWithRoundRobin("pi", workers) + val router = Routing.actorOf( + RoutedProps.default + .withRoundRobinRouter() + .withConnections(workers) + .withDeployId("pi")) // message handler def receive = { diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java index 98cd30353d..1a6986c22c 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java +++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java @@ -9,6 +9,7 @@ import static akka.actor.Actors.poisonPill; import static java.lang.System.currentTimeMillis; import static java.util.Arrays.asList; +import akka.routing.RoutedProps; import akka.routing.Routing; import scala.Option; import akka.actor.ActorRef; @@ -102,7 +103,12 @@ public class Pi { workers.add(worker); } - router = Routing.actorOfWithRoundRobin("pi", JavaConversions.collectionAsScalaIterable(workers)); + router = Routing.actorOf( + RoutedProps.apply() + .withConnections(workers) + .withRoundRobinRouter() + .withDeployId("pi") + ); } @Override diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 144457573e..d9e133df98 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -5,11 +5,11 @@ package akka.tutorial.second import akka.actor.Actor._ -import akka.routing.Routing import akka.event.EventHandler import System.{ currentTimeMillis ⇒ now } import akka.routing.Routing.Broadcast import akka.actor.{ Timeout, Channel, Actor, PoisonPill } +import akka.routing.{ RoutedProps, Routing } object Pi extends App { @@ -53,7 +53,11 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start()) // wrap them with a load-balancing router - val router = Routing.actorOfWithRoundRobin("pi", workers) + val router = Routing.actorOf( + RoutedProps.apply() + .withConnections(workers) + .withRoundRobinRouter() + .withDeployId("pi")) // phase 1, can accept a Calculate message def scatter: Receive = {