diff --git a/akka-actor/src/main/java/akka/routing/RouterFactory.java b/akka-actor/src/main/java/akka/routing/RouterFactory.java
new file mode 100644
index 0000000000..ad4d719930
--- /dev/null
+++ b/akka-actor/src/main/java/akka/routing/RouterFactory.java
@@ -0,0 +1,15 @@
+package akka.routing;
+
+/**
+ * A Factory responsible for creating {@link Router} instances. It makes Java compatability possible for users that
+ * want to provide their own router instance.
+ */
+public interface RouterFactory {
+
+ /**
+ * Creates a new Router instance.
+ *
+ * @return the newly created Router instance.
+ */
+ Router newRouter();
+}
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index 5fb5986b25..1387d3fe26 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -7,17 +7,13 @@ package akka.routing
import annotation.tailrec
import akka.AkkaException
-import akka.dispatch.Future
import akka.actor._
-import akka.dispatch.Futures
import akka.event.EventHandler
import akka.actor.UntypedChannel._
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
-
-/**
- * An {@link AkkaException} thrown when something goes wrong while routing a message
- */
-class RoutingException(message: String) extends AkkaException(message)
+import akka.dispatch.{ Future, Futures }
+import akka.util.ReflectiveAccess
+import collection.JavaConversions.iterableAsScalaIterable
sealed trait RouterType
@@ -59,6 +55,116 @@ object RouterType {
}
+object RoutedProps {
+
+ final val defaultTimeout = Actor.TIMEOUT
+ final val defaultRouterFactory = () ⇒ new RoundRobinRouter
+ final val defaultDeployId = ""
+ final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled
+
+ /**
+ * The default RoutedProps instance, uses the settings from the RoutedProps object starting with default*
+ */
+ final val default = new RoutedProps()
+
+ def apply(): RoutedProps = default
+}
+
+/**
+ * Contains the configuration to create local and clustered routed actor references.
+ *
+ * Routed ActorRef configuration object, this is thread safe and fully sharable.
+ *
+ * Because the Routers are stateful, a new Router instance needs to be created for every ActorRef that relies on routing
+ * (currently the ClusterActorRef and the RoutedActorRef). That is why a Router factory is used (a function that returns
+ * a new Router instance) instead of a single Router instance. This makes sharing the same RoutedProps between multiple
+ * threads safe.
+ *
+ * This configuration object makes it possible to either
+ */
+case class RoutedProps(routerFactory: () ⇒ Router, deployId: String, connections: Iterable[ActorRef], timeout: Timeout, localOnly: Boolean) {
+
+ def this() = this(
+ routerFactory = RoutedProps.defaultRouterFactory,
+ deployId = RoutedProps.defaultDeployId,
+ connections = List(),
+ timeout = RoutedProps.defaultTimeout,
+ localOnly = RoutedProps.defaultLocalOnly)
+
+ /**
+ * Returns a new RoutedProps with the specified deployId set
+ *
+ * Java and Scala API
+ */
+ def withDeployId(id: String): RoutedProps = copy(deployId = if (id eq null) "" else id)
+
+ /**
+ * Returns a new RoutedProps configured with a random router.
+ *
+ * Java and Scala API.
+ */
+ def withRandomRouter(): RoutedProps = copy(routerFactory = () ⇒ new RandomRouter())
+
+ /**
+ * Returns a new RoutedProps configured with a round robin router.
+ *
+ * Java and Scala API.
+ */
+ def withRoundRobinRouter(): RoutedProps = copy(routerFactory = () ⇒ new RoundRobinRouter())
+
+ /**
+ * Returns a new RoutedProps configured with a direct router.
+ *
+ * Java and Scala API.
+ */
+ def withDirectRouter(): RoutedProps = copy(routerFactory = () ⇒ new DirectRouter())
+
+ /**
+ * Makes it possible to change the default behavior in a clustered environment that a clustered actor ref is created.
+ * In some cases you just want to have local actor references, even though the Cluster Module is up and running.
+ *
+ * Java and Scala API.
+ */
+ def withLocalOnly(l: Boolean = true) = copy(localOnly = l)
+
+ /**
+ * Sets the Router factory method to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new
+ * Router instance is needed for every 'routed' ActorRef. That is why a 'factory' function is used to create new
+ * instances.
+ *
+ * Scala API.
+ */
+ def withRouter(f: () ⇒ Router): RoutedProps = copy(routerFactory = f)
+
+ /**
+ * Sets the RouterFactory to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new
+ * Router instance is needed for every 'routed' ActorRef. That is why a RouterFactory interface is used to create new
+ * instances.
+ *
+ * Java API.
+ */
+ def withRouter(f: RouterFactory): RoutedProps = copy(routerFactory = () ⇒ f.newRouter())
+
+ /**
+ *
+ */
+ def withTimeout(t: Timeout): RoutedProps = copy(timeout = t)
+
+ /**
+ * Sets the connections to use.
+ *
+ * Scala API.
+ */
+ def withConnections(c: Iterable[ActorRef]): RoutedProps = copy(connections = c)
+
+ /**
+ * Sets the connections to use.
+ *
+ * Java API.
+ */
+ def withConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = copy(connections = iterableAsScalaIterable(c))
+}
+
/**
* The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
* {@link RouterConnections} and each Router should be linked to only one {@link RouterConnections}.
@@ -96,7 +202,24 @@ trait Router {
}
/**
+ * An {@link AkkaException} thrown when something goes wrong while routing a message
+ */
+class RoutingException(message: String) extends AkkaException(message)
+
+/**
+ * The RouterConnection acts like a middleman between the Router and the actor reference that does the routing.
+ * Through the RouterConnection:
+ *
+ * -
+ * the actor ref can signal that something has changed in the known set of connections. The Router can see
+ * when a changed happened (by checking the version) and update its internal datastructures.
+ *
+ * -
+ * the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying.
+ *
+ *
*
+ * It is very likely that the implementation of the RouterConnection will be part of the ActorRef itself.
*/
trait RouterConnections {
@@ -107,12 +230,14 @@ trait RouterConnections {
def version: Long
/**
- * Returns a tuple containing the version and Iterable of all connected ActorRefs this Router uses to send messages to.
+ * Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is
+ * the time element, also the version is included to be able to read the data (the connections) and the version
+ * in an atomic manner.
*
- * This iterator should be 'persistent'. So it can be handed out to other threads so that they are working on
- * a stable (immutable) view of some set of connections.
+ * This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
+ * view of some set of connections.
*/
- def versionedIterator: (Long, Iterable[ActorRef])
+ def versionedIterable: VersionedIterable[ActorRef]
/**
* A callback that can be used to indicate that a connected actorRef was dead.
@@ -131,12 +256,41 @@ trait RouterConnections {
def signalDeadActor(deadRef: ActorRef): Unit
}
+/**
+ * An Iterable that also contains a version.
+ */
+case class VersionedIterable[A](version: Long, val iterable: Iterable[A])
+
+/**
+ * A Helper class to create actor references that use routing.
+ */
object Routing {
sealed trait RoutingMessage
case class Broadcast(message: Any) extends RoutingMessage
+ /**
+ * todo: will very likely be moved to the ActorRef.
+ */
+ def actorOf(props: RoutedProps): ActorRef = {
+ //TODO Implement support for configuring by deployment ID etc
+ //TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor
+ //TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?)
+ //TODO If the actor deployed uses a different config, then ignore or throw exception?
+
+ val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
+ val localOnly = props.localOnly
+
+ if (!localOnly && !clusteringEnabled)
+ throw new IllegalArgumentException("Can't have clustered actor reference without the ClusterModule being enabled")
+ else if (clusteringEnabled && !props.localOnly) {
+ ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
+ } else {
+ new RoutedActorRef(props)
+ }
+ }
+
/**
* Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors.
*
@@ -146,8 +300,9 @@ object Routing {
* @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation
* how many connections it can handle.
*/
+ @deprecated
def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = {
- val ref = routerType match {
+ routerType match {
case RouterType.Direct ⇒
if (connections.size > 1)
throw new IllegalArgumentException("A direct router can't have more than 1 connection")
@@ -159,29 +314,27 @@ object Routing {
actorOf(actorAddress, connections, new RoundRobinRouter())
case _ ⇒ throw new IllegalArgumentException("Unsupported routerType " + routerType)
}
-
- ref.start()
}
+ @deprecated
def actorOf(actorAddress: String, connections: Iterable[ActorRef], router: Router): ActorRef = {
if (connections.size == 0)
throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
- new RoutedActorRef(actorAddress, router, connections)
+ val props = new RoutedProps(() ⇒ router, actorAddress, connections, RoutedProps.defaultTimeout, true)
+ new RoutedActorRef(props).start()
}
- def actorOfWithRoundRobin(actorAddress: String, connections: Iterable[ActorRef]): ActorRef = {
- actorOf(actorAddress, connections, akka.routing.RouterType.RoundRobin)
- }
}
/**
- * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
- * on (or more) of these actors.
+ * An Abstract convenience implementation for building an ActorReference that uses a Router.
*/
-class RoutedActorRef(val address: String, val router: Router, val connectionIterator: Iterable[ActorRef]) extends UnsupportedActorRef {
+abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef {
- router.init(new RoutedActorRefConnections(connectionIterator))
+ val router = props.routerFactory.apply()
+
+ def address = props.deployId
override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val sender = channel match {
@@ -200,6 +353,16 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
}
router.route[Any](message, timeout)(sender)
}
+}
+
+/**
+ * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
+ * on (or more) of these actors.
+ */
+private[akka] class RoutedActorRef(val routedProps: RoutedProps)
+ extends AbstractRoutedActorRef(routedProps) {
+
+ router.init(new RoutedActorRefConnections(routedProps.connections))
def start(): this.type = synchronized[this.type] {
_status = ActorRefInternals.RUNNING
@@ -211,47 +374,38 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
-
- // FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket)
-
- //inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections
}
}
}
private class RoutedActorRefConnections() extends RouterConnections {
- private val state = new AtomicReference[State]()
+ private val state = new AtomicReference[VersionedIterable[ActorRef]]()
def this(connectionIterable: Iterable[ActorRef]) = {
this()
- state.set(new State(Long.MinValue, connectionIterable))
+ state.set(new VersionedIterable[ActorRef](Long.MinValue, connectionIterable))
}
def version: Long = state.get().version
- def versionedIterator = {
- val s = state.get
- (s.version, s.connectionIterable)
- }
+ def versionedIterable = state.get
@tailrec
final def signalDeadActor(ref: ActorRef) = {
val oldState = state.get()
//remote the ref from the connections.
- var newList = oldState.connectionIterable.filter(currentActorRef ⇒ currentActorRef ne ref)
+ var newList = oldState.iterable.filter(currentActorRef ⇒ currentActorRef ne ref)
- if (newList.size != oldState.connectionIterable.size) {
+ if (newList.size != oldState.iterable.size) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
- val newState = new State(oldState.version + 1, newList)
+ val newState = new VersionedIterable[ActorRef](oldState.version + 1, newList)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) signalDeadActor(ref)
}
}
-
- class State(val version: Long, val connectionIterable: Iterable[ActorRef])
}
}
@@ -275,7 +429,7 @@ trait BasicRouter extends Router {
def route(message: Any)(implicit sender: Option[ActorRef]): Unit = message match {
case Routing.Broadcast(message) ⇒
//it is a broadcast message, we are going to send to message to all connections.
- connections.versionedIterator._2.foreach(actor ⇒
+ connections.versionedIterable.iterable.foreach(actor ⇒
try {
actor.!(message)(sender)
} catch {
@@ -353,12 +507,13 @@ class DirectRouter extends BasicRouter {
} else {
//there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
- val (version, connectionIterable) = connections.versionedIterator
+ val versionedIterable = connections.versionedIterable
- if (connectionIterable.size > 1)
- throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionIterable.size))
+ val connectionCount = versionedIterable.iterable.size
+ if (connectionCount > 1)
+ throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))
- val newState = new DirectRouterState(connectionIterable.head, version)
+ val newState = new DirectRouterState(versionedIterable.iterable.head, versionedIterable.version)
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
@@ -405,8 +560,8 @@ class RandomRouter extends BasicRouter {
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
- val (version, connectionIterable) = connections.versionedIterator
- val newState = new RandomRouterState(connectionIterable.toArray[ActorRef], version)
+ val versionedIterable = connections.versionedIterable
+ val newState = new RandomRouterState(versionedIterable.iterable.toArray[ActorRef], versionedIterable.version)
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
@@ -442,8 +597,8 @@ class RoundRobinRouter extends BasicRouter {
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
- val (version, connectionIterable) = connections.versionedIterator
- val newState = new RoundRobinState(connectionIterable.toArray[ActorRef], version)
+ val versionedIterable = connections.versionedIterable
+ val newState = new RoundRobinState(versionedIterable.iterable.toArray[ActorRef], versionedIterable.version)
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
@@ -475,10 +630,10 @@ class RoundRobinRouter extends BasicRouter {
/*
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
* specified strategy (specific router needs to implement `gather` method).
- * Scatter-gather pattern will be applied only to the messages broadcasted using Future
+ * Scatter-gather pattern will be applied only to the messages broadcasted using Future
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget
- * mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type
- *
+ * mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type
+ *
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
* FIXME: this is also the location where message buffering should be done in case of failure.
*/
@@ -491,7 +646,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]
private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
- val responses = connections.versionedIterator._2.flatMap { actor ⇒
+ val responses = connections.versionedIterable.iterable.flatMap { actor ⇒
try {
Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]])
} catch {
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 4a82884f03..ec72d0d4cc 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -13,6 +13,7 @@ import akka.event.EventHandler
import akka.cluster.ClusterNode
import java.net.InetSocketAddress
+import akka.routing.{ RoutedProps, Router }
/**
* Helper class for reflective access to different modules in order to allow optional loading of modules.
@@ -33,6 +34,21 @@ object ReflectiveAccess {
object ClusterModule {
lazy val isEnabled = Config.isClusterEnabled && clusterInstance.isDefined
+ lazy val clusterRefClass: Class[_] = getClassFor("akka.cluster.ClusterActorRef") match {
+ case Left(e) ⇒ throw e
+ case Right(b) ⇒ b
+ }
+
+ def newClusteredActorRef(props: RoutedProps): ActorRef = {
+ val params: Array[Class[_]] = Array(classOf[RoutedProps])
+ val args: Array[AnyRef] = Array(props)
+
+ createInstance(clusterRefClass, params, args) match {
+ case Left(e) ⇒ throw e
+ case Right(b) ⇒ b.asInstanceOf[ActorRef]
+ }
+ }
+
def ensureEnabled() {
if (!isEnabled) {
val e = new ModuleNotAvailableException(
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index d28f4e91e4..ca3b943a36 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -411,7 +411,7 @@ class DefaultClusterNode private[akka] (
private val changeListeners = new CopyOnWriteArrayList[ChangeListener]()
// Address -> ClusterActorRef
- private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
+ private[akka] val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
// ============================================================================================================
// ========== WARNING: THESE FIELDS AND EVERYTHING USING THEM IN THE CONSTRUCTOR NEEDS TO BE LAZY =============
@@ -979,15 +979,7 @@ class DefaultClusterNode private[akka] (
* Creates an ActorRef with a Router to a set of clustered actors.
*/
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
- val addresses = addressesForActor(actorAddress)
- EventHandler.debug(this,
- "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
- .format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t")))
-
- val actorRef = Routing newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
- addresses foreach {
- case (_, address) ⇒ clusterActorRefs.put(address, actorRef)
- }
+ val actorRef = Routing newRouter (router, actorAddress, Actor.TIMEOUT)
actorRef.start()
} else throw new ClusterException("Not connected to cluster")
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index 918ec92fe5..d72b875fd3 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -6,16 +6,13 @@ package akka.cluster
import akka.actor._
import akka.util._
import ReflectiveAccess._
-import akka.dispatch.Future
-
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
-import com.eaio.uuid.UUID
import collection.immutable.Map
import annotation.tailrec
import akka.event.EventHandler
-import akka.routing.{ RouterConnections, Router }
+import akka.routing._
/**
* ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
@@ -23,42 +20,27 @@ import akka.routing.{ RouterConnections, Router }
*
* @author Jonas Bonér
*/
-class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
- val address: String,
- _timeout: Long,
- val router: Router)
- extends UnsupportedActorRef {
+private[akka] class ClusterActorRef(props: RoutedProps) extends AbstractRoutedActorRef(props) {
ClusterModule.ensureEnabled()
- timeout = _timeout
+ val addresses = Cluster.node.addressesForActor(address)
+ EventHandler.debug(this,
+ "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
+ .format(address, router, Cluster.node.remoteServerAddress, addresses.map(_._2).mkString("\n\t")))
- val connections = new ClusterActorRefConnections((Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) {
+ addresses foreach {
+ case (_, address) ⇒ Cluster.node.clusterActorRefs.put(address, this)
+ }
+
+ val connections = new ClusterActorRefConnections((Map[InetSocketAddress, ActorRef]() /: addresses) {
case (map, (uuid, inetSocketAddress)) ⇒ map + (inetSocketAddress -> createRemoteActorRef(address, inetSocketAddress))
- })
+ }, props.connections)
router.init(connections)
def connectionsSize(): Int = connections.size
- override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
- val sender = channel match {
- case ref: ActorRef ⇒ Some(ref)
- case _ ⇒ None
- }
- router.route(message)(sender)
- }
-
- override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any,
- timeout: Timeout,
- channel: UntypedChannel): Future[Any] = {
- val sender = channel match {
- case ref: ActorRef ⇒ Some(ref)
- case _ ⇒ None
- }
- router.route[Any](message, timeout.duration.toMillis)(sender)
- }
-
private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
}
@@ -92,22 +74,19 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
private val state = new AtomicReference[State]()
- def this(connectionMap: Map[InetSocketAddress, ActorRef]) = {
+ def this(clusteredConnections: Map[InetSocketAddress, ActorRef], explicitConnections: Iterable[ActorRef]) = {
this()
- state.set(new State(Long.MinValue, connectionMap))
+ state.set(new State(Long.MinValue, clusteredConnections, explicitConnections))
}
def version: Long = state.get().version
- def versionedIterator = {
- val s = state.get
- (s.version, s.connections.values)
- }
+ def versionedIterable = state.get
- def size(): Int = state.get().connections.size
+ def size(): Int = state.get().iterable.size
def stopAll() {
- state.get().connections.values foreach (_.stop()) // shut down all remote connections
+ state.get().clusteredConnections.values foreach (_.stop()) // shut down all remote connections
}
@tailrec
@@ -115,18 +94,18 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
EventHandler.debug(this, "ClusterActorRef. %s failover from %s to %s".format(address, from, to))
val oldState = state.get
- var change = false
- val newMap = oldState.connections map {
+ var changed = false
+ val newMap = oldState.clusteredConnections map {
case (`from`, actorRef) ⇒
- change = true
+ changed = true
actorRef.stop()
(to, createRemoteActorRef(actorRef.address, to))
case other ⇒ other
}
- if (change) {
+ if (changed) {
//there was a state change, so we are now going to update the state.
- val newState = new State(oldState.version + 1, newMap)
+ val newState = new State(oldState.version + 1, newMap, oldState.explicitConnections)
//if we are not able to update, the state, we are going to try again.
if (!state.compareAndSet(oldState, newState)) failOver(from, to)
@@ -139,24 +118,43 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
val oldState = state.get()
- //remote the ref from the connections.
+ var changed = false
+
+ //remote the deadRef from the clustered-connections.
var newConnections = Map[InetSocketAddress, ActorRef]()
- oldState.connections.keys.foreach(
+ oldState.clusteredConnections.keys.foreach(
address ⇒ {
- val actorRef: ActorRef = oldState.connections.get(address).get
- if (actorRef ne deadRef) newConnections = newConnections + ((address, actorRef))
+ val actorRef: ActorRef = oldState.clusteredConnections.get(address).get
+ if (actorRef ne deadRef) {
+ newConnections = newConnections + ((address, actorRef))
+ } else {
+ changed = true
+ }
})
- if (newConnections.size != oldState.connections.size) {
+ //remove the deadRef also from the explicit connections.
+ var newExplicitConnections = oldState.explicitConnections.filter(
+ actorRef ⇒
+ if (actorRef == deadRef) {
+ changed = true
+ false
+ } else {
+ true
+ })
+
+ if (changed) {
//one or more occurrances of the actorRef were removed, so we need to update the state.
- val newState = new State(oldState.version + 1, newConnections)
+ val newState = new State(oldState.version + 1, newConnections, newExplicitConnections)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) signalDeadActor(deadRef)
}
}
- class State(val version: Long, val connections: Map[InetSocketAddress, ActorRef])
+ class State(version: Long = Integer.MIN_VALUE,
+ val clusteredConnections: Map[InetSocketAddress, ActorRef],
+ val explicitConnections: Iterable[ActorRef])
+ extends VersionedIterable[ActorRef](version, explicitConnections ++ clusteredConnections.values)
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
index 7aabfbcd59..a2ff6ea8a2 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
@@ -4,31 +4,27 @@ package akka.cluster
* Copyright (C) 2009-2011 Typesafe Inc.
*/
-import akka.routing.RouterType
-import RouterType._
-
-import com.eaio.uuid.UUID
-
-import java.net.InetSocketAddress
-import akka.routing.{ RandomRouter, DirectRouter, RoundRobinRouter }
+import akka.routing.RouterType._
+import akka.routing._
/**
* @author Jonas Bonér
*/
+@deprecated("will be removed as soon as the RoutingProps are in place. ")
object Routing {
- def newRouter(
- routerType: RouterType,
- inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
- actorAddress: String,
- timeout: Long): ClusterActorRef = {
- routerType match {
- case Direct ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, new DirectRouter())
- case Random ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, new RandomRouter())
- case RoundRobin ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, new RoundRobinRouter())
+ def newRouter(routerType: RouterType, actorAddress: String, timeout: Long): ClusterActorRef = {
+
+ val router = routerType match {
+ case Direct ⇒ new DirectRouter()
+ case Random ⇒ new RandomRouter()
+ case RoundRobin ⇒ new RoundRobinRouter()
case LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
case LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
case LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
}
+
+ val props = new RoutedProps(() ⇒ router, actorAddress, List(), timeout, false)
+ new ClusterActorRef(props).start()
}
}
diff --git a/akka-docs/disabled/examples/Pi.scala b/akka-docs/disabled/examples/Pi.scala
index 41f8e88b9f..16c781bd4d 100644
--- a/akka-docs/disabled/examples/Pi.scala
+++ b/akka-docs/disabled/examples/Pi.scala
@@ -1,9 +1,9 @@
//#imports
package akka.tutorial.scala.first
+import _root_.akka.routing.{RoutedProps, Routing, CyclicIterator}
import akka.actor.{Actor, PoisonPill}
import Actor._
-import akka.routing.{Routing, CyclicIterator}
import Routing._
import System.{currentTimeMillis => now}
@@ -65,7 +65,14 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
- val router = Routing.loadBalancerActor(CyclicIterator(workers)).start()
+ val router = Routing.actorOf(
+ RoutedProps.apply
+ .withRoundRobinRouter
+ .withConnections(workers)
+ .withDeployId("pi")
+ )
+
+ loadBalancerActor(CyclicIterator(workers)).start()
//#create-workers
//#master-receive
diff --git a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala
index 39c91a06df..4241bebe52 100644
--- a/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala
+++ b/akka-samples/akka-sample-camel/src/test/scala/sample/camel/HttpConcurrencyTestStress.scala
@@ -1,5 +1,6 @@
package sample.camel
+import _root_.akka.routing.{RoutedProps, Routing}
import collection.mutable.Set
import java.util.concurrent.CountDownLatch
@@ -11,8 +12,6 @@ import akka.actor.Actor._
import akka.actor.{ ActorRegistry, ActorRef, Actor }
import akka.camel._
import akka.camel.CamelServiceManager._
-import akka.routing.Routing
-
/**
* @author Martin Krasser
*/
@@ -50,8 +49,10 @@ object HttpConcurrencyTestStress {
startCamelService
val workers = for (i ← 1 to 8) yield actorOf[HttpServerWorker].start
- val balancer = Routing.actorOfWithRoundRobin("loadbalancer", workers)
- //service.get.awaitEndpointActivation(1) {
+ val balancer = Routing.actorOf(
+ RoutedProps.apply.withRoundRobinRouter.withConnections(workers).withDeployId("loadbalancer")
+ )
+ //service.get.awaitEndpointActivation(1) {
// actorOf(new HttpServerActor(balancer)).start
//}
}
diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java
index 0442fabc45..f975b4e1d1 100644
--- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java
+++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java
@@ -11,6 +11,7 @@ import static java.util.Arrays.asList;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
+import akka.routing.RoutedProps;
import akka.routing.RouterType;
import akka.routing.Routing;
import akka.routing.Routing.Broadcast;
@@ -108,7 +109,12 @@ public class Pi {
workers.add(worker);
}
- router = Routing.actorOfWithRoundRobin("pi", JavaConversions.asIterable(workers));
+ router = Routing.actor(
+ RoutedProps.apply()
+ .withRoundRobinRouter()
+ .withConnections(workers)
+ .withDeployId("pi")
+ );
}
// message handler
diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala
index b97e22f5a0..6eba020b62 100644
--- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala
+++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala
@@ -4,11 +4,11 @@
package akka.tutorial.first.scala
-import akka.actor.{ Actor, PoisonPill }
+import akka.actor.{Actor, PoisonPill}
import Actor._
import java.util.concurrent.CountDownLatch
import akka.routing.Routing.Broadcast
-import akka.routing.Routing
+import akka.routing.{RoutedProps, Routing}
object Pi extends App {
@@ -18,8 +18,11 @@ object Pi extends App {
// ===== Messages =====
// ====================
sealed trait PiMessage
+
case object Calculate extends PiMessage
+
case class Work(start: Int, nrOfElements: Int) extends PiMessage
+
case class Result(value: Double) extends PiMessage
// ==================
@@ -55,7 +58,12 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
- val router = Routing.actorOfWithRoundRobin("pi", workers)
+ val router = Routing.actor(
+ RoutedProps.default
+ .withRoundRobinRouter()
+ .withConnections(workers)
+ .withDeployId("pi")
+ )
// message handler
def receive = {
diff --git a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java
index 2dc7da4399..73cac15433 100644
--- a/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java
+++ b/akka-tutorials/akka-tutorial-second/src/main/java/akka/tutorial/java/second/Pi.java
@@ -9,6 +9,7 @@ import static akka.actor.Actors.poisonPill;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
+import akka.routing.RoutedProps;
import akka.routing.Routing;
import scala.Option;
import akka.actor.ActorRef;
@@ -102,7 +103,12 @@ public class Pi {
workers.add(worker);
}
- router = Routing.actorOfWithRoundRobin("pi", JavaConversions.asIterable(workers));
+ router = Routing.actorOf(
+ RoutedProps.apply()
+ .withConnections(workers)
+ .withRoundRobinRouter()
+ .withDeployId("pi")
+ )
}
@Override
diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala
index 144457573e..3a5f34c8ce 100644
--- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala
+++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala
@@ -5,11 +5,11 @@
package akka.tutorial.second
import akka.actor.Actor._
-import akka.routing.Routing
import akka.event.EventHandler
import System.{ currentTimeMillis ⇒ now }
import akka.routing.Routing.Broadcast
import akka.actor.{ Timeout, Channel, Actor, PoisonPill }
+import akka.routing.{RoutedProps, Routing}
object Pi extends App {
@@ -53,7 +53,13 @@ object Pi extends App {
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start())
// wrap them with a load-balancing router
- val router = Routing.actorOfWithRoundRobin("pi", workers)
+ val router = Routing.actorOf(
+ RoutedProps.apply()
+ .withConnections(workers)
+ .withRoundRobinRouter()
+ .withDeployId("pi")
+
+ )
// phase 1, can accept a Calculate message
def scatter: Receive = {