diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index b981c954ce..218fa0893c 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -82,7 +82,11 @@ case class MaximumNumberOfRestartsWithinTimeRangeReached(
// Exceptions for Actors
class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class IllegalActorStateException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
-class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
+
+// NOTE(review): the auxiliary constructor below is public while the primary one is private[akka] - confirm this is intended.
+class ActorKilledException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
+  def this(msg: String) = this(msg, null) // explicit 1-arg overload: Scala default arguments do not create a 1-arg JVM constructor
+}
class ActorInitializationException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
class InvalidMessageException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause)
diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
index f5206b7668..ed1eefcd7a 100644
--- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
+++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
@@ -133,8 +133,10 @@ object DeploymentConfig {
case Host("localhost") ⇒ Config.nodename
case IP("0.0.0.0") ⇒ Config.nodename
case IP("127.0.0.1") ⇒ Config.nodename
- case Host(hostname) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
- case IP(address) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
+ case Host(hostname) ⇒ throw new UnsupportedOperationException(
+ "Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
+ case IP(address) ⇒ throw new UnsupportedOperationException(
+ "Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
}
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == Config.nodename)
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 39c57c0778..980e5ef9e5 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -933,7 +933,8 @@ class DefaultClusterNode private[akka](
*/
def release(actorAddress: String) {
- // FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method?
+ // FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no
+ // longer available. Then what to do? Should we even remove this method?
if (isConnected.isOn) {
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index 3dc5f14b78..5afe318daa 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -3,22 +3,19 @@
*/
package akka.cluster
-import Cluster._
-
import akka.actor._
-import Actor._
import akka.dispatch._
import akka.util._
import ReflectiveAccess._
-import ClusterModule._
-import akka.event.EventHandler
import akka.dispatch.Future
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
-import java.util.{ Map ⇒ JMap }
+import java.util.{Map ⇒ JMap}
import com.eaio.uuid.UUID
+import collection.immutable.Map
+import annotation.tailrec
/**
* ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
@@ -26,11 +23,11 @@ import com.eaio.uuid.UUID
*
* @author Jonas Bonér
*/
-class ClusterActorRef private[akka] (
- inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
- val address: String,
- _timeout: Long)
- extends ActorRef with ScalaActorRef { this: Router.Router ⇒
+class ClusterActorRef private[akka](inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
+ val address: String,
+ _timeout: Long)
+ extends ActorRef with ScalaActorRef {
+ this: Router.Router ⇒
timeout = _timeout
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
@@ -45,29 +42,71 @@ class ClusterActorRef private[akka] (
override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val sender = channel match {
case ref: ActorRef ⇒ Some(ref)
- case _ ⇒ None
+ case _ ⇒ None
}
route(message)(sender)
}
- override def postMessageToMailboxAndCreateFutureResultWithTimeout(
- message: Any,
- timeout: Long,
- channel: UntypedChannel): Future[Any] = {
+ override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any,
+ timeout: Long,
+ channel: UntypedChannel): Future[Any] = {
val sender = channel match {
case ref: ActorRef ⇒ Some(ref)
- case _ ⇒ None
+ case _ ⇒ None
}
route[Any](message, timeout)(sender)
}
- private[akka] def failOver(fromInetSocketAddress: InetSocketAddress, toInetSocketAddress: InetSocketAddress) {
- inetSocketAddressToActorRefMap set (inetSocketAddressToActorRefMap.get map {
- case (`fromInetSocketAddress`, actorRef) ⇒
- actorRef.stop()
- (toInetSocketAddress, createRemoteActorRef(actorRef.address, toInetSocketAddress))
- case other ⇒ other
- })
+  /**
+   * Atomically swaps the connection registered for `from` with one created for `to`; no-op when `from` is absent.
+   * NOTE(review): a lost CAS race discards the ref created for that attempt without stopping it - confirm that is harmless.
+   */
+  private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress): Unit = {
+    @tailrec
+    def doFailover(): Unit = {
+      val oldValue = inetSocketAddressToActorRefMap.get
+      oldValue.get(from) match {
+        case Some(staleRef) ⇒
+          val newValue = (oldValue - from) + (to -> createRemoteActorRef(staleRef.address, to))
+          // Stop the stale ref only once the swap is published, so a lost CAS race never repeats the stop.
+          if (inetSocketAddressToActorRefMap.compareAndSet(oldValue, newValue)) staleRef.stop() else doFailover()
+        case None ⇒ () // nothing registered for `from`
+      }
+    }
+    doFailover()
+  }
+
+  /**
+   * Removes the given address (and the corresponding ActorRef) from this ClusterActorRef.
+   *
+   * The call can safely be made when the address is missing: the map is then left without that key, as before.
+   *
+   * The update is a compare-and-set retry loop on the AtomicReference holding the immutable map, so a
+   * concurrent modification makes this call retry instead of overwriting the other update.
+   */
+  @tailrec
+  private def remove(address: InetSocketAddress): Unit = {
+    val oldValue = inetSocketAddressToActorRefMap.get()
+    val newValue = oldValue - address
+
+    if (!inetSocketAddressToActorRefMap.compareAndSet(oldValue, newValue))
+      remove(address)
+  }
+
+  /**
+   * Drops the connection that points at `ref`, if there is one; a ref that is not registered is ignored.
+   *
+   * `connections` is read exactly once so the whole search runs against a single snapshot. Re-reading it
+   * per key and calling `.get` on the lookup could throw if the entry was removed in between.
+   */
+  def signalDeadActor(ref: ActorRef): Unit = {
+    // Since the number of remote actor refs for a clustered actor ref is quite low, we can deal with the
+    // O(N) complexity of the following lookup.
+    val snapshot = connections
+
+    snapshot.find { case (_, candidate) ⇒ candidate == ref } foreach {
+      case (address, _) ⇒ remove(address)
+    }
}
private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
index c165e699fe..c9db1e7208 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
@@ -3,13 +3,10 @@
*/
package akka.cluster
-import Cluster._
-
import akka.actor._
-import Actor._
import akka.dispatch.Future
import akka.event.EventHandler
-import akka.routing.{ RouterType, RoutingException }
+import akka.routing.{RouterType, RoutingException}
import RouterType._
import com.eaio.uuid.UUID
@@ -24,16 +21,16 @@ import java.util.concurrent.atomic.AtomicReference
*/
object Router {
def newRouter(
- routerType: RouterType,
- inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
- actorAddress: String,
- timeout: Long): ClusterActorRef = {
+ routerType: RouterType,
+ inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
+ actorAddress: String,
+ timeout: Long): ClusterActorRef = {
routerType match {
- case Direct ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with Direct
- case Random ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with Random
- case RoundRobin ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with RoundRobin
- case LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
- case LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
+ case Direct ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with Direct
+ case Random ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with Random
+ case RoundRobin ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout) with RoundRobin
+ case LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
+ case LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
case LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
}
}
@@ -42,22 +39,49 @@ object Router {
* @author Jonas Bonér
*/
trait Router {
+
def connections: Map[InetSocketAddress, ActorRef]
+ def signalDeadActor(ref: ActorRef): Unit
+
def route(message: Any)(implicit sender: Option[ActorRef]): Unit
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T]
}
+ /**
+ * An Abstract Router implementation that already provides the basic infrastructure so that a concrete
+ * Router only needs to implement the next method.
+ *
+ * This also is the location where a failover is done in the future if an ActorRef fails and a different
+ * one needs to be selected.
+ */
trait BasicRouter extends Router {
+
def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match {
- case Some(actor) ⇒ actor.!(message)(sender)
- case _ ⇒ throwNoConnectionsError()
+      case Some(actor) ⇒
+        // A failed send is taken as a sign that the target is dead: report it, then let the failure propagate.
+        try {
+          actor.!(message)(sender)
+        } catch {
+          case failure: Exception ⇒
+            signalDeadActor(actor)
+            throw failure
+        }
+      case _ ⇒ throwNoConnectionsError()
}
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match {
- case Some(actor) ⇒ actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
- case _ ⇒ throwNoConnectionsError()
+      case Some(actor) ⇒
+        try {
+          actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
+        } catch {
+          // Exception, not Throwable (as in the one-way route): fatal JVM errors must not evict the connection.
+          case e: Exception ⇒
+            signalDeadActor(actor)
+            throw e
+        }
+      case _ ⇒ throwNoConnectionsError()
}
protected def next: Option[ActorRef]
@@ -73,6 +97,7 @@ object Router {
* @author Jonas Bonér
*/
trait Direct extends BasicRouter {
+
lazy val next: Option[ActorRef] = {
val connection = connections.values.headOption
if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection")
@@ -90,7 +115,9 @@ object Router {
if (connections.isEmpty) {
EventHandler.warning(this, "Router has no replica connections")
None
- } else Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next())
+ } else {
+ Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next())
+ }
}
/**
@@ -109,7 +136,7 @@ object Router {
val currentItems = current.get
val newItems = currentItems match {
case Nil ⇒ items
- case xs ⇒ xs
+ case xs ⇒ xs
}
if (newItems.isEmpty) {
@@ -124,4 +151,5 @@ object Router {
findNext
}
}
+
}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
index aebfd8b651..6393502377 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
@@ -36,6 +36,7 @@ trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAft
}
trait ClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
+
override def beforeAll() = {
ClusterTestNode.waitForReady(getClass.getName)
}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf
new file mode 100644
index 0000000000..518aed1cd0
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf
@@ -0,0 +1,6 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "ERROR"
+akka.actor.deployment.service-test.router = "round-robin"
+akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
+akka.actor.deployment.service-test.clustered.replication-factor = 2
+akka.cluster.client.buffering.retry-message-send-on-failure = false
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf
new file mode 100644
index 0000000000..518aed1cd0
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf
@@ -0,0 +1,6 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "ERROR"
+akka.actor.deployment.service-test.router = "round-robin"
+akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
+akka.actor.deployment.service-test.clustered.replication-factor = 2
+akka.cluster.client.buffering.retry-message-send-on-failure = false
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf
new file mode 100644
index 0000000000..882d9cb7db
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf
@@ -0,0 +1,6 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "ERROR"
+akka.actor.deployment.service-test.router = "round-robin"
+akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
+akka.actor.deployment.service-test.clustered.replication-factor = 2
+akka.cluster.client.buffering.retry-message-send-on-failure = false
\ No newline at end of file
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.opts
new file mode 100644
index 0000000000..202496ad31
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala
new file mode 100644
index 0000000000..18c8da87da
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+
+package akka.cluster.reflogic
+
+import akka.cluster._
+import akka.cluster.Cluster._
+import akka.actor.Actor
+import akka.routing.RoutingException
+import java.nio.channels.{ClosedChannelException, NotYetConnectedException}
+
+object ClusterActorRefCleanupMultiJvmSpec {
+
+ val NrOfNodes = 3
+
+ class TestActor extends Actor with Serializable {
+ println("--------------------------------------")
+ println("TestActor created")
+ println("--------------------------------------")
+
+ def receive = {
+ case "Die" =>
+ println("Killing JVM: " + Cluster.node.nodeAddress)
+ System.exit(0)
+ case _ =>
+ println("Hello")
+ }
+ }
+
+}
+
+/**
+ * Master node: deploys "service-test", kills its replicas one at a time with "Die" and checks that the ClusterActorRef drops each dead connection.
+ */
+class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode {
+
+  import ClusterActorRefCleanupMultiJvmSpec._
+
+  val testNodes = NrOfNodes
+
+  "ClusterActorRef" must {
+    "cleanup itself" in {
+      node.start()
+      barrier("awaitStarted", NrOfNodes).await()
+
+      val ref = Actor.actorOf[ClusterActorRefCleanupMultiJvmSpec.TestActor]("service-test")
+
+      ref.isInstanceOf[ClusterActorRef] must be(true)
+
+      val clusteredRef = ref.asInstanceOf[ClusterActorRef]
+
+      // Verify that all remote actors are there.
+      clusteredRef.connections.size must be(2)
+
+      // Let one of the actors die.
+      clusteredRef ! "Die"
+
+      // Just some waiting to make sure that the node has died.
+      Thread.sleep(5000)
+
+      // Send some requests; this should trigger the cleanup.
+      try {
+        clusteredRef ! "hello"
+        clusteredRef ! "hello"
+      } catch {
+        case e: NotYetConnectedException =>
+      }
+
+      // Since the call to the node failed, the node must have been removed from the list.
+      clusteredRef.connections.size must be(1)
+
+      // Send a message to the remaining node.
+      clusteredRef ! "hello"
+
+      // Now kill another node.
+      clusteredRef ! "Die"
+
+      // Just some waiting to make sure that the node has died.
+      Thread.sleep(5000)
+
+      // Trigger the cleanup.
+      try {
+        clusteredRef ! "hello"
+      } catch {
+        case e: ClosedChannelException =>
+        case e: NotYetConnectedException =>
+        case e: RoutingException =>
+      }
+
+      // Now there must not be any remaining connections after the death of the last actor.
+      clusteredRef.connections.size must be(0)
+
+      // And let's make sure we now get the correct exception if we try to use the ref.
+      intercept[RoutingException] {
+        clusteredRef ! "Hello"
+      }
+
+      node.shutdown()
+    }
+  }
+}
+
+class ClusterActorRefCleanupMultiJvmNode2 extends ClusterTestNode {
+
+ import ClusterActorRefCleanupMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+  // We are only using this node for its capacity, not for testing on the node itself.
+ "___" must {
+ "___" in {
+ Runtime.getRuntime.addShutdownHook(new Thread() {
+ override def run() {
+ ClusterTestNode.exit(classOf[ClusterActorRefCleanupMultiJvmNode2].getName)
+ }
+ })
+
+ node.start()
+ barrier("awaitStarted", NrOfNodes).await()
+
+ barrier("finished", NrOfNodes).await()
+ node.shutdown()
+ }
+ }
+}
+
+class ClusterActorRefCleanupMultiJvmNode3 extends ClusterTestNode {
+
+ import ClusterActorRefCleanupMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+  // We are only using this node for its capacity, not for testing on the node itself.
+ "___" must {
+ "___" in {
+ Runtime.getRuntime.addShutdownHook(new Thread() {
+ override def run() {
+ ClusterTestNode.exit(classOf[ClusterActorRefCleanupMultiJvmNode3].getName)
+ }
+ })
+
+ node.start()
+ barrier("awaitStarted", NrOfNodes).await()
+
+ barrier("finished", NrOfNodes).await()
+ node.shutdown()
+ }
+ }
+}