Merge with master

This commit is contained in:
Viktor Klang 2011-09-09 12:00:19 +02:00
commit 8a7eacb3c6
12 changed files with 1465 additions and 199 deletions

View file

@ -0,0 +1,89 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import akka.util.duration._
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.{ TimeUnit, CountDownLatch }
/**
 * Exercises the state transitions of the work-in-progress CircuitBreaker:
 * stays CLOSED while calls succeed, goes OPEN after the failure threshold
 * is reached, HALF OPEN after the configured timeout, and OPEN again on
 * renewed failure. Transitions are observed through one-shot latches
 * registered as onOpen/onClose/onHalfOpen callbacks.
 */
class CircuitBreakerSpec extends WordSpec with MustMatchers {

  /** Small fixture that funnels calls through the given breaker. */
  class TestClass(cb: CircuitBreaker) {
    def doNothing = {
      cb {
        // do nothing
      }
    }

    def throwException: Long = {
      cb {
        throw new java.lang.IllegalArgumentException
      }
    }
  }

  // One-shot latches: each callback kind only needs to be observed once.
  val openLatch = new CountDownLatch(1)
  val closeLatch = new CountDownLatch(1)
  val halfOpenLatch = new CountDownLatch(1)

  // 100 ms open timeout, trips after 10 failures.
  val cb = CircuitBreaker(CircuitBreaker.Config(100.millis, 10))

  cb onOpen {
    openLatch.countDown()
  } onClose {
    closeLatch.countDown()
  } onHalfOpen {
    halfOpenLatch.countDown()
  }

  "A CircuitBreaker" must {

    "remain closed (no exceptions are thrown)" in {
      for (i ← 1 to 20) {
        new TestClass(cb).doNothing
      }
    }

    "be changing states correctly" in {
      // 10 failures throwing IllegalArgumentException, enough to trip the breaker.
      for (i ← 1 to 10) {
        evaluating { (new TestClass(cb).throwException) } must produce[java.lang.IllegalArgumentException]
      }
      // Should be OPEN.
      for (i ← 1 to 10) {
        intercept[IllegalArgumentException](new TestClass(cb).throwException)
        assert(openLatch.await(30, TimeUnit.SECONDS) === true)
      }
      // Sleep for more than the 100 ms open timeout.
      Thread.sleep(200)
      // Should be HALF OPEN after the 100 millis timeout.
      intercept[IllegalArgumentException](new TestClass(cb).throwException)
      assert(halfOpenLatch.await(30, TimeUnit.SECONDS) === true)
      // Should be OPEN again.
      for (i ← 1 to 10) {
        intercept[IllegalArgumentException](new TestClass(cb).throwException)
        assert(openLatch.await(30, TimeUnit.SECONDS) === true)
      }
    }
  }
}

View file

@ -272,7 +272,7 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
def ask(message: AnyRef, sender: ActorRef): Future[AnyRef] = ask(message, timeout, sender)
/**
* Akka Java API. <p/>
* Akka Java API. <p/>
* Sends a message asynchronously returns a future holding the eventual reply message.
* <p/>
* <b>NOTE:</b>

View file

@ -17,21 +17,34 @@ class RemoteEventHandler extends Actor {
def receive = {
// client
case RemoteClientError(cause, client, address) EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString)
case RemoteClientWriteFailed(request, cause, client, address) EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(request, address.toString))
case RemoteClientDisconnected(client, address) EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString)
case RemoteClientConnected(client, address) EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString)
case RemoteClientStarted(client, address) EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString)
case RemoteClientShutdown(client, address) EventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString)
case RemoteClientError(cause, client, address)
EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString)
case RemoteClientWriteFailed(request, cause, client, address)
EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(request, address.toString))
case RemoteClientDisconnected(client, address)
EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString)
case RemoteClientConnected(client, address)
EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString)
case RemoteClientStarted(client, address)
EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString)
case RemoteClientShutdown(client, address)
EventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString)
// server
case RemoteServerError(cause, server) EventHandler.error(cause, server, "RemoteServerError")
case RemoteServerWriteFailed(request, cause, server, clientAddress) EventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString))
case RemoteServerStarted(server) EventHandler.info(server, "RemoteServerStarted")
case RemoteServerShutdown(server) EventHandler.info(server, "RemoteServerShutdown")
case RemoteServerClientConnected(server, clientAddress) EventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString)
case RemoteServerClientDisconnected(server, clientAddress) EventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString)
case RemoteServerClientClosed(server, clientAddress) EventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString)
case RemoteServerError(cause, server)
EventHandler.error(cause, server, "RemoteServerError")
case RemoteServerWriteFailed(request, cause, server, clientAddress)
EventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString))
case RemoteServerStarted(server)
EventHandler.info(server, "RemoteServerStarted")
case RemoteServerShutdown(server)
EventHandler.info(server, "RemoteServerShutdown")
case RemoteServerClientConnected(server, clientAddress)
EventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString)
case RemoteServerClientDisconnected(server, clientAddress)
EventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString)
case RemoteServerClientClosed(server, clientAddress)
EventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString)
case _ //ignore other
}

View file

@ -104,18 +104,23 @@ case class RemoteClientError(
@BeanProperty cause: Throwable,
@BeanProperty client: RemoteClientModule,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(
@BeanProperty client: RemoteClientModule,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(
@BeanProperty client: RemoteClientModule,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientStarted(
@BeanProperty client: RemoteClientModule,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientShutdown(
@BeanProperty client: RemoteClientModule,
@BeanProperty remoteAddress: InetSocketAddress) extends RemoteClientLifeCycleEvent
case class RemoteClientWriteFailed(
@BeanProperty request: AnyRef,
@BeanProperty cause: Throwable,

View file

@ -531,9 +531,9 @@ sealed trait Future[+T] extends japi.Future[T] {
* a valid result then the new Future will contain the same.
* Example:
* <pre>
* Future(6 / 0) failure { case e: ArithmeticException 0 } // result: 0
* Future(6 / 0) failure { case e: NotFoundException 0 } // result: exception
* Future(6 / 2) failure { case e: ArithmeticException 0 } // result: 3
* Future(6 / 0) recover { case e: ArithmeticException 0 } // result: 0
* Future(6 / 0) recover { case e: NotFoundException 0 } // result: exception
* Future(6 / 2) recover { case e: ArithmeticException 0 } // result: 3
* </pre>
*/
final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = {

View file

@ -0,0 +1,127 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import akka.AkkaException
import akka.actor._
import akka.event.EventHandler
import akka.config.ConfigurationException
import akka.actor.UntypedChannel._
import akka.dispatch.{ Future, Futures }
import akka.util.ReflectiveAccess
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
import scala.annotation.tailrec
/**
 * Misc helper and factory methods for failure detection.
 */
object FailureDetector {

  /**
   * Reflectively instantiates a custom [[FailureDetector]] implementation.
   *
   * @param implClass   fully qualified class name of the implementation; it must
   *                    expose a constructor taking a Map[InetSocketAddress, ActorRef]
   * @param connections the initial connections handed to that constructor
   * @throws ConfigurationException if the class cannot be instantiated; the
   *         underlying cause is unwrapped from reflective wrappers
   */
  def createCustomFailureDetector(
    implClass: String,
    connections: Map[InetSocketAddress, ActorRef]): FailureDetector = {

    ReflectiveAccess.createInstance(
      implClass,
      Array[Class[_]](classOf[Map[InetSocketAddress, ActorRef]]),
      Array[AnyRef](connections)) match {
      case Right(actor) ⇒ actor
      case Left(exception) ⇒
        // Unwrap InvocationTargetException so the real constructor failure is reported.
        val cause = exception match {
          case i: InvocationTargetException ⇒ i.getTargetException
          case _                            ⇒ exception
        }
        throw new ConfigurationException(
          "Could not instantiate custom FailureDetector of [" +
            implClass + "] due to: " +
            cause, cause)
    }
  }
}
/**
 * The FailureDetector acts like a middleman between the Router and
 * the actor reference that does the routing, and can detect and act upon failure.
 *
 * Through the FailureDetector:
 * <ol>
 * <li>
 * the actor ref can signal that something has changed in the known set of connections. The Router can see
 * when a change happened (by checking the version) and update its internal datastructures.
 * </li>
 * <li>
 * the Router can indicate that something happened with an actor ref, e.g. the actor ref dying.
 * </li>
 * </ol>
 */
trait FailureDetector {
/**
 * Returns true if the 'connection' is considered available.
 */
def isAvailable(connection: InetSocketAddress): Boolean
/**
 * Records a successful connection.
 * 'timestamp' is presumably epoch millis (System.currentTimeMillis) — confirm with callers.
 */
def recordSuccess(connection: InetSocketAddress, timestamp: Long)
/**
 * Records a failed connection.
 * 'timestamp' is presumably epoch millis (System.currentTimeMillis) — confirm with callers.
 */
def recordFailure(connection: InetSocketAddress, timestamp: Long)
/**
 * A version that is useful to see if there is any change in the connections. If there is a change, a router is
 * able to update its internal datastructures.
 */
def version: Long
/**
 * Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily)
 * with an atomic read of size and version.
 */
def size: Int
/**
 * Stops all managed actors.
 */
def stopAll()
/**
 * Returns a VersionedIterable containing all connected ActorRefs at some moment in time. Since there is
 * the time element, also the version is included to be able to read the data (the connections) and the version
 * in an atomic manner.
 *
 * This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
 * view of some set of connections.
 */
def versionedIterable: VersionedIterable[ActorRef]
/**
 * A callback that can be used to indicate that a connected actorRef was dead.
 * <p/>
 * Implementations should make sure that this method can be called without the actorRef being part of the
 * current set of connections. The most logical way to deal with this situation is just to ignore it. One of the
 * reasons this can happen is that multiple threads could at the 'same' moment discover for the same ActorRef that
 * it is not working.
 *
 * It could be that even after a remove has been called for a specific ActorRef, that the ActorRef
 * is still being used. A well-behaving Router will eventually discard this reference, but no guarantees are
 * made how long this takes.
 *
 * @param deadRef the dead ActorRef to remove
 */
def remove(deadRef: ActorRef)
/**
 * Fails over connections from one address to another.
 */
def failOver(from: InetSocketAddress, to: InetSocketAddress)
}

View file

@ -70,89 +70,6 @@ trait VersionedIterable[A] {
*/
class RoutingException(message: String) extends AkkaException(message)
/**
* Misc helper and factory methods for failure detection.
*/
object FailureDetector {
def createCustomFailureDetector(implClass: String, connections: Map[InetSocketAddress, ActorRef]): FailureDetector = {
ReflectiveAccess.createInstance(implClass, Array[Class[_]](classOf[Map[InetSocketAddress, ActorRef]]), Array[AnyRef](connections)) match {
case Right(actor) actor
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new ConfigurationException("Could not instantiate custom FailureDetector of [" + implClass + "] due to: " + cause, cause)
}
}
}
/**
* The FailureDetector acts like a middleman between the Router and the actor reference that does the routing
* and can dectect and act upon failur.
*
* Through the FailureDetector:
* <ol>
* <li>
* the actor ref can signal that something has changed in the known set of connections. The Router can see
* when a changed happened (by checking the version) and update its internal datastructures.
* </li>
* <li>
* the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying.
* </li>
* </ol>
*/
trait FailureDetector {
/**
* A version that is useful to see if there is any change in the connections. If there is a change, a router is
* able to update its internal datastructures.
*/
def version: Long
/**
* Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily)
* with an atomic read of and size and version.
*/
def size: Int
/**
* Stops all managed actors
*/
def stopAll()
/**
* Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is
* the time element, also the version is included to be able to read the data (the connections) and the version
* in an atomic manner.
*
* This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
* view of some set of connections.
*/
def versionedIterable: VersionedIterable[ActorRef]
/**
* A callback that can be used to indicate that a connected actorRef was dead.
* <p/>
* Implementations should make sure that this method can be called without the actorRef being part of the
* current set of connections. The most logical way to deal with this situation, is just to ignore it. One of the
* reasons this can happen is that multiple thread could at the 'same' moment discover for the same ActorRef that
* not working.
*
* It could be that even after a remove has been called for a specific ActorRef, that the ActorRef
* is still being used. A good behaving Router will eventually discard this reference, but no guarantees are
* made how long this takes.
*
* @param ref the dead
*/
def remove(deadRef: ActorRef)
/**
* Fails over connections from one address to another.
*/
def failOver(from: InetSocketAddress, to: InetSocketAddress)
}
/**
* Default "local" failure detector. This failure detector removes an actor from the
* router if an exception occured in the router's thread (e.g. when trying to add
@ -160,7 +77,7 @@ trait FailureDetector {
*/
class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector {
case class State(val version: Long, val iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef]
case class State(version: Long, iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef]
private val state = new AtomicReference[State]
@ -169,6 +86,13 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector
state.set(State(Long.MinValue, connectionIterable))
}
def isAvailable(connection: InetSocketAddress): Boolean =
state.get.iterable.find(c connection == c).isDefined
def recordSuccess(connection: InetSocketAddress, timestamp: Long) {}
def recordFailure(connection: InetSocketAddress, timestamp: Long) {}
def version: Long = state.get.version
def size: Int = state.get.iterable.size
@ -323,7 +247,6 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends Abstrac
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
* Router only needs to implement the next method.
*
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
* FIXME: this is also the location where message buffering should be done in case of failure.
*/
trait BasicRouter extends Router {
@ -340,7 +263,7 @@ trait BasicRouter extends Router {
//it is a broadcast message, we are going to send to message to all connections.
connections.versionedIterable.iterable.foreach(actor
try {
actor.!(message)(sender)
actor.!(message)(sender) // we use original sender, so this is essentially a 'forward'
} catch {
case e: Exception
connections.remove(actor)
@ -351,7 +274,7 @@ trait BasicRouter extends Router {
next match {
case Some(actor)
try {
actor.!(message)(sender)
actor.!(message)(sender) // we use original sender, so this is essentially a 'forward'
} catch {
case e: Exception
connections.remove(actor)
@ -364,12 +287,13 @@ trait BasicRouter extends Router {
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
case Routing.Broadcast(message)
throw new RoutingException("Broadcasting using '?' is for the time being is not supported. Use ScatterGatherRouter.")
throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.")
case _
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
next match {
case Some(actor)
try {
// FIXME is this not wrong? it will not pass on and use the original Future but create a new one. Should reuse 'channel: UntypedChannel' in the AbstractRoutedActorRef
actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
} catch {
case e: Exception
@ -404,6 +328,7 @@ class DirectRouter extends BasicRouter {
if (currentState.ref == null) None else Some(currentState.ref)
}
// FIXME rename all 'getState' methods to 'currentState', non-scala
@tailrec
private def getState: DirectRouterState = {
val currentState = state.get

View file

@ -0,0 +1,241 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import System._
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
// NOTE: This CircuitBreaker is work-in-progress and does NOT work like normal ones.
// It is not meant to be used as a regular CircuitBreaker, since it does not
// prevent using the failed connection but only calls the registered call-backs.
// It is meant to be changed a bit and used in the FailureDetector later.
/**
* <pre>
* // Create the CircuitBreaker
* val circuitBreaker =
* CircuitBreaker(CircuitBreaker.Config(60.seconds, 5))
*
* // Configure the CircuitBreaker actions
* circuitBreaker
* onOpen {
* ...
* } onClose {
* ...
* } onHalfOpen {
* ...
* }
*
* circuitBreaker {
* ...
* }
* </pre>
*/
/**
 * Companion: configuration and factory for the CircuitBreaker.
 */
object CircuitBreaker {
// 'timeout' is the duration the breaker stays OPEN before a reset is attempted
// (compared against elapsed time in Open.preInvoke); 'failureThreshold' is the
// failure count at which Closed.onError trips the breaker.
case class Config(timeout: Duration, failureThreshold: Int)
// Factory; 'config' is implicit so a Config in implicit scope is picked up automatically.
private[akka] def apply(implicit config: Config): CircuitBreaker = new CircuitBreaker(config)
}
class CircuitBreaker private (val config: CircuitBreaker.Config) {
  import CircuitBreaker._

  private object InternalState {
    /** Initial state: CLOSED, failure count 0, version 0. */
    def apply(circuitBreaker: CircuitBreaker): InternalState =
      InternalState(
        0L,
        Closed(circuitBreaker),
        circuitBreaker.config.timeout.toMillis,
        circuitBreaker.config.failureThreshold,
        0L,
        0)
  }

  /**
   * Represents the internal state of the CircuitBreaker.
   * The whole state is swapped atomically via CAS on 'ref'; 'version' is
   * incremented on every update so concurrent writers are detected and retried.
   */
  private case class InternalState(
    version: Long,
    state: CircuitBreakerState,
    timeout: Long,
    failureThreshold: Int,
    tripTime: Long,
    failureCount: Int,
    onOpenListeners: List[() ⇒ Unit] = Nil,
    onCloseListeners: List[() ⇒ Unit] = Nil,
    onHalfOpenListeners: List[() ⇒ Unit] = Nil)

  /** State-pattern hooks invoked around each guarded call. */
  private[akka] trait CircuitBreakerState {
    val circuitBreaker: CircuitBreaker
    def onError(e: Throwable)
    def preInvoke()
    def postInvoke()
  }

  /**
   * CircuitBreaker is CLOSED, normal operation.
   */
  private[akka] case class Closed(circuitBreaker: CircuitBreaker) extends CircuitBreakerState {
    def onError(e: Throwable) = {
      circuitBreaker.incrementFailureCount()
      val currentCount = circuitBreaker.failureCount
      val threshold = circuitBreaker.failureThreshold
      if (currentCount >= threshold) circuitBreaker.trip()
    }
    def preInvoke() {}
    // A successful call clears the consecutive-failure count.
    def postInvoke() { circuitBreaker.resetFailureCount() }
  }

  /**
   * CircuitBreaker is OPEN. Calls are failing fast.
   * NOTE(review): per the file header this WIP breaker does not short-circuit
   * the call; it only fires callbacks and moves to HALF OPEN.
   */
  private[akka] case class Open(circuitBreaker: CircuitBreaker) extends CircuitBreakerState {
    def onError(e: Throwable) {}
    def preInvoke() {
      val now = currentTimeMillis
      val elapsed = now - circuitBreaker.tripTime
      // Within the timeout window, tell listeners the breaker is still open.
      if (elapsed <= circuitBreaker.timeout)
        circuitBreaker.notifyOpen()
      // Then move to HALF OPEN so the next call's outcome decides reset vs re-trip.
      // NOTE(review): structure reconstructed from flattened formatting — confirm
      // whether attemptReset() was intended unconditionally or only after the timeout.
      circuitBreaker.attemptReset()
    }
    def postInvoke() {}
  }

  /**
   * CircuitBreaker is HALF OPEN. Calls are still failing after timeout.
   */
  private[akka] case class HalfOpen(circuitBreaker: CircuitBreaker) extends CircuitBreakerState {
    def onError(e: Throwable) {
      circuitBreaker.trip()
      circuitBreaker.notifyHalfOpen()
    }
    def preInvoke() {}
    // A successful call in HALF OPEN closes the breaker again.
    def postInvoke() { circuitBreaker.reset() }
  }

  private val ref = new AtomicReference(InternalState(this))

  def timeout = ref.get.timeout
  def failureThreshold = ref.get.failureThreshold
  def failureCount = ref.get.failureCount
  def tripTime = ref.get.tripTime

  /** Atomically increments the failure counter (CAS-retry loop). */
  @tailrec
  final def incrementFailureCount() {
    val oldState = ref.get
    val newState = oldState copy (version = oldState.version + 1,
      failureCount = oldState.failureCount + 1)
    if (!ref.compareAndSet(oldState, newState)) incrementFailureCount()
  }

  /** Atomically closes the breaker and clears the failure counter. */
  @tailrec
  final def reset() {
    val oldState = ref.get
    val newState = oldState copy (version = oldState.version + 1,
      failureCount = 0,
      state = Closed(this))
    if (!ref.compareAndSet(oldState, newState)) reset()
  }

  /** Atomically clears the failure counter without changing state. */
  @tailrec
  final def resetFailureCount() {
    val oldState = ref.get
    val newState = oldState copy (version = oldState.version + 1,
      failureCount = 0)
    if (!ref.compareAndSet(oldState, newState)) resetFailureCount()
  }

  /** Atomically moves the breaker to HALF OPEN. */
  @tailrec
  final def attemptReset() {
    val oldState = ref.get
    val newState = oldState copy (version = oldState.version + 1,
      state = HalfOpen(this))
    if (!ref.compareAndSet(oldState, newState)) attemptReset()
  }

  /** Atomically moves the breaker to OPEN, recording the trip time. */
  @tailrec
  final def trip() {
    val oldState = ref.get
    val newState = oldState copy (version = oldState.version + 1,
      state = Open(this),
      tripTime = currentTimeMillis)
    if (!ref.compareAndSet(oldState, newState)) trip()
  }

  /**
   * Runs 'body' under the breaker: invokes the current state's pre/post hooks
   * and routes any thrown Throwable to the state's onError before rethrowing.
   * 'body' is by-name so the exception is raised inside the guarded region.
   * NOTE(review): the hooks are dispatched on the state captured before
   * preInvoke, so a transition made by preInvoke is not seen by onError or
   * postInvoke of the same call — confirm this is intended.
   */
  def apply[T](body: ⇒ T): T = {
    val oldState = ref.get
    oldState.state.preInvoke()
    try {
      val ret = body
      oldState.state.postInvoke()
      ret
    } catch {
      // Throwable is caught only to notify the state; it is always rethrown.
      case e: Throwable ⇒
        oldState.state.onError(e)
        throw e
    }
  }

  /** Registers a callback fired when the breaker closes. Returns this for chaining. */
  @tailrec
  final def onClose(body: ⇒ Unit): CircuitBreaker = {
    val f = () ⇒ body
    val oldState = ref.get
    val newState = oldState copy (version = oldState.version + 1,
      onCloseListeners = f :: oldState.onCloseListeners)
    // On CAS failure retry with the original by-name body so the registered
    // listener stays semantically identical.
    if (!ref.compareAndSet(oldState, newState)) onClose(body)
    else this
  }

  /** Registers a callback fired when the breaker opens. Returns this for chaining. */
  @tailrec
  final def onOpen(body: ⇒ Unit): CircuitBreaker = {
    val f = () ⇒ body
    val oldState = ref.get
    val newState = oldState copy (version = oldState.version + 1,
      onOpenListeners = f :: oldState.onOpenListeners)
    if (!ref.compareAndSet(oldState, newState)) onOpen(body)
    else this
  }

  /** Registers a callback fired when the breaker goes half-open. Returns this for chaining. */
  @tailrec
  final def onHalfOpen(body: ⇒ Unit): CircuitBreaker = {
    val f = () ⇒ body
    val oldState = ref.get
    val newState = oldState copy (version = oldState.version + 1,
      onHalfOpenListeners = f :: oldState.onHalfOpenListeners)
    if (!ref.compareAndSet(oldState, newState)) onHalfOpen(body)
    else this
  }

  /** Invokes all registered onOpen listeners. */
  def notifyOpen() {
    ref.get.onOpenListeners foreach (f ⇒ f())
  }

  /** Invokes all registered onHalfOpen listeners. */
  def notifyHalfOpen() {
    ref.get.onHalfOpenListeners foreach (f ⇒ f())
  }

  /** Invokes all registered onClose listeners. */
  def notifyClosed() {
    ref.get.onCloseListeners foreach (f ⇒ f())
  }
}

View file

@ -696,7 +696,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -1490,7 +1490,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -2862,7 +2862,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -3337,7 +3337,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -4164,7 +4164,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -5530,7 +5530,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -6038,7 +6038,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -6503,7 +6503,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -7050,7 +7050,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -7483,7 +7483,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -7886,7 +7886,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -8289,7 +8289,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -8759,7 +8759,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -9304,7 +9304,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -10117,7 +10117,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}

View file

@ -301,7 +301,7 @@ class DefaultClusterNode private[akka] (
val remote = new akka.cluster.netty.NettyRemoteSupport
remote.start(hostname, port)
remote.register(RemoteClusterDaemon.Address, remoteDaemon)
remote.addListener(RemoteFailureDetector.registry)
remote.addListener(RemoteFailureDetector.channel)
remote.addListener(remoteClientLifeCycleHandler)
remote
}
@ -428,7 +428,7 @@ class DefaultClusterNode private[akka] (
remoteService.shutdown() // shutdown server
RemoteFailureDetector.registry.stop()
RemoteFailureDetector.channel.stop()
remoteClientLifeCycleHandler.stop()
remoteDaemon.stop()

View file

@ -10,63 +10,112 @@ import akka.cluster._
import akka.routing._
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher }
import akka.util.ListenerManagement
import akka.util.{ ListenerManagement, Duration }
import scala.collection.mutable.{ HashMap, Set }
import scala.collection.immutable.Map
import scala.collection.mutable
import scala.annotation.tailrec
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import System.{ currentTimeMillis newTimestamp }
/**
* Holds error event channel Actor instance and provides API for channel listener management.
*/
object RemoteFailureDetector {
private sealed trait FailureDetectorEvent
private case class Register(strategy: RemoteFailureListener, address: InetSocketAddress) extends FailureDetectorEvent
private case class Unregister(strategy: RemoteFailureListener, address: InetSocketAddress) extends FailureDetectorEvent
private sealed trait RemoteFailureDetectorChannelEvent
private[akka] val registry = new LocalActorRef(Props[Registry].copy(dispatcher = new PinnedDispatcher()), newUuid.toString, systemService = true)
private case class Register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress)
extends RemoteFailureDetectorChannelEvent
def register(strategy: RemoteFailureListener, address: InetSocketAddress) = registry ! Register(strategy, address)
private case class Unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress)
extends RemoteFailureDetectorChannelEvent
def unregister(strategy: RemoteFailureListener, address: InetSocketAddress) = registry ! Unregister(strategy, address)
private[akka] val channel = new LocalActorRef(Props[Channel].copy(dispatcher = new PinnedDispatcher()), newUuid.toString, systemService = true)
private class Registry extends Actor {
def register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) =
channel ! Register(listener, connectionAddress)
val strategies = new HashMap[InetSocketAddress, Set[RemoteFailureListener]]() {
override def default(k: InetSocketAddress) = Set.empty[RemoteFailureListener]
def unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) =
channel ! Unregister(listener, connectionAddress)
private class Channel extends Actor {
val listeners = new mutable.HashMap[InetSocketAddress, mutable.Set[RemoteFailureListener]]() {
override def default(k: InetSocketAddress) = mutable.Set.empty[RemoteFailureListener]
}
def receive = {
case event: RemoteClientLifeCycleEvent
strategies(event.remoteAddress) foreach (_ notify event)
listeners(event.remoteAddress) foreach (_ notify event)
case event: RemoteServerLifeCycleEvent // FIXME handle RemoteServerLifeCycleEvent
case Register(strategy, address)
strategies(address) += strategy
case Register(listener, connectionAddress)
listeners(connectionAddress) += listener
case Unregister(strategy, address)
strategies(address) -= strategy
case Unregister(listener, connectionAddress)
listeners(connectionAddress) -= listener
case _ //ignore other
}
}
}
abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector {
/**
* Base class for remote failure detection management.
*/
abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddress, ActorRef])
extends FailureDetector
with RemoteFailureListener {
import ClusterActorRef._
case class State(val version: Long = Integer.MIN_VALUE, val connections: Map[InetSocketAddress, ActorRef]) extends VersionedIterable[ActorRef] {
type T <: AnyRef
protected case class State(
version: Long,
connections: Map[InetSocketAddress, ActorRef],
meta: T = null.asInstanceOf[T])
extends VersionedIterable[ActorRef] {
def iterable: Iterable[ActorRef] = connections.values
}
// type C
protected val state: AtomicReference[State] = {
val ref = new AtomicReference[State]
ref set newState()
ref
}
private val state = new AtomicReference[State]()
/**
* State factory. To be defined by subclass that wants to add extra info in the 'meta: Option[T]' field.
*/
protected def newState(): State
state.set(State(Long.MinValue, initialConnections))
/**
* Returns true if the 'connection' is considered available.
*
* To be implemented by subclass.
*/
def isAvailable(connectionAddress: InetSocketAddress): Boolean
def version: Long = state.get().version
/**
* Records a successful connection.
*
* To be implemented by subclass.
*/
def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long)
/**
* Records a failed connection.
*
* To be implemented by subclass.
*/
def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long)
def version: Long = state.get.version
def versionedIterable = state.get
@ -75,7 +124,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
def connections: Map[InetSocketAddress, ActorRef] = state.get.connections
def stopAll() {
state.get().iterable foreach (_.stop()) // shut down all remote connections
state.get.iterable foreach (_.stop()) // shut down all remote connections
}
@tailrec
@ -84,6 +133,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
val oldState = state.get
var changed = false
val newMap = oldState.connections map {
case (`from`, actorRef)
changed = true
@ -94,7 +144,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
if (changed) {
//there was a state change, so we are now going to update the state.
val newState = State(oldState.version + 1, newMap)
val newState = oldState copy (version = oldState.version + 1, connections = newMap)
//if we are not able to update, the state, we are going to try again.
if (!state.compareAndSet(oldState, newState)) failOver(from, to)
@ -106,7 +156,6 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
EventHandler.debug(this, "ClusterActorRef remove [%s]".format(faultyConnection.uuid))
val oldState = state.get()
var changed = false
//remove the faultyConnection from the clustered-connections.
@ -122,7 +171,7 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
if (changed) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
val newState = State(oldState.version + 1, newConnections)
val newState = oldState copy (version = oldState.version + 1, connections = newConnections)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) remove(faultyConnection)
@ -130,37 +179,206 @@ abstract class RemoteFailureDetectorBase(initialConnections: Map[InetSocketAddre
}
}
/**
* Simple failure detector that removes the failing connection permanently on first error.
*/
class RemoveConnectionOnFirstFailureRemoteFailureDetector(
initialConnections: Map[InetSocketAddress, ActorRef])
extends RemoteFailureDetectorBase(initialConnections) {
protected def newState() = State(Long.MinValue, initialConnections)
def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined
def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {}
def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {}
override def remoteClientWriteFailed(
request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {
removeConnection(connectionAddress)
}
override def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {
removeConnection(connectionAddress)
}
override def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
removeConnection(connectionAddress)
}
override def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
removeConnection(connectionAddress)
}
private def removeConnection(connectionAddress: InetSocketAddress) =
connections.get(connectionAddress) foreach { conn remove(conn) }
}
/**
* Failure detector that bans the failing connection for 'timeToBan: Duration' and will try to use the connection
* again after the ban period have expired.
*/
class BannagePeriodFailureDetector(
initialConnections: Map[InetSocketAddress, ActorRef],
timeToBan: Duration)
extends RemoteFailureDetectorBase(initialConnections) {
// FIXME consider adding a Scheduler event to notify the BannagePeriodFailureDetector to unban the banned connection after the timeToBan has expired
type T = Map[InetSocketAddress, BannedConnection]
case class BannedConnection(bannedSince: Long, connection: ActorRef)
val timeToBanInMillis = timeToBan.toMillis
protected def newState() =
State(Long.MinValue, initialConnections, Map.empty[InetSocketAddress, BannedConnection])
private def removeConnection(connectionAddress: InetSocketAddress) =
connections.get(connectionAddress) foreach { conn remove(conn) }
// ===================================================================================
// FailureDetector callbacks
// ===================================================================================
def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined
@tailrec
final def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {
val oldState = state.get
val bannedConnection = oldState.meta.get(connectionAddress)
if (bannedConnection.isDefined) {
val BannedConnection(bannedSince, connection) = bannedConnection.get
val currentlyBannedFor = newTimestamp - bannedSince
if (currentlyBannedFor > timeToBanInMillis) {
// ban time has expired - add connection to available connections
val newConnections = oldState.connections + (connectionAddress -> connection)
val newBannedConnections = oldState.meta - connectionAddress
val newState = oldState copy (version = oldState.version + 1,
connections = newConnections,
meta = newBannedConnections)
if (!state.compareAndSet(oldState, newState)) recordSuccess(connectionAddress, timestamp)
}
}
}
@tailrec
final def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {
val oldState = state.get
val connection = oldState.connections.get(connectionAddress)
if (connection.isDefined) {
val newConnections = oldState.connections - connectionAddress
val bannedConnection = BannedConnection(timestamp, connection.get)
val newBannedConnections = oldState.meta + (connectionAddress -> bannedConnection)
val newState = oldState copy (version = oldState.version + 1,
connections = newConnections,
meta = newBannedConnections)
if (!state.compareAndSet(oldState, newState)) recordFailure(connectionAddress, timestamp)
}
}
// ===================================================================================
// RemoteFailureListener callbacks
// ===================================================================================
override def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordSuccess(connectionAddress, newTimestamp)
}
override def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordSuccess(connectionAddress, newTimestamp)
}
override def remoteClientWriteFailed(
request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordFailure(connectionAddress, newTimestamp)
}
override def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordFailure(connectionAddress, newTimestamp)
}
override def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordFailure(connectionAddress, newTimestamp)
}
override def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {
recordFailure(connectionAddress, newTimestamp)
}
}
/**
* Failure detector that uses the Circuit Breaker pattern to detect and recover from failing connections.
*
* class CircuitBreakerRemoteFailureListener(initialConnections: Map[InetSocketAddress, ActorRef])
* extends RemoteFailureDetectorBase(initialConnections) {
*
* def newState() = State(Long.MinValue, initialConnections, None)
*
* def isAvailable(connectionAddress: InetSocketAddress): Boolean = connections.get(connectionAddress).isDefined
*
* def recordSuccess(connectionAddress: InetSocketAddress, timestamp: Long) {}
*
* def recordFailure(connectionAddress: InetSocketAddress, timestamp: Long) {}
*
* // FIXME implement CircuitBreakerRemoteFailureListener
* }
*/
/**
* Base trait for remote failure event listener.
*/
trait RemoteFailureListener {
def notify(event: RemoteLifeCycleEvent) = event match {
case RemoteClientWriteFailed(request, cause, client, address)
remoteClientWriteFailed(request, cause, client, address)
println("--------->>> RemoteClientWriteFailed")
case RemoteClientError(cause, client, address)
println("--------->>> RemoteClientError")
remoteClientError(cause, client, address)
case RemoteClientDisconnected(client, address)
remoteClientDisconnected(client, address)
println("--------->>> RemoteClientDisconnected")
case RemoteClientShutdown(client, address)
remoteClientShutdown(client, address)
println("--------->>> RemoteClientShutdown")
final private[akka] def notify(event: RemoteLifeCycleEvent) = event match {
case RemoteClientStarted(client, connectionAddress)
remoteClientStarted(client, connectionAddress)
case RemoteClientConnected(client, connectionAddress)
remoteClientConnected(client, connectionAddress)
case RemoteClientWriteFailed(request, cause, client, connectionAddress)
remoteClientWriteFailed(request, cause, client, connectionAddress)
case RemoteClientError(cause, client, connectionAddress)
remoteClientError(cause, client, connectionAddress)
case RemoteClientDisconnected(client, connectionAddress)
remoteClientDisconnected(client, connectionAddress)
case RemoteClientShutdown(client, connectionAddress)
remoteClientShutdown(client, connectionAddress)
case RemoteServerWriteFailed(request, cause, server, clientAddress)
remoteServerWriteFailed(request, cause, server, clientAddress)
case RemoteServerError(cause, server)
remoteServerError(cause, server)
case RemoteServerShutdown(server)
remoteServerShutdown(server)
}
def remoteClientStarted(client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientConnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientWriteFailed(
request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {}
request: AnyRef, cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {}
def remoteClientError(cause: Throwable, client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) {}
def remoteClientDisconnected(client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) {}
def remoteClientShutdown(client: RemoteClientModule, connectionAddress: InetSocketAddress) {}
def remoteServerWriteFailed(
request: AnyRef, cause: Throwable, server: RemoteServerModule, clientAddress: Option[InetSocketAddress]) {}
@ -169,37 +387,3 @@ trait RemoteFailureListener {
def remoteServerShutdown(server: RemoteServerModule) {}
}
class RemoveConnectionOnFirstFailureRemoteFailureDetector(initialConnections: Map[InetSocketAddress, ActorRef])
extends RemoteFailureDetectorBase(initialConnections)
with RemoteFailureListener {
override def remoteClientWriteFailed(
request: AnyRef, cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {
removeConnection(address)
}
override def remoteClientError(cause: Throwable, client: RemoteClientModule, address: InetSocketAddress) {
removeConnection(address)
}
override def remoteClientDisconnected(client: RemoteClientModule, address: InetSocketAddress) {
removeConnection(address)
}
override def remoteClientShutdown(client: RemoteClientModule, address: InetSocketAddress) {
removeConnection(address)
}
private def removeConnection(address: InetSocketAddress) =
connections.get(address) foreach { connection remove(connection) }
}
trait LinearBackoffRemoteFailureListener extends RemoteFailureListener {
}
trait ExponentialBackoffRemoteFailureListener extends RemoteFailureListener {
}
trait CircuitBreakerRemoteFailureListener extends RemoteFailureListener {
}

View file

@ -0,0 +1,682 @@
Clustering
==========
Overview
--------
The clustering module provides services like group membership, clustering, and
failover of actors.
The clustering module is based on ZooKeeper.
Starting up the ZooKeeper ensemble
----------------------------------
Embedded ZooKeeper server
~~~~~~~~~~~~~~~~~~~~~~~~~
For testing purposes the simplest way is to start up a single embedded ZooKeeper
server. This can be done like this:
.. literalinclude:: examples/clustering.scala
:language: scala
:lines: 1-2,5,2-4,7
You can leave ``port`` and ``tickTime`` out which will then default to port 2181
and tick time 5000 ms.
ZooKeeper server ensemble
~~~~~~~~~~~~~~~~~~~~~~~~~
For production you should always run an ensemble of at least 3 servers. The
number should be quorum-based, e.g. 3, 5, 7 etc.
|more| Read more about this in the `ZooKeeper Installation and Admin Guide
<http://hadoop.apache.org/zookeeper/docs/r3.1.1/zookeeperAdmin.htm>`_.
In the future Cloudy Akka Provisioning module will automate this.
Creating, starting and stopping a cluster node
----------------------------------------------
Once we have one or more ZooKeeper servers running we can create and start up a
cluster node.
Cluster configuration
~~~~~~~~~~~~~~~~~~~~~
Cluster is configured in the ``akka.cloud.cluster`` section in the ``akka.conf``
configuration file. Here you specify the default addresses to the ZooKeeper
servers, timeouts, if compression should be on or off, and so on.
.. code-block:: conf
akka {
cloud {
cluster {
zookeeper-server-addresses = "wallace:2181,gromit:2181"
remote-server-port = 2552
max-time-to-wait-until-connected = 5
session-timeout = 60
connection-timeout = 30
use-compression = on
}
}
}
Creating a node
~~~~~~~~~~~~~~~
The first thing you need to do on each node is to create a new cluster
node. That is done by invoking ``newNode`` on the ``Cluster`` object. Here is
the signature with its default values::
def newNode(
nodeAddress: NodeAddress,
zkServerAddresses: String = Cluster.zooKeeperServers,
serializer: ZkSerializer = Cluster.defaultSerializer,
hostname: String = NetworkUtil.getLocalhostName,
remoteServerPort: Int = Cluster.remoteServerPort): ClusterNode
The ``NodeAddress`` defines the address for a node and has the following
signature::
final case class NodeAddress(
clusterName: String,
nodeName: String,
hostname: String = Cluster.lookupLocalhostName,
port: Int = Cluster.remoteServerPort)
You have to specify a cluster name and node name while the hostname and port for
the remote server can be left out to use default values.
Here is an example of creating a node in which we only specify the node address
and let the rest of the configuration options have their default values::
import akka.cloud.cluster._
val clusterNode = Cluster.newNode(NodeAddress("test-cluster", "node1"))
You can also use the ``apply`` method on the ``Cluster`` object to create a new
node in a more idiomatic Scala way::
import akka.cloud.cluster._
val clusterNode = Cluster(NodeAddress("test-cluster", "node1"))
The ``NodeAddress`` defines the name of the node and the name of cluster. This
allows you to have multiple clusters running in parallel in isolation,
not aware of each other.
The other parameters to know are:
- ``zkServerAddresses`` -- a list of the ZooKeeper servers to connect to,
default is "localhost:2181"
- ``serializer`` -- the serializer to use when serializing configuration data
into the cluster. Default is ``Cluster.defaultSerializer`` which is using Java
serialization
- ``hostname`` -- the hostname to use for the node
- ``remoteServerPort`` -- the remote server port, for the internal remote server
Starting a node
~~~~~~~~~~~~~~~
Creating a node does not make it join the cluster. In order to do that you need
to invoke the ``start`` method::
val clusterNode = Cluster.newNode(NodeAddress("test-cluster", "node1"))
clusterNode.start
Or if you prefer to do it in one line of code::
val clusterNode = Cluster.newNode(NodeAddress("test-cluster", "node1")).start
Stopping a node
~~~~~~~~~~~~~~~
To stop a node invoke ``stop``::
clusterNode.stop
Querying which nodes are part of the cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You can query the cluster for which nodes have joined the cluster. This is done using the ``membershipNodes`` method::
val allNodesInCluster = clusterNode.membershipNodes
You can also query the ``Cluster`` object for which nodes are members of a specific cluster::
val nodes = Cluster nodesInCluster "test-cluster"
Resetting the Cluster
~~~~~~~~~~~~~~~~~~~~~
You can reset the whole cluster using the ``reset`` method on the ``Cluster`` object::
Cluster.reset
This shuts down all nodes and removes them from the cluster, it also removes all clustered actors and configuration data from the registry. Use this method with care.
You can reset all the nodes in a specific cluster using the ``resetNodesInCluster`` method on the ``Cluster`` object::
Cluster resetNodesInCluster "test-cluster"
Cluster event subscription
--------------------------
The cluster module supports subscribing to events happening in the cluster. For
example, this can be very useful for knowing when a new nodes come and go,
allowing you to dynamically resize the cluster. Here is an example::
clusterNode.register(new ChangeListener {
override def nodeConnected(node: String, thisNode: ClusterNode) {
// ...
}
override def nodeDisconnected(node: String, thisNode: ClusterNode) {
// ...
}
})
As parameters into these callbacks the cluster passes the name of the node that
joined or left the cluster as well as the local node itself.
Here is the full trait with all the callbacks you can implement::
trait ChangeListener {
def nodeConnected(node: String, client: ClusterNode) = {}
def nodeDisconnected(node: String, client: ClusterNode) = {}
def newLeader(name: String, client: ClusterNode) = {}
def thisNodeNewSession(client: ClusterNode) = {}
def thisNodeConnected(client: ClusterNode) = {}
def thisNodeDisconnected(client: ClusterNode) = {}
def thisNodeExpired(client: ClusterNode) = {}
}
Here is when each callback will be invoked:
- ``nodeConnected`` -- when a node joins the cluster
- ``nodeDisconnected`` -- when a node leaves the cluster
- ``newLeader`` -- when there has been a leader election and the new leader is elected
- ``thisNodeNewSession`` -- when the local node has created a new session to the cluster
- ``thisNodeConnected`` -- when a local node has joined the cluster
- ``thisNodeDisconnected`` -- when a local node has left the cluster
- ``thisNodeExpired`` -- when the local node's session has expired
If you are using this from Java then you need to use the "Java-friendly"
``ChangeListenerAdapter`` abstract class instead of the ``ChangeListener``
trait.
Clustered Actor Registry
------------------------
You can cluster actors by storing them in the cluster by UUID. The actors will
be serialized deeply (with or without its mailbox and pending messages) and put
in a highly available storage. This actor can then be checked out on any other
node, used there and then checked in again. The cluster will also take care of
transparently migrating actors residing on a failed node onto another node on
the cluster so that the application can continue working as if nothing happened.
Let's look at an example. First we create a simple Hello World actor. We also
create a ``Format`` type class for serialization. For simplicity we are using
plain Java serialization. ::
import akka.serialization._
import akka.actor._
@serializable class HelloActor extends Actor {
private var count = 0
self.id = "service:hello"
def receive = {
case "hello" =>
count = count + 1
self reply ("world " + count)
}
}
object BinaryFormats {
@serializable implicit object HelloActorFormat
extends SerializerBasedActorFormat[HelloActor] {
val serializer = Serializer.Java
}
}
|more| Read more about actor serialization in the `Akka Serialization
Documentation <http://doc.akka.io/serialization-scala>`_.
.. todo:: add explanation on how to do this with the Java API
Once we can serialize and deserialize the actor we have what we need in
order to cluster the actor. We have four methods at our disposal:
- ``store``
- ``remove``
- ``use``
- ``release``
ActorAddress
----------------
The ``ActorAddress`` is used to represent the address to a specific actor. All methods in the API that deals with actors works with ``ActorAddress`` and represents one of these identifiers:
- ``actorUuid`` -- the UUID for an actor; ``Actor.uuid``
- ``actorId`` -- the ID for an actor; ``Actor.id``
- ``actorClassName`` -- the class name of an actor; ``Actor.actorClassName``
To create a ``ActorAddress`` you can create the it using named arguments like this::
ActorAddress(actorUuid = uuid)
ActorAddress(actorId = id)
ActorAddress(actorClassName = className)
Or, if you are using the API from Java (or prefer this syntax in Scala) then you can use the ``ActorAddress`` factory methods::
ActorAddress.forUuid(uuid)
ActorAddress.forId(id)
ActorAddress.forClassName(className)
Store and Remove
----------------
The methods for storing an actor in the cluster and removing it from the cluster
are:
- ``store`` -- clusters the actor by adding it to the clustered actor registry, available to any node in the cluster
- ``remove`` -- removes the actor from the clustered actor registry
The ``store`` method also allows you to specify a replication factor. The
``replicationFactor`` defines the number of (randomly picked) nodes in the cluster that
the stored actor should be automatically deployed to and instantiated locally on (using
``use``). If you leave this argument out then a replication factor of ``0`` will be used
which means that the actor will only be stored in the clustered actor registry and not
deployed anywhere.
The last argument to the ``store`` method is the ``serializeMailbox`` which defines if
the actor's mailbox should be serialized along with the actor, stored in the cluster and
deployed (if replication factor is set to more than ``0``). If it should or not depends
on your use-case. Default is ``false``
This is the signatures for the ``store`` method (all different permutations of these methods are available for using from Java)::
def store[T <: Actor]
(actorRef: ActorRef, replicationFactor: Int = 0, serializeMailbox: Boolean = false)
(implicit format: Format[T]): ClusterNode
def store[T <: Actor]
(actorClass: Class[T], replicationFactor: Int = 0, serializeMailbox: Boolean = false)
(implicit format: Format[T]): ClusterNode
The ``implicit format: Format[T]`` might look scary but this argument is chosen for you and passed in automatically by the compiler as long as you have imported the serialization typeclass for the actor you are storing, e.g. the ``HelloActorFormat`` (defined above and imported in the sample below).
Here is an example of how to use ``store`` to cluster an already
created actor::
import Actor._
import ActorSerialization._
import BinaryFormats._
val clusterNode = Cluster.newNode(NodeAddress("test-cluster", "node1")).start
val hello = actorOf[HelloActor].start.asInstanceOf[LocalActorRef]
val serializeMailbox = false
val replicationFactor = 5
clusterNode store (hello, serializeMailbox, replicationFactor)
Here is an example of how to use ``store`` to cluster an actor by type::
clusterNode store classOf[HelloActor]
The ``remove`` method allows you to pass in an ``ActorAddress``::
cluster remove actorAddress
You can also remove an actor by type like this::
cluster remove classOf[HelloActor]
Use and Release
---------------
The two methods for "checking out" an actor from the cluster for use and
"checking it in" after use are:
- ``use`` -- "checks out" for use on a specific node, this will deserialize
the actor and instantiated on the node it is being checked out on
- ``release`` -- "checks in" the actor after being done with it, important for
the cluster bookkeeping
The ``use`` and ``release`` methods allow you to pass an instance of ``ActorAddress``. Here is an example::
val helloActor1 = cluster use actorAddress
helloActor1 ! "hello"
helloActor2 ! "hello"
helloActor3 ! "hello"
cluster release actorAddress
Ref and Router
--------------
The ``ref`` method is used to create an actor reference to a set of clustered
(remote) actors defined with a specific routing policy.
This is the signature for ``ref``::
def ref(actorAddress: ActorAddress, router: Router.RouterType): ActorRef
The final argument ``router`` defines a routing policy in which you have the
following options:
- ``Router.Direct`` -- this policy means that the reference will only represent one single actor which it will use all the time when sending messages to the actor. If the query returns multiple actors then a single one is picked out randomly.
- ``Router.Random`` -- this policy will route the messages to a randomly picked actor in the set of actors in the cluster, returned by the query.
- ``Router.RoundRobin`` -- this policy will route the messages to the set of actors in the cluster returned by the query in a round-robin fashion. E.g. circle around the set of actors in order.
Here is an example::
// Store the PongActor in the cluster and deploy it to 5 nodes in the cluster
localNode store (classOf[PongActor], 5)
// Get a reference to all the pong actors through a round-robin router ActorRef
val pong = localNode ref (actorAddress, Router.RoundRobin)
// Send it messages
pong ! Ping
Actor migration
---------------
The cluster has mechanisms to either manually or automatically fail over all
actors running on a node that have crashed to another node in the cluster. It
will also make sure that all remote clients that are communicating these actors
will automatically and transparently reconnect to the new host node.
Automatic actor migration on fail-over
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
All actors that are checked out with ``use`` are tracked by the cluster and will be
automatically failed over to a new node in the cluster if the node they are
running on crashes. Tracking stops when the actor
is checked in using ``release``.
Manual actor migration
~~~~~~~~~~~~~~~~~~~~~~
You can move an actor from one node to another using the ``migrate`` method. Here is the parameter list:
- ``from`` -- the address of the node migrating from (default is the address for the node you are invoking it on)
- ``to`` -- the address of the node migrating to
- ``actorAddress`` -- the ``ActorAddress``
Here is an example::
clusterNode migrate (
NodeAddress("test-cluster", "node1"),
NodeAddress("test-cluster", "node2"),
actorAddress)
Here is an example using ``actorId`` and ``to``, e.g. relying on the default value for ``from`` (this node)::
clusterNode migrate (NodeAddress("test-cluster", "node2"), actorAddress)
Compute Grid
-----------------------------
Akka can work as a compute grid by allowing you to send functions to the nodes
in the cluster and collect the result back.
The workhorse for this is the ``send`` method (in different variations). The
``send`` methods take the following parameters:
- ``f`` -- the function you want to be invoked on the remote nodes in the cluster
- ``arg`` -- the argument to the function (not all of them have this parameter)
- ``replicationFactor`` -- the replication factor defining the number of nodes you want the function to be sent and invoked on
You can currently send these function types to the cluster:
- ``Function0[Unit]`` -- takes no arguments and returns nothing
- ``Function0[Any]`` -- takes no arguments and returns a value of type ``Any``
- ``Function1[Any, Unit]`` -- takes an arguments of type ``Any`` and returns nothing
- ``Function1[Any, Any]`` -- takes an arguments of type ``Any`` and returns a value of type ``Any``
All ``send`` methods returns immediately after the functions have been sent off
asynchronously to the remote nodes. The ``send`` methods that takes a function
that yields a return value all return a ``scala.List`` of ``akka.dispatch.Future[Any]``.
This gives you the option of handling these futures the way you wish. Some helper
functions for working with ``Future`` are in the ``akka.dispatch.Futures`` object.
|more| Read more about futures in the `Akka documentation on Futures
<http://doc.akka.io/actors-scala#Actors%20(Scala)-Send%20messages-Send-And-Receive-Future>`_.
Here are some examples showing how you can use the different ``send`` methods.
Send a ``Function0[Unit]``::
val node1 = Cluster newNode (NodeAddress("test", "node1", port = 9991)) start
val node2 = Cluster newNode (NodeAddress("test", "node2", port = 9992)) start
val fun = () => println(">>> AKKA ROCKS <<<")
// send and invoke function on to two cluster nodes
node1 send (fun, 2)
Send a ``Function0[Any]``::
val node1 = Cluster newNode (NodeAddress("test", "node1", port = 9991)) start
val node2 = Cluster newNode (NodeAddress("test", "node2", port = 9992)) start
val fun = () => "AKKA ROCKS"
// send and invoke function on to two cluster nodes and get result
val futures = node1 send (fun, 2)
Futures awaitAll futures
println("Cluster says [" + futures.map(_.result).mkString(" - ") + "]")
Send a ``Function1[Any, Unit]``::
val node1 = Cluster newNode (NodeAddress("test", "node1", port = 9991)) start
val node2 = Cluster newNode (NodeAddress("test", "node2", port = 9992)) start
val fun = ((s: String) => println(">>> " + s + " <<<")).asInstanceOf[Function1[Any, Unit]]
// send and invoke function on to two cluster nodes
node1 send (fun, "AKKA ROCKS", 2)
Send a ``Function1[Any, Any]``::
val node1 = Cluster newNode (NodeAddress("test", "node1", port = 9991)) start
val node2 = Cluster newNode (NodeAddress("test", "node2", port = 9992)) start
val fun = ((i: Int) => i * i).asInstanceOf[Function1[Any, Any]]
// send and invoke function on one cluster node and get result
val future1 = node1 send (fun, 2, 1) head
val future2 = node1 send (fun, 2, 1) head
// grab the result from the first one that returns
val result = Futures awaitEither (future1, future2)
println("Cluster says [" + result.get + "]")
Querying the Clustered Actor Registry
-------------------------------------
Here we have some other methods for querying the Clustered Actor Registry in different ways.
Check if an actor is clustered (stored and/or used in the cluster):
- ``def isClustered(actorUuid: UUID, actorId: String, actorClassName: String): Boolean``
- When using this method you should only specify one of the parameters using "named parameters" as in the examples above.
Check if an actor is used by a specific node (e.g. checked out locally using ``use``):
- ``def isInUseOnNode(actorUuid: UUID, actorId: String, actorClassName: String, node: NodeAddress): Boolean``
- When using this method you should only specify one of the parameters using "named parameters" as in the examples above. Default argument for ``node`` is "this" node.
Lookup the remote addresses for a specific actor (can reside on more than one node):
- ``def addressesForActor(actorUuid: UUID, actorId: String,
actorClassName: String): Array[Tuple2[UUID, InetSocketAddress]]``
- When using this method you should only specify one of the parameters using "named parameters" as in the examples above.
Lookup all actors that are in use (e.g. "checked out") on this node:
- ``def uuidsForActorsInUse: Array[UUID]``
- ``def idsForActorsInUse: Array[String]``
- ``def classNamesForActorsInUse: Array[String]``
Lookup all actors that are available (e.g. "stored") in the Clustered Actor Registry:
- ``def uuidsForClusteredActors: Array[UUID]``
- ``def idsForClusteredActors: Array[String]``
- ``def classNamesForClusteredActors: Array[String]``
Lookup the ``Actor.id`` by ``Actor.uuid``:
- ``def actorIdForUuid(uuid: UUID): String``
- ``def actorIdsForUuids(uuids: Array[UUID]): Array[String]``
Lookup the ``Actor.actorClassName`` by ``Actor.uuid``:
- ``def actorClassNameForUuid(uuid: UUID): String``
- ``def actorClassNamesForUuids(uuids: Array[UUID]): Array[String]``
Lookup the ``Actor.uuid``'s by ``Actor.id``:
- ``def uuidsForActorId(actorId: String): Array[UUID]``
Lookup the ``Actor.uuid``'s by ``Actor.actorClassName``:
- ``def uuidsForActorClassName(actorClassName: String): Array[UUID]``
Lookup which nodes that have checked out a specific actor:
- ``def nodesForActorsInUseWithUuid(uuid: UUID): Array[String]``
- ``def nodesForActorsInUseWithId(id: String): Array[String]``
- ``def nodesForActorsInUseWithClassName(className: String): Array[String]``
Lookup the ``Actor.uuid`` for the actors that have been checked out on a specific node:
- ``def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID]``
- ``def idsForActorsInUseOnNode(nodeName: String): Array[String]``
- ``def classNamesForActorsInUseOnNode(nodeName: String): Array[String]``
Lookup the serialization ``Format`` instance for a specific actor:
- ``def formatForActor(actorUuid: UUID, actorId: String, actorClassName: String): Format[T]``
- When using this method you should only specify one of the parameters using "named parameters" as in the examples above.
Clustered configuration manager
-------------------------------
Custom configuration data
~~~~~~~~~~~~~~~~~~~~~~~~~
You can also store configuration data into the cluster. This is done using the
``setConfigElement`` and ``getConfigElement`` methods. The key is a ``String`` and the data an ``Array[Byte]``::
clusterNode setConfigElement ("hello", "world".getBytes("UTF-8"))
val valueAsBytes = clusterNode getConfigElement ("hello") // returns Array[Byte]
val valueAsString = new String(valueAsBytes, "UTF-8")
You can also remove an entry using the ``removeConfigElement`` method and get an
``Array[String]`` with all the keys::
clusterNode removeConfigElement ("hello")
val allConfigElementKeys = clusterNode.getConfigElementKeys // returns Array[String]
Consolidation and management of the Akka configuration file
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Not implemented yet.
The actor configuration file ``akka.conf`` will also be stored into the cluster
and it will be possible to have one single configuration file, stored on the server, and pushed out to all
the nodes that join the cluster. Each node only needs to be configured with the ZooKeeper
server address and the master configuration will only reside in one single place
simplifying administration of the cluster and alleviating the risk of having
different configuration files lying around in the cluster.
Leader election
---------------
The cluster supports leader election. There will always only be one single
leader in the cluster. The first thing that happens when the cluster starts up
is a leader election. The leader that gets elected will stay the leader until it
crashes or is shut down, then an automatic reelection process will take place
and a new leader is elected. Only having one leader in a cluster can be very
useful for solving a wide range of problems. You can find out which node is the
leader by invoking the ``leader`` method. A node can also check if it is the
leader by invoking the ``isLeader`` method. A leader node can also explicitly
resign and issue a new leader election by invoking the ``resign`` method. Each node has an election number stating its ranking in the last election. You can query a node for its election number through the ``electionNumber`` method.
JMX monitoring and management
-----------------------------
.. todo:: Add some docs to each method
The clustering module has a JMX MBean that you can use. Here is the interface
with all available operations::
trait ClusterNodeMBean {
def start: Unit
def stop: Unit
def disconnect: Unit
def reconnect: Unit
def resign: Unit
def isConnected: Boolean
def getRemoteServerHostname: String
def getRemoteServerPort: Int
def getNodeName: String
def getClusterName: String
def getZooKeeperServerAddresses: String
def getMemberNodes: Array[String]
def getLeader: String
def getUuidsForClusteredActors: Array[String]
def getIdsForClusteredActors: Array[String]
def getClassNamesForClusteredActors: Array[String]
def getUuidsForActorsInUse: Array[String]
def getIdsForActorsInUse: Array[String]
def getClassNamesForActorsInUse: Array[String]
def getNodesForActorInUseWithUuid(uuid: String): Array[String]
def getNodesForActorInUseWithId(id: String): Array[String]
def getNodesForActorInUseWithClassName(className: String): Array[String]
def getUuidsForActorsInUseOnNode(nodeName: String): Array[String]
def getIdsForActorsInUseOnNode(nodeName: String): Array[String]
def getClassNamesForActorsInUseOnNode(nodeName: String): Array[String]
def setConfigElement(key: String, value: String): Unit
def getConfigElement(key: String): AnyRef
def removeConfigElement(key: String): Unit
def getConfigElementKeys: Array[String]
}
JMX support is turned on and off using the default ``akka.enable-jmx``
configuration option.
.. code-block:: conf
akka {
enable-jmx = on
}
.. |more| image:: more.png
:align: middle
:alt: More info