diff --git a/akka-remote/src/main/scala/akka/cluster/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/cluster/NetworkEventStream.scala new file mode 100644 index 0000000000..1b1b7d2f7e --- /dev/null +++ b/akka-remote/src/main/scala/akka/cluster/NetworkEventStream.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor.{ Actor, ActorRef, Props } +import Actor._ +import akka.dispatch.PinnedDispatcher + +import scala.collection.mutable + +import java.net.InetSocketAddress + +/** + * Stream of all kinds of network events, remote failure and connection events, cluster failure and connection events etc. + * Also provides API for channel listener management. + */ +object NetworkEventStream { + + private sealed trait NetworkEventStreamEvent + + private case class Register(listener: Listener, connectionAddress: InetSocketAddress) + extends NetworkEventStreamEvent + + private case class Unregister(listener: Listener, connectionAddress: InetSocketAddress) + extends NetworkEventStreamEvent + + /** + * Base trait for network event listener. + */ + trait Listener { + def notify(event: RemoteLifeCycleEvent) + } + + /** + * Channel actor with a registry of listeners. + */ + private class Channel extends Actor { + + val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[Listener]]() { + override def default(k: InetSocketAddress) = mutable.Set.empty[Listener] + } + + def receive = { + case event: RemoteClientLifeCycleEvent ⇒ + listeners(event.remoteAddress) foreach (_ notify event) + + case event: RemoteServerLifeCycleEvent ⇒ // FIXME handle RemoteServerLifeCycleEvent + + case Register(listener, connectionAddress) ⇒ + listeners(connectionAddress) += listener + + case Unregister(listener, connectionAddress) ⇒ + listeners(connectionAddress) -= listener + + case _ ⇒ //ignore other + } + } + + private[akka] val channel = actorOf(Props(new Channel).copy(dispatcher = new PinnedDispatcher(), localOnly = true)) + + /** + * Registers a network event stream listener (asyncronously). + */ + def register(listener: Listener, connectionAddress: InetSocketAddress) = + channel ! Register(listener, connectionAddress) + + /** + * Unregisters a network event stream listener (asyncronously) . + */ + def unregister(listener: Listener, connectionAddress: InetSocketAddress) = + channel ! Unregister(listener, connectionAddress) +} diff --git a/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala b/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala index 09cf65d88c..67eb7971c8 100644 --- a/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala @@ -9,7 +9,6 @@ import Actor._ import akka.cluster._ import akka.routing._ import akka.event.EventHandler -import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher } import akka.util.{ ListenerManagement, Duration } import scala.collection.immutable.Map @@ -20,58 +19,12 @@ import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference import System.{ currentTimeMillis ⇒ newTimestamp } -/** - * Holds error event channel Actor instance and provides API for channel listener management. - */ -object RemoteFailureDetector { - - private sealed trait RemoteFailureDetectorChannelEvent - - private case class Register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) - extends RemoteFailureDetectorChannelEvent - - private case class Unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) - extends RemoteFailureDetectorChannelEvent - - private[akka] val channel = actorOf(Props(new Channel).copy(dispatcher = new PinnedDispatcher(), localOnly = true)) - - def register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) = - channel ! Register(listener, connectionAddress) - - def unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) = - channel ! Unregister(listener, connectionAddress) - - private class Channel extends Actor { - - val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[RemoteFailureListener]]() { - override def default(k: InetSocketAddress) = mutable.Set.empty[RemoteFailureListener] - } - - def receive = { - case event: RemoteClientLifeCycleEvent ⇒ - listeners(event.remoteAddress) foreach (_ notify event) - - case event: RemoteServerLifeCycleEvent ⇒ // FIXME handle RemoteServerLifeCycleEvent - - case Register(listener, connectionAddress) ⇒ - listeners(connectionAddress) += listener - - case Unregister(listener, connectionAddress) ⇒ - listeners(connectionAddress) -= listener - - case _ ⇒ //ignore other - } - } -} - /** * Base class for remote failure detection management. */ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector - with RemoteFailureListener { - - // import ClusterActorRef._ + with NetworkEventStream.Listener { type T <: AnyRef @@ -90,7 +43,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre } /** - * State factory. To be defined by subclass that wants to add extra info in the 'meta: Option[T]' field. + * State factory. To be defined by subclass that wants to add extra info in the 'meta: T' field. */ protected def newState(): State @@ -198,21 +151,20 @@ class RemoveConnectionOnFirstFailureRemoteFailureDetector( def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {} - override def remoteClientWriteFailed( - request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) { - removeConnection(connectionAddress) - } + def notify(event: RemoteLifeCycleEvent) = event match { + case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ + removeConnection(connectionAddress) - override def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) { - removeConnection(connectionAddress) - } + case RemoteClientError(cause, client, connectionAddress) ⇒ + removeConnection(connectionAddress) - override def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) { - removeConnection(connectionAddress) - } + case RemoteClientDisconnected(client, connectionAddress) ⇒ + removeConnection(connectionAddress) - override def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) { - removeConnection(connectionAddress) + case RemoteClientShutdown(client, connectionAddress) ⇒ + removeConnection(connectionAddress) + + case _ ⇒ {} } private def removeConnection(connectionAddress: InetSocketAddress) = @@ -290,39 +242,36 @@ class BannagePeriodFailureDetector( } // =================================================================================== - // RemoteFailureListener callbacks + // NetworkEventStream.Listener callback // =================================================================================== - override def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) { - recordSuccess(connectionAddress, newTimestamp) - } + def notify(event: RemoteLifeCycleEvent) = event match { + case RemoteClientStarted(client, connectionAddress) ⇒ + recordSuccess(connectionAddress, newTimestamp) - override def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) { - recordSuccess(connectionAddress, newTimestamp) - } + case RemoteClientConnected(client, connectionAddress) ⇒ + recordSuccess(connectionAddress, newTimestamp) - override def remoteClientWriteFailed( - request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) { - recordFailure(connectionAddress, newTimestamp) - } + case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) - override def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) { - recordFailure(connectionAddress, newTimestamp) - } + case RemoteClientError(cause, client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) - override def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) { - recordFailure(connectionAddress, newTimestamp) - } + case RemoteClientDisconnected(client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) - override def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) { - recordFailure(connectionAddress, newTimestamp) + case RemoteClientShutdown(client, connectionAddress) ⇒ + recordFailure(connectionAddress, newTimestamp) + + case _ ⇒ {} } } /** * Failure detector that uses the Circuit Breaker pattern to detect and recover from failing connections. * - * class CircuitBreakerRemoteFailureListener(initialConnections: Map[InetSocketAddress, ActorRef]) + * class CircuitBreakerNetworkEventStream.Listener(initialConnections: Map[InetSocketAddress, ActorRef]) * extends RemoteFailureDetectorBase(initialConnections) { * * def newState() = State(Long.MinValue, initialConnections, None) @@ -333,61 +282,6 @@ class BannagePeriodFailureDetector( * * def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {} * - * // FIXME implement CircuitBreakerRemoteFailureListener + * // FIXME implement CircuitBreakerNetworkEventStream.Listener * } */ - -/** - * Base trait for remote failure event listener. - */ -trait RemoteFailureListener { - - final private[akka] def notify(event: RemoteLifeCycleEvent) = event match { - case RemoteClientStarted(client, connectionAddress) ⇒ - remoteClientStarted(client, connectionAddress) - - case RemoteClientConnected(client, connectionAddress) ⇒ - remoteClientConnected(client, connectionAddress) - - case RemoteClientWriteFailed(request, cause, client, connectionAddress) ⇒ - remoteClientWriteFailed(request, cause, client, connectionAddress) - - case RemoteClientError(cause, client, connectionAddress) ⇒ - remoteClientError(cause, client, connectionAddress) - - case RemoteClientDisconnected(client, connectionAddress) ⇒ - remoteClientDisconnected(client, connectionAddress) - - case RemoteClientShutdown(client, connectionAddress) ⇒ - remoteClientShutdown(client, connectionAddress) - - case RemoteServerWriteFailed(request, cause, server, clientAddress) ⇒ - remoteServerWriteFailed(request, cause, server, clientAddress) - - case RemoteServerError(cause, server) ⇒ - remoteServerError(cause, server) - - case RemoteServerShutdown(server) ⇒ - remoteServerShutdown(server) - } - - def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientWriteFailed( - request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {} - - def remoteServerWriteFailed( - request: AnyRef, cause: Throwable, server: RemoteServerModule, clientAddress: Option[InetSocketAddress]) {} - - def remoteServerError(cause: Throwable, server: RemoteServerModule) {} - - def remoteServerShutdown(server: RemoteServerModule) {} -}