From e70ab6d12c9a70255b3af4b0542a062e34913e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 19 Sep 2011 14:40:12 +0200 Subject: [PATCH] Added lockless FailureDetector.putIfAbsent(connection). MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../scala/akka/routing/FailureDetector.scala | 7 ++ .../src/main/scala/akka/routing/Routing.scala | 8 +- .../akka/cluster/RemoteFailureDetector.scala | 106 +++++++++--------- 3 files changed, 66 insertions(+), 55 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/FailureDetector.scala b/akka-actor/src/main/scala/akka/routing/FailureDetector.scala index daf6f29067..1b3fae6c5b 100644 --- a/akka-actor/src/main/scala/akka/routing/FailureDetector.scala +++ b/akka-actor/src/main/scala/akka/routing/FailureDetector.scala @@ -59,6 +59,8 @@ object FailureDetector { * the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying. * * + * + * @author Jonas Bonér */ trait FailureDetector { @@ -120,6 +122,11 @@ trait FailureDetector { */ def remove(deadRef: ActorRef) + /** + * TODO: document + */ + def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef + /** * Fails over connections from one address to another. 
*/ diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b7c5eddeaa..ec0be0374b 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -103,8 +103,6 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector state.get.iterable foreach (_.stop()) } - def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here - @tailrec final def remove(ref: ActorRef) = { val oldState = state.get @@ -120,6 +118,12 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector if (!state.compareAndSet(oldState, newState)) remove(ref) } } + + def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here + + def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { + throw new UnsupportedOperationException("Not supported") + } } /** diff --git a/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala b/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala index 04aec93ae0..7792313f6e 100644 --- a/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala @@ -20,52 +20,10 @@ import java.util.concurrent.atomic.AtomicReference import System.{ currentTimeMillis ⇒ newTimestamp } import akka.dispatch.PinnedDispatcher -/** - * Holds error event channel Actor instance and provides API for channel listener management. 
- */ -object RemoteFailureDetector { - - private sealed trait RemoteFailureDetectorChannelEvent - - private case class Register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) - extends RemoteFailureDetectorChannelEvent - - private case class Unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) - extends RemoteFailureDetectorChannelEvent - - private[akka] val channel = new LocalActorRef(Props[Channel].copy(dispatcher = new PinnedDispatcher()), newUuid.toString, systemService = true) - - def register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) = - channel ! Register(listener, connectionAddress) - - def unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) = - channel ! Unregister(listener, connectionAddress) - - private class Channel extends Actor { - - val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[RemoteFailureListener]]() { - override def default(k: InetSocketAddress) = mutable.Set.empty[RemoteFailureListener] - } - - def receive = { - case event: RemoteClientLifeCycleEvent ⇒ - listeners(event.remoteAddress) foreach (_ notify event) - - case event: RemoteServerLifeCycleEvent ⇒ // FIXME handle RemoteServerLifeCycleEvent - - case Register(listener, connectionAddress) ⇒ - listeners(connectionAddress) += listener - - case Unregister(listener, connectionAddress) ⇒ - listeners(connectionAddress) -= listener - - case _ ⇒ //ignore other - } - } -} - /** * Base class for remote failure detection management. 
+ * + * @author Jonas Bonér */ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector @@ -83,6 +41,9 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre protected val state: AtomicReference[State] = new AtomicReference[State](newState()) + // register all initial connections - e.g listen to events from them + initialConnections.keys foreach (NetworkEventStream.register(this, _)) + /** * State factory. To be defined by subclass that wants to add extra info in the 'meta: T' field. */ @@ -123,7 +84,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre @tailrec final def failOver(from: InetSocketAddress, to: InetSocketAddress) { - EventHandler.debug(this, "ClusterActorRef failover from [%s] to [%s]".format(from, to)) + EventHandler.debug(this, "RemoteFailureDetector failover from [%s] to [%s]".format(from, to)) val oldState = state.get var changed = false @@ -132,7 +93,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre case (`from`, actorRef) ⇒ changed = true //actorRef.stop() - (to, createRemoteActorRef(actorRef.address, to)) + (to, newConnection(actorRef.address, to)) case other ⇒ other } @@ -141,24 +102,27 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre val newState = oldState copy (version = oldState.version + 1, connections = newMap) //if we are not able to update, the state, we are going to try again. - if (!state.compareAndSet(oldState, newState)) failOver(from, to) + if (!state.compareAndSet(oldState, newState)) { + failOver(from, to) // recur + } } } @tailrec final def remove(faultyConnection: ActorRef) { - EventHandler.debug(this, "ClusterActorRef remove [%s]".format(faultyConnection.uuid)) val oldState = state.get() var changed = false - //remote the faultyConnection from the clustered-connections. 
+ var faultyAddress: InetSocketAddress = null var newConnections = Map.empty[InetSocketAddress, ActorRef] + oldState.connections.keys foreach { address ⇒ val actorRef: ActorRef = oldState.connections.get(address).get if (actorRef ne faultyConnection) { newConnections = newConnections + ((address, actorRef)) } else { + faultyAddress = address changed = true } } @@ -168,11 +132,45 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre val newState = oldState copy (version = oldState.version + 1, connections = newConnections) //if we are not able to update the state, we just try again. - if (!state.compareAndSet(oldState, newState)) remove(faultyConnection) + if (!state.compareAndSet(oldState, newState)) { + remove(faultyConnection) // recur + } else { + EventHandler.debug(this, "Removing connection [%s]".format(faultyAddress)) + NetworkEventStream.unregister(this, faultyAddress) // unregister the connection - i.e. stop listening to events from it + } } } - private[cluster] def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = { + @tailrec + final def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { + + val oldState = state.get() + val oldConnections = oldState.connections + + oldConnections.get(address) match { + case Some(connection) ⇒ connection // we already had the connection, return it + case None ⇒ // we need to create it + val newConnection = newConnectionFactory() + val newConnections = oldConnections + (address -> newConnection) + + //a new connection was added for the address, so we need to update the state. + val newState = oldState copy (version = oldState.version + 1, connections = newConnections) + + //if we are not able to update the state, we just try again. 
+ if (!state.compareAndSet(oldState, newState)) { + // we failed, need compensating action + newConnection.stop() // stop the new connection actor and try again + putIfAbsent(address, newConnectionFactory) // recur + } else { + // we succeeded + EventHandler.debug(this, "Adding connection [%s]".format(address)) + NetworkEventStream.register(this, address) // register the connection - e.g listen to events from it + newConnection // return new connection actor + } + } + } + + private[cluster] def newConnection(actorAddress: String, inetSocketAddress: InetSocketAddress) = { RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None) } } @@ -181,7 +179,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre * Simple failure detector that removes the failing connection permanently on first error. */ class RemoveConnectionOnFirstFailureRemoteFailureDetector( - initialConnections: Map[InetSocketAddress, ActorRef]) + initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef]) extends RemoteFailureDetectorBase(initialConnections) { protected def newState() = State(Long.MinValue, initialConnections) @@ -215,9 +213,11 @@ class RemoveConnectionOnFirstFailureRemoteFailureDetector( /** * Failure detector that bans the failing connection for 'timeToBan: Duration' and will try to use the connection * again after the ban period have expired. + * + * @author Jonas Bonér */ class BannagePeriodFailureDetector( - initialConnections: Map[InetSocketAddress, ActorRef], + initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef], timeToBan: Duration) extends RemoteFailureDetectorBase(initialConnections) {