Created NetworkEventStream Channel and Listener - an event bus for remote and cluster life-cycle events.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-09-09 14:33:35 +02:00
parent 38c2fe1894
commit 90525bdec3
2 changed files with 105 additions and 137 deletions

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.{ Actor, ActorRef, Props }
import Actor._
import akka.dispatch.PinnedDispatcher
import scala.collection.mutable
import java.net.InetSocketAddress
/**
* Stream of all kinds of network events, remote failure and connection events, cluster failure and connection events etc.
* Also provides API for channel listener management.
*/
object NetworkEventStream {
private sealed trait NetworkEventStreamEvent
private case class Register(listener: Listener, connectionAddress: InetSocketAddress)
extends NetworkEventStreamEvent
private case class Unregister(listener: Listener, connectionAddress: InetSocketAddress)
extends NetworkEventStreamEvent
/**
* Base trait for network event listener.
*/
trait Listener {
def notify(event: RemoteLifeCycleEvent)
}
/**
* Channel actor with a registry of listeners.
*/
private class Channel extends Actor {
val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[Listener]]() {
override def default(k: InetSocketAddress) = mutable.Set.empty[Listener]
}
def receive = {
case event: RemoteClientLifeCycleEvent
listeners(event.remoteAddress) foreach (_ notify event)
case event: RemoteServerLifeCycleEvent // FIXME handle RemoteServerLifeCycleEvent
case Register(listener, connectionAddress)
listeners(connectionAddress) += listener
case Unregister(listener, connectionAddress)
listeners(connectionAddress) -= listener
case _ //ignore other
}
}
private[akka] val channel = actorOf(Props(new Channel).copy(dispatcher = new PinnedDispatcher(), localOnly = true))
/**
* Registers a network event stream listener (asyncronously).
*/
def register(listener: Listener, connectionAddress: InetSocketAddress) =
channel ! Register(listener, connectionAddress)
/**
* Unregisters a network event stream listener (asyncronously) .
*/
def unregister(listener: Listener, connectionAddress: InetSocketAddress) =
channel ! Unregister(listener, connectionAddress)
}

View file

@ -9,7 +9,6 @@ import Actor._
import akka.cluster._
import akka.routing._
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher }
import akka.util.{ ListenerManagement, Duration }
import scala.collection.immutable.Map
@ -20,58 +19,12 @@ import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import System.{ currentTimeMillis newTimestamp }
/**
* Holds error event channel Actor instance and provides API for channel listener management.
*/
object RemoteFailureDetector {
private sealed trait RemoteFailureDetectorChannelEvent
private case class Register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress)
extends RemoteFailureDetectorChannelEvent
private case class Unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress)
extends RemoteFailureDetectorChannelEvent
private[akka] val channel = actorOf(Props(new Channel).copy(dispatcher = new PinnedDispatcher(), localOnly = true))
def register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) =
channel ! Register(listener, connectionAddress)
def unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) =
channel ! Unregister(listener, connectionAddress)
private class Channel extends Actor {
val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[RemoteFailureListener]]() {
override def default(k: InetSocketAddress) = mutable.Set.empty[RemoteFailureListener]
}
def receive = {
case event: RemoteClientLifeCycleEvent
listeners(event.remoteAddress) foreach (_ notify event)
case event: RemoteServerLifeCycleEvent // FIXME handle RemoteServerLifeCycleEvent
case Register(listener, connectionAddress)
listeners(connectionAddress) += listener
case Unregister(listener, connectionAddress)
listeners(connectionAddress) -= listener
case _ //ignore other
}
}
}
/**
* Base class for remote failure detection management.
*/
abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef])
extends FailureDetector
with RemoteFailureListener {
// import ClusterActorRef._
with NetworkEventStream.Listener {
type T <: AnyRef
@ -90,7 +43,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
}
/**
* State factory. To be defined by subclass that wants to add extra info in the 'meta: Option[T]' field.
* State factory. To be defined by subclass that wants to add extra info in the 'meta: T' field.
*/
protected def newState(): State
@ -198,21 +151,20 @@ class RemoveConnectionOnFirstFailureRemoteFailureDetector(
def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {}
override def remoteClientWriteFailed(
request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {
removeConnection(connectionAddress)
}
def notify(event: RemoteLifeCycleEvent) = event match {
case RemoteClientWriteFailed(request, cause, client, connectionAddress)
removeConnection(connectionAddress)
override def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {
removeConnection(connectionAddress)
}
case RemoteClientError(cause, client, connectionAddress)
removeConnection(connectionAddress)
override def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
removeConnection(connectionAddress)
}
case RemoteClientDisconnected(client, connectionAddress)
removeConnection(connectionAddress)
override def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
removeConnection(connectionAddress)
case RemoteClientShutdown(client, connectionAddress)
removeConnection(connectionAddress)
case _ {}
}
private def removeConnection(connectionAddress: InetSocketAddress) =
@ -290,39 +242,36 @@ class BannagePeriodFailureDetector(
}
// ===================================================================================
// RemoteFailureListener callbacks
// NetworkEventStream.Listener callback
// ===================================================================================
override def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordSuccess(connectionAddress, newTimestamp)
}
def notify(event: RemoteLifeCycleEvent) = event match {
case RemoteClientStarted(client, connectionAddress)
recordSuccess(connectionAddress, newTimestamp)
override def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordSuccess(connectionAddress, newTimestamp)
}
case RemoteClientConnected(client, connectionAddress)
recordSuccess(connectionAddress, newTimestamp)
override def remoteClientWriteFailed(
request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordFailure(connectionAddress, newTimestamp)
}
case RemoteClientWriteFailed(request, cause, client, connectionAddress)
recordFailure(connectionAddress, newTimestamp)
override def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordFailure(connectionAddress, newTimestamp)
}
case RemoteClientError(cause, client, connectionAddress)
recordFailure(connectionAddress, newTimestamp)
override def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordFailure(connectionAddress, newTimestamp)
}
case RemoteClientDisconnected(client, connectionAddress)
recordFailure(connectionAddress, newTimestamp)
override def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordFailure(connectionAddress, newTimestamp)
case RemoteClientShutdown(client, connectionAddress)
recordFailure(connectionAddress, newTimestamp)
case _ {}
}
}
/**
* Failure detector that uses the Circuit Breaker pattern to detect and recover from failing connections.
*
* class CircuitBreakerRemoteFailureListener(initialConnections: Map[InetSocketAddress, ActorRef])
* class CircuitBreakerNetworkEventStream.Listener(initialConnections: Map[InetSocketAddress, ActorRef])
* extends RemoteFailureDetectorBase(initialConnections) {
*
* def newState() = State(Long.MinValue, initialConnections, None)
@ -333,61 +282,6 @@ class BannagePeriodFailureDetector(
*
* def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {}
*
* // FIXME implement CircuitBreakerRemoteFailureListener
* // FIXME implement CircuitBreakerNetworkEventStream.Listener
* }
*/
/**
* Base trait for remote failure event listener.
*/
trait RemoteFailureListener {
final private[akka] def notify(event: RemoteLifeCycleEvent) = event match {
case RemoteClientStarted(client, connectionAddress)
remoteClientStarted(client, connectionAddress)
case RemoteClientConnected(client, connectionAddress)
remoteClientConnected(client, connectionAddress)
case RemoteClientWriteFailed(request, cause, client, connectionAddress)
remoteClientWriteFailed(request, cause, client, connectionAddress)
case RemoteClientError(cause, client, connectionAddress)
remoteClientError(cause, client, connectionAddress)
case RemoteClientDisconnected(client, connectionAddress)
remoteClientDisconnected(client, connectionAddress)
case RemoteClientShutdown(client, connectionAddress)
remoteClientShutdown(client, connectionAddress)
case RemoteServerWriteFailed(request, cause, server, clientAddress)
remoteServerWriteFailed(request, cause, server, clientAddress)
case RemoteServerError(cause, server)
remoteServerError(cause, server)
case RemoteServerShutdown(server)
remoteServerShutdown(server)
}
def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientWriteFailed(
request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteServerWriteFailed(
request: AnyRef, cause: Throwable, server: RemoteServerModule, clientAddress: Option[InetSocketAddress]) {}
def remoteServerError(cause: Throwable, server: RemoteServerModule) {}
def remoteServerShutdown(server: RemoteServerModule) {}
}