diff --git a/akka-actor/src/main/scala/akka/routing/FailureDetector.scala b/akka-actor/src/main/scala/akka/routing/FailureDetector.scala
index daf6f29067..1b3fae6c5b 100644
--- a/akka-actor/src/main/scala/akka/routing/FailureDetector.scala
+++ b/akka-actor/src/main/scala/akka/routing/FailureDetector.scala
@@ -59,6 +59,8 @@ object FailureDetector {
* the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying.
*
*
+ *
+ * @author Jonas Bonér
*/
trait FailureDetector {
@@ -120,6 +122,11 @@ trait FailureDetector {
*/
def remove(deadRef: ActorRef)
+ /**
+ * TODO: document
+ */
+ def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef
+
/**
* Fails over connections from one address to another.
*/
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index b7c5eddeaa..ec0be0374b 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -103,8 +103,6 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector
state.get.iterable foreach (_.stop())
}
- def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here
-
@tailrec
final def remove(ref: ActorRef) = {
val oldState = state.get
@@ -120,6 +118,12 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector
if (!state.compareAndSet(oldState, newState)) remove(ref)
}
}
+
+ def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here
+
+ def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = {
+ throw new UnsupportedOperationException("Not supported")
+ }
}
/**
diff --git a/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala b/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala
index 04aec93ae0..7792313f6e 100644
--- a/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala
+++ b/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala
@@ -20,52 +20,10 @@ import java.util.concurrent.atomic.AtomicReference
import System.{ currentTimeMillis ⇒ newTimestamp }
import akka.dispatch.PinnedDispatcher
-/**
- * Holds error event channel Actor instance and provides API for channel listener management.
- */
-object RemoteFailureDetector {
-
- private sealed trait RemoteFailureDetectorChannelEvent
-
- private case class Register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress)
- extends RemoteFailureDetectorChannelEvent
-
- private case class Unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress)
- extends RemoteFailureDetectorChannelEvent
-
- private[akka] val channel = new LocalActorRef(Props[Channel].copy(dispatcher = new PinnedDispatcher()), newUuid.toString, systemService = true)
-
- def register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) =
- channel ! Register(listener, connectionAddress)
-
- def unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) =
- channel ! Unregister(listener, connectionAddress)
-
- private class Channel extends Actor {
-
- val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[RemoteFailureListener]]() {
- override def default(k: InetSocketAddress) = mutable.Set.empty[RemoteFailureListener]
- }
-
- def receive = {
- case event: RemoteClientLifeCycleEvent ⇒
- listeners(event.remoteAddress) foreach (_ notify event)
-
- case event: RemoteServerLifeCycleEvent ⇒ // FIXME handle RemoteServerLifeCycleEvent
-
- case Register(listener, connectionAddress) ⇒
- listeners(connectionAddress) += listener
-
- case Unregister(listener, connectionAddress) ⇒
- listeners(connectionAddress) -= listener
-
- case _ ⇒ //ignore other
- }
- }
-}
-
/**
* Base class for remote failure detection management.
+ *
+ * @author Jonas Bonér
*/
abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef])
extends FailureDetector
@@ -83,6 +41,9 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
protected val state: AtomicReference[State] = new AtomicReference[State](newState())
+ // register all initial connections - e.g listen to events from them
+ initialConnections.keys foreach (NetworkEventStream.register(this, _))
+
/**
* State factory. To be defined by subclass that wants to add extra info in the 'meta: T' field.
*/
@@ -123,7 +84,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
@tailrec
final def failOver(from: InetSocketAddress, to: InetSocketAddress) {
- EventHandler.debug(this, "ClusterActorRef failover from [%s] to [%s]".format(from, to))
+ EventHandler.debug(this, "RemoteFailureDetector failover from [%s] to [%s]".format(from, to))
val oldState = state.get
var changed = false
@@ -132,7 +93,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
case (`from`, actorRef) ⇒
changed = true
//actorRef.stop()
- (to, createRemoteActorRef(actorRef.address, to))
+ (to, newConnection(actorRef.address, to))
case other ⇒ other
}
@@ -141,24 +102,27 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
val newState = oldState copy (version = oldState.version + 1, connections = newMap)
//if we are not able to update, the state, we are going to try again.
- if (!state.compareAndSet(oldState, newState)) failOver(from, to)
+ if (!state.compareAndSet(oldState, newState)) {
+ failOver(from, to) // recur
+ }
}
}
@tailrec
final def remove(faultyConnection: ActorRef) {
- EventHandler.debug(this, "ClusterActorRef remove [%s]".format(faultyConnection.uuid))
val oldState = state.get()
var changed = false
- //remote the faultyConnection from the clustered-connections.
+ var faultyAddress: InetSocketAddress = null
var newConnections = Map.empty[InetSocketAddress, ActorRef]
+
oldState.connections.keys foreach { address ⇒
val actorRef: ActorRef = oldState.connections.get(address).get
if (actorRef ne faultyConnection) {
newConnections = newConnections + ((address, actorRef))
} else {
+ faultyAddress = address
changed = true
}
}
@@ -168,11 +132,45 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
val newState = oldState copy (version = oldState.version + 1, connections = newConnections)
//if we are not able to update the state, we just try again.
- if (!state.compareAndSet(oldState, newState)) remove(faultyConnection)
+ if (!state.compareAndSet(oldState, newState)) {
+ remove(faultyConnection) // recur
+ } else {
+ EventHandler.debug(this, "Removing connection [%s]".format(faultyAddress))
+ NetworkEventStream.unregister(this, faultyAddress) // unregister the connections - e.g stop listen to events from it
+ }
}
}
- private[cluster] def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
+ @tailrec
+ final def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = {
+
+ val oldState = state.get()
+ val oldConnections = oldState.connections
+
+ oldConnections.get(address) match {
+ case Some(connection) ⇒ connection // we already had the connection, return it
+ case None ⇒ // we need to create it
+ val newConnection = newConnectionFactory()
+ val newConnections = oldConnections + (address -> newConnection)
+
+ //one or more occurrances of the actorRef were removed, so we need to update the state.
+ val newState = oldState copy (version = oldState.version + 1, connections = newConnections)
+
+ //if we are not able to update the state, we just try again.
+ if (!state.compareAndSet(oldState, newState)) {
+ // we failed, need compensating action
+ newConnection.stop() // stop the new connection actor and try again
+ putIfAbsent(address, newConnectionFactory) // recur
+ } else {
+ // we succeeded
+ EventHandler.debug(this, "Adding connection [%s]".format(address))
+ NetworkEventStream.register(this, address) // register the connection - e.g listen to events from it
+ newConnection // return new connection actor
+ }
+ }
+ }
+
+ private[cluster] def newConnection(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
}
}
@@ -181,7 +179,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
* Simple failure detector that removes the failing connection permanently on first error.
*/
class RemoveConnectionOnFirstFailureRemoteFailureDetector(
- initialConnections: Map[InetSocketAddress, ActorRef])
+ initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef])
extends RemoteFailureDetectorBase(initialConnections) {
protected def newState() = State(Long.MinValue, initialConnections)
@@ -215,9 +213,11 @@ class RemoveConnectionOnFirstFailureRemoteFailureDetector(
/**
* Failure detector that bans the failing connection for 'timeToBan: Duration' and will try to use the connection
* again after the ban period have expired.
+ *
+ * @author Jonas Bonér
*/
class BannagePeriodFailureDetector(
- initialConnections: Map[InetSocketAddress, ActorRef],
+ initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef],
timeToBan: Duration)
extends RemoteFailureDetectorBase(initialConnections) {