Fixed race condition in initial and dynamic management of node connections in Cluster

* Using CAS-based optimistic concurrency with versioning to fix initial and dynamic management of node connections in Cluster (see the sketch after this list)
* Fixed broken bootstrap of ClusterNode - reorganized booting and removed lazy from some fields
* Removed 'start' and 'isRunning' from Cluster
* Removed 'isStarted' Switch in Cluster, which was sprinkled all over the cluster implementation
* Added more and better logging
* Moved local Cluster ops from Cluster to LocalCluster
* Rewrote RoundRobinFailoverMultiJvmSpec to be correct
* RoundRobinFailoverMultiJvmSpec now passes
* Minor reformatting and edits
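
The core of the fix is a versioned, immutable connection state held in an AtomicReference and updated in a compare-and-set retry loop (see the rewritten connectToAllNewlyArrivedMembershipNodesInCluster in the diffs below). A minimal sketch of the pattern, assuming a simplified registry keyed by node name; VersionedState, ConnectionRegistry and addConnection are illustrative names, not the actual Akka API:

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Immutable snapshot; every successful update bumps the version so that
// routers can cheaply detect that the connection set has changed.
final case class VersionedState(version: Long, connections: Map[String, String])

class ConnectionRegistry {
  private val state = new AtomicReference(VersionedState(0L, Map.empty[String, String]))

  @tailrec
  final def addConnection(node: String, address: String): Unit = {
    val oldState = state.get()
    val newState = VersionedState(oldState.version + 1, oldState.connections + (node -> address))
    // If another thread updated the state after we read oldState, the CAS
    // fails and we retry against the fresh state instead of losing the update.
    if (!state.compareAndSet(oldState, newState)) addConnection(node, address)
  }

  def snapshot: VersionedState = state.get()
}

Unlike the removed AtomicBoolean guard, which simply skipped the update when another thread held the flag, a failed CAS here always retries, so no membership change is lost.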

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
Jonas Bonér 2011-08-18 11:35:14 +02:00
parent 0daa28a891
commit dfc1a68aac
61 changed files with 473 additions and 578 deletions


@ -115,6 +115,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exc
override def getMessage =
if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg)
else "Actor does not handle [%s]".format(msg)
override def fillInStackTrace() = this //Don't waste cycles generating stack trace
}
@ -123,7 +124,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exc
*/
object Status {
sealed trait Status extends Serializable
case object Success extends Status
case class Success(status: AnyRef) extends Status
case class Failure(cause: Throwable) extends Status
}
@ -131,6 +132,7 @@ case class Timeout(duration: Duration) {
def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
}
object Timeout {
/**
* The default timeout, based on the config setting 'akka.actor.timeout'
@ -185,11 +187,7 @@ object Actor {
/**
* Handle to the ClusterNode. API for the cluster client.
*/
lazy val cluster: ClusterNode = {
val node = ClusterModule.node
node.start()
node
}
lazy val cluster: ClusterNode = ClusterModule.node
/**
* Handle to the RemoteSupport. API for the remote client/server.
@ -782,4 +780,4 @@ trait Actor {
}
private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior
}
}


@ -129,10 +129,6 @@ trait ClusterNode {
def remoteServerAddress: InetSocketAddress
def isRunning: Boolean
def start(): ClusterNode
def shutdown()
def disconnect(): ClusterNode
@ -424,6 +420,7 @@ trait ClusterNode {
// FIXME consider moving all these private[cluster] methods to a separate trait to get them out of the user's view
private[cluster] def remoteClientLifeCycleListener: ActorRef
private[cluster] def remoteDaemon: ActorRef
/**


@ -89,7 +89,7 @@ object EventHandler extends ListenerManagement {
class EventHandlerException extends AkkaException
lazy val EventHandlerDispatcher = Dispatchers.newDispatcher("event:handler").build
lazy val EventHandlerDispatcher = Dispatchers.newDispatcher("akka:event:handler").build
implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener]


@ -12,6 +12,7 @@ import akka.actor._
import akka.dispatch.Futures
import akka.event.EventHandler
import akka.actor.UntypedChannel._
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
/**
@ -57,6 +58,11 @@ object RouterType {
*/
object LeastMessages extends RouterType
/**
* A user-defined custom RouterType.
*/
object Custom extends RouterType
}
/**
@ -106,6 +112,11 @@ trait RouterConnections {
*/
def version: Long
/**
* Returns the number of connections.
*/
def size: Int
/**
* Returns a tuple containing the version and an Iterable of all connected ActorRefs this Router sends messages to.
*
@ -151,12 +162,14 @@ object Routing {
case RouterType.Direct
if (connections.size > 1)
throw new IllegalArgumentException("A direct router can't have more than 1 connection")
actorOf(actorAddress, connections, new DirectRouter())
case RouterType.Random
actorOf(actorAddress, connections, new RandomRouter())
case RouterType.RoundRobin
actorOf(actorAddress, connections, new RoundRobinRouter())
case _ throw new IllegalArgumentException("Unsupported routerType " + routerType)
}
@ -191,9 +204,8 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
router.route(message)(sender)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any,
timeout: Timeout,
channel: UntypedChannel): Future[Any] = {
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = {
val sender = channel match {
case ref: ActorRef Some(ref)
case _ None
@ -230,9 +242,11 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
def version: Long = state.get().version
def size: Int = state.get().connections.size
def versionedIterator = {
val s = state.get
(s.version, s.connectionIterable)
(s.version, s.connections)
}
@tailrec
@ -240,9 +254,9 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
val oldState = state.get()
//remove the ref from the connections.
var newList = oldState.connectionIterable.filter(currentActorRef currentActorRef ne ref)
var newList = oldState.connections.filter(currentActorRef currentActorRef ne ref)
if (newList.size != oldState.connectionIterable.size) {
if (newList.size != oldState.connections.size) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
val newState = new State(oldState.version + 1, newList)
@ -251,9 +265,8 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
}
}
class State(val version: Long, val connectionIterable: Iterable[ActorRef])
case class State(val version: Long, val connections: Iterable[ActorRef])
}
}
/**
@ -293,7 +306,6 @@ trait BasicRouter extends Router {
case e: Exception
connections.signalDeadActor(actor)
throw e
}
case None
throwNoConnectionsError()
@ -302,7 +314,7 @@ trait BasicRouter extends Router {
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
case Routing.Broadcast(message)
throw new RoutingException("Broadcasting using a future for the time being is not supported")
throw new RoutingException("Broadcasting using '?' is for the time being is not supported. Use ScatterGatherRouter.")
case _
//if it's not a broadcast message, we are going to select an actor from the connections and send the message to it.
next match {
@ -313,7 +325,6 @@ trait BasicRouter extends Router {
case e: Exception
connections.signalDeadActor(actor)
throw e
}
case None
throwNoConnectionsError()
@ -330,7 +341,7 @@ trait BasicRouter extends Router {
}
/**
* A DirectRouter is
* A DirectRouter is FIXME
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -369,7 +380,7 @@ class DirectRouter extends BasicRouter {
}
}
private class DirectRouterState(val ref: ActorRef, val version: Long)
private case class DirectRouterState(val ref: ActorRef, val version: Long)
}
@ -404,9 +415,9 @@ class RandomRouter extends BasicRouter {
currentState
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val (version, connectionIterable) = connections.versionedIterator
val newState = new RandomRouterState(connectionIterable.toArray[ActorRef], version)
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
@ -417,8 +428,7 @@ class RandomRouter extends BasicRouter {
}
}
private class RandomRouterState(val array: Array[ActorRef], val version: Long)
private case class RandomRouterState(val array: Array[ActorRef], val version: Long)
}
/**
@ -441,9 +451,9 @@ class RoundRobinRouter extends BasicRouter {
currentState
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val (version, connectionIterable) = connections.versionedIterator
val newState = new RoundRobinState(connectionIterable.toArray[ActorRef], version)
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
@ -454,7 +464,7 @@ class RoundRobinRouter extends BasicRouter {
}
}
private class RoundRobinState(val array: Array[ActorRef], val version: Long) {
private case class RoundRobinState(val array: Array[ActorRef], val version: Long) {
private val index = new AtomicInteger(0)
@ -469,7 +479,6 @@ class RoundRobinRouter extends BasicRouter {
else oldIndex
}
}
}
/*


@ -31,7 +31,7 @@ object ReflectiveAccess {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ClusterModule {
lazy val isEnabled = Config.isClusterEnabled && clusterInstance.isDefined
lazy val isEnabled = Config.isClusterEnabled //&& clusterInstance.isDefined
def ensureEnabled() {
if (!isEnabled) {


@ -10,56 +10,53 @@ public final class ClusterProtocol {
}
public enum RemoteDaemonMessageType
implements com.google.protobuf.ProtocolMessageEnum {
START(0, 1),
STOP(1, 2),
USE(2, 3),
RELEASE(3, 4),
MAKE_AVAILABLE(4, 5),
MAKE_UNAVAILABLE(5, 6),
DISCONNECT(6, 7),
RECONNECT(7, 8),
RESIGN(8, 9),
FAIL_OVER_CONNECTIONS(9, 10),
FUNCTION_FUN0_UNIT(10, 11),
FUNCTION_FUN0_ANY(11, 12),
FUNCTION_FUN1_ARG_UNIT(12, 13),
FUNCTION_FUN1_ARG_ANY(13, 14),
STOP(0, 1),
USE(1, 2),
RELEASE(2, 3),
MAKE_AVAILABLE(3, 4),
MAKE_UNAVAILABLE(4, 5),
DISCONNECT(5, 6),
RECONNECT(6, 7),
RESIGN(7, 8),
FAIL_OVER_CONNECTIONS(8, 9),
FUNCTION_FUN0_UNIT(9, 10),
FUNCTION_FUN0_ANY(10, 11),
FUNCTION_FUN1_ARG_UNIT(11, 12),
FUNCTION_FUN1_ARG_ANY(12, 13),
;
public static final int START_VALUE = 1;
public static final int STOP_VALUE = 2;
public static final int USE_VALUE = 3;
public static final int RELEASE_VALUE = 4;
public static final int MAKE_AVAILABLE_VALUE = 5;
public static final int MAKE_UNAVAILABLE_VALUE = 6;
public static final int DISCONNECT_VALUE = 7;
public static final int RECONNECT_VALUE = 8;
public static final int RESIGN_VALUE = 9;
public static final int FAIL_OVER_CONNECTIONS_VALUE = 10;
public static final int FUNCTION_FUN0_UNIT_VALUE = 11;
public static final int FUNCTION_FUN0_ANY_VALUE = 12;
public static final int FUNCTION_FUN1_ARG_UNIT_VALUE = 13;
public static final int FUNCTION_FUN1_ARG_ANY_VALUE = 14;
public static final int STOP_VALUE = 1;
public static final int USE_VALUE = 2;
public static final int RELEASE_VALUE = 3;
public static final int MAKE_AVAILABLE_VALUE = 4;
public static final int MAKE_UNAVAILABLE_VALUE = 5;
public static final int DISCONNECT_VALUE = 6;
public static final int RECONNECT_VALUE = 7;
public static final int RESIGN_VALUE = 8;
public static final int FAIL_OVER_CONNECTIONS_VALUE = 9;
public static final int FUNCTION_FUN0_UNIT_VALUE = 10;
public static final int FUNCTION_FUN0_ANY_VALUE = 11;
public static final int FUNCTION_FUN1_ARG_UNIT_VALUE = 12;
public static final int FUNCTION_FUN1_ARG_ANY_VALUE = 13;
public final int getNumber() { return value; }
public static RemoteDaemonMessageType valueOf(int value) {
switch (value) {
case 1: return START;
case 2: return STOP;
case 3: return USE;
case 4: return RELEASE;
case 5: return MAKE_AVAILABLE;
case 6: return MAKE_UNAVAILABLE;
case 7: return DISCONNECT;
case 8: return RECONNECT;
case 9: return RESIGN;
case 10: return FAIL_OVER_CONNECTIONS;
case 11: return FUNCTION_FUN0_UNIT;
case 12: return FUNCTION_FUN0_ANY;
case 13: return FUNCTION_FUN1_ARG_UNIT;
case 14: return FUNCTION_FUN1_ARG_ANY;
case 1: return STOP;
case 2: return USE;
case 3: return RELEASE;
case 4: return MAKE_AVAILABLE;
case 5: return MAKE_UNAVAILABLE;
case 6: return DISCONNECT;
case 7: return RECONNECT;
case 8: return RESIGN;
case 9: return FAIL_OVER_CONNECTIONS;
case 10: return FUNCTION_FUN0_UNIT;
case 11: return FUNCTION_FUN0_ANY;
case 12: return FUNCTION_FUN1_ARG_UNIT;
case 13: return FUNCTION_FUN1_ARG_ANY;
default: return null;
}
}
@ -90,7 +87,7 @@ public final class ClusterProtocol {
}
private static final RemoteDaemonMessageType[] VALUES = {
START, STOP, USE, RELEASE, MAKE_AVAILABLE, MAKE_UNAVAILABLE, DISCONNECT, RECONNECT, RESIGN, FAIL_OVER_CONNECTIONS, FUNCTION_FUN0_UNIT, FUNCTION_FUN0_ANY, FUNCTION_FUN1_ARG_UNIT, FUNCTION_FUN1_ARG_ANY,
STOP, USE, RELEASE, MAKE_AVAILABLE, MAKE_UNAVAILABLE, DISCONNECT, RECONNECT, RESIGN, FAIL_OVER_CONNECTIONS, FUNCTION_FUN0_UNIT, FUNCTION_FUN0_ANY, FUNCTION_FUN1_ARG_UNIT, FUNCTION_FUN1_ARG_ANY,
};
public static RemoteDaemonMessageType valueOf(
@ -246,7 +243,7 @@ public final class ClusterProtocol {
}
private void initFields() {
messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.STOP;
actorUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
actorAddress_ = "";
payload_ = com.google.protobuf.ByteString.EMPTY;
@ -434,7 +431,7 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -450,7 +447,7 @@ public final class ClusterProtocol {
public Builder clear() {
super.clear();
messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.STOP;
bitField0_ = (bitField0_ & ~0x00000001);
if (actorUuidBuilder_ == null) {
actorUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
@ -658,7 +655,7 @@ public final class ClusterProtocol {
private int bitField0_;
// required .RemoteDaemonMessageType messageType = 1;
private akka.cluster.ClusterProtocol.RemoteDaemonMessageType messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
private akka.cluster.ClusterProtocol.RemoteDaemonMessageType messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.STOP;
public boolean hasMessageType() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
@ -676,7 +673,7 @@ public final class ClusterProtocol {
}
public Builder clearMessageType() {
bitField0_ = (bitField0_ & ~0x00000001);
messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.STOP;
onChanged();
return this;
}
@ -1247,7 +1244,7 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -1849,7 +1846,7 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -2077,14 +2074,14 @@ public final class ClusterProtocol {
"\002(\t\022\032\n\022senderActorAddress\030\002 \001(\t\022!\n\nfutur" +
"eUuid\030\003 \001(\0132\r.UuidProtocol\022\017\n\007message\030\004 " +
"\002(\014\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low",
"\030\002 \002(\004*\232\002\n\027RemoteDaemonMessageType\022\t\n\005ST" +
"ART\020\001\022\010\n\004STOP\020\002\022\007\n\003USE\020\003\022\013\n\007RELEASE\020\004\022\022\n" +
"\016MAKE_AVAILABLE\020\005\022\024\n\020MAKE_UNAVAILABLE\020\006\022" +
"\016\n\nDISCONNECT\020\007\022\r\n\tRECONNECT\020\010\022\n\n\006RESIGN" +
"\020\t\022\031\n\025FAIL_OVER_CONNECTIONS\020\n\022\026\n\022FUNCTIO" +
"N_FUN0_UNIT\020\013\022\025\n\021FUNCTION_FUN0_ANY\020\014\022\032\n\026" +
"FUNCTION_FUN1_ARG_UNIT\020\r\022\031\n\025FUNCTION_FUN" +
"1_ARG_ANY\020\016B\020\n\014akka.clusterH\001"
"\030\002 \002(\004*\217\002\n\027RemoteDaemonMessageType\022\010\n\004ST" +
"OP\020\001\022\007\n\003USE\020\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAIL" +
"ABLE\020\004\022\024\n\020MAKE_UNAVAILABLE\020\005\022\016\n\nDISCONNE" +
"CT\020\006\022\r\n\tRECONNECT\020\007\022\n\n\006RESIGN\020\010\022\031\n\025FAIL_" +
"OVER_CONNECTIONS\020\t\022\026\n\022FUNCTION_FUN0_UNIT" +
"\020\n\022\025\n\021FUNCTION_FUN0_ANY\020\013\022\032\n\026FUNCTION_FU" +
"N1_ARG_UNIT\020\014\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\r" +
"B\020\n\014akka.clusterH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {


@ -26,20 +26,19 @@ message RemoteDaemonMessageProtocol {
* Defines the remote daemon message type.
*/
enum RemoteDaemonMessageType {
START = 1;
STOP = 2;
USE = 3;
RELEASE = 4;
MAKE_AVAILABLE = 5;
MAKE_UNAVAILABLE = 6;
DISCONNECT = 7;
RECONNECT = 8;
RESIGN = 9;
FAIL_OVER_CONNECTIONS = 10;
FUNCTION_FUN0_UNIT = 11;
FUNCTION_FUN0_ANY = 12;
FUNCTION_FUN1_ARG_UNIT = 13;
FUNCTION_FUN1_ARG_ANY = 14;
STOP = 1;
USE = 2;
RELEASE = 3;
MAKE_AVAILABLE = 4;
MAKE_UNAVAILABLE = 5;
DISCONNECT = 6;
RECONNECT = 7;
RESIGN = 8;
FAIL_OVER_CONNECTIONS = 9;
FUNCTION_FUN0_UNIT = 10;
FUNCTION_FUN0_ANY = 11;
FUNCTION_FUN1_ARG_UNIT = 12;
FUNCTION_FUN1_ARG_ANY = 13;
}
/**


@ -49,8 +49,11 @@ import RemoteDaemonMessageType._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap }
import annotation.tailrec
// FIXME add a watch for each node so that when the entry for the node is removed, the node shuts itself down
/**
@ -60,8 +63,6 @@ import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap
*/
trait ClusterNodeMBean {
def start()
def stop()
def disconnect()
@ -70,8 +71,6 @@ trait ClusterNodeMBean {
def resign()
def isConnected: Boolean
def getRemoteServerHostname: String
def getRemoteServerPort: Int
@ -139,78 +138,13 @@ trait ClusterNodeMBean {
def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID): String
}
/**
* Module for the Cluster. Also holds global state such as configuration data etc.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Cluster {
val EMPTY_STRING = "".intern
// config options
val name = Config.clusterName
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
val enableJMX = config.getBool("akka.enable-jmx", true)
val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
val includeRefNodeInReplicaSet = config.getBool("akka.cluster.include-ref-node-in-replica-set", true)
object LocalCluster {
val clusterDirectory = config.getString("akka.cluster.log-directory", "_akka_cluster")
val clusterDataDirectory = clusterDirectory + "/data"
val clusterLogDirectory = clusterDirectory + "/log"
@volatile
private var properties = Map.empty[String, String]
/**
* Use to override JVM options such as <code>-Dakka.cluster.nodename=node1</code> etc.
* Currently supported options are:
* <pre>
* Cluster setProperty ("akka.cluster.nodename", "node1")
* Cluster setProperty ("akka.cluster.hostname", "darkstar.lan")
* Cluster setProperty ("akka.cluster.port", "1234")
* </pre>
*/
def setProperty(property: (String, String)) {
properties = properties + property
}
private def nodename: String = properties.get("akka.cluster.nodename") match {
case Some(uberride) uberride
case None Config.nodename
}
private def hostname: String = properties.get("akka.cluster.hostname") match {
case Some(uberride) uberride
case None Config.hostname
}
private def port: Int = properties.get("akka.cluster.port") match {
case Some(uberride) uberride.toInt
case None Config.remoteServerPort
}
val defaultZooKeeperSerializer = new SerializableSerializer
private val _zkServer = new AtomicReference[Option[ZkServer]](None)
/**
* The node address.
*/
val nodeAddress = NodeAddress(name, nodename)
/**
* The reference to the running ClusterNode.
*/
val node = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultZooKeeperSerializer)
}
/**
* Looks up the local hostname.
*/
@ -265,6 +199,73 @@ object Cluster {
_zkServer.set(None)
}
}
}
/**
* Module for the Cluster. Also holds global state such as configuration data etc.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Cluster {
val EMPTY_STRING = "".intern
// config options
val name = Config.clusterName
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
val enableJMX = config.getBool("akka.enable-jmx", true)
val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
val includeRefNodeInReplicaSet = config.getBool("akka.cluster.include-ref-node-in-replica-set", true)
@volatile
private var properties = Map.empty[String, String]
/**
* Use to override JVM options such as <code>-Dakka.cluster.nodename=node1</code> etc.
* Currently supported options are:
* <pre>
* Cluster setProperty ("akka.cluster.nodename", "node1")
* Cluster setProperty ("akka.cluster.hostname", "darkstar.lan")
* Cluster setProperty ("akka.cluster.port", "1234")
* </pre>
*/
def setProperty(property: (String, String)) {
properties = properties + property
}
private def nodename: String = properties.get("akka.cluster.nodename") match {
case Some(uberride) uberride
case None Config.nodename
}
private def hostname: String = properties.get("akka.cluster.hostname") match {
case Some(uberride) uberride
case None Config.hostname
}
private def port: Int = properties.get("akka.cluster.port") match {
case Some(uberride) uberride.toInt
case None Config.remoteServerPort
}
val defaultZooKeeperSerializer = new SerializableSerializer
/**
* The node address.
*/
val nodeAddress = NodeAddress(name, nodename)
/**
* The reference to the running ClusterNode.
*/
val node = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultZooKeeperSerializer)
}
/**
* Creates a new AkkaZkClient.
@ -345,6 +346,8 @@ class DefaultClusterNode private[akka] (
import Cluster._
// private val connectToAllNewlyArrivedMembershipNodesInClusterLock = new AtomicBoolean(false)
private[cluster] lazy val remoteClientLifeCycleListener = localActorOf(new Actor {
def receive = {
case RemoteClientError(cause, client, address) client.shutdownClientModule()
@ -373,8 +376,6 @@ class DefaultClusterNode private[akka] (
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
val isConnected = new Switch(false)
// static nodes
val CLUSTER_PATH = "/" + nodeAddress.clusterName
val MEMBERSHIP_PATH = CLUSTER_PATH + "/members"
@ -413,22 +414,25 @@ class DefaultClusterNode private[akka] (
// Address -> ClusterActorRef
private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
case class VersionedConnectionState(version: Long, connections: Map[String, Tuple2[InetSocketAddress, ActorRef]])
// all the connections to other nodes
private[akka] val nodeConnections = {
var conns = Map.empty[String, Tuple2[InetSocketAddress, ActorRef]]
// add the remote connection to 'this' node as well, but as a 'local' actor
if (includeRefNodeInReplicaSet) conns += (nodeAddress.nodeName -> (remoteServerAddress, remoteDaemon))
new AtomicReference[VersionedConnectionState](VersionedConnectionState(0, conns))
}
// ============================================================================================================
// ========== WARNING: THESE FIELDS AND EVERYTHING USING THEM IN THE CONSTRUCTOR NEEDS TO BE LAZY =============
// ============================================================================================================
lazy private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = {
val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
if (includeRefNodeInReplicaSet)
conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
conns
}
// ZooKeeper client
lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
// leader election listener, registered to the 'leaderLock' below
lazy private[cluster] val leaderElectionCallback = new LockListener {
private[cluster] val leaderElectionCallback = new LockListener {
override def lockAcquired() {
EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName))
self.publish(NewLeader(self.nodeAddress.nodeName))
@ -440,7 +444,7 @@ class DefaultClusterNode private[akka] (
}
// leader election lock in ZooKeeper
lazy private[cluster] val leaderLock = new WriteLock(
private[cluster] val leaderLock = new WriteLock(
zkClient.connection.getZookeeper,
LEADER_ELECTION_PATH, null,
leaderElectionCallback)
@ -449,20 +453,12 @@ class DefaultClusterNode private[akka] (
if (enableJMX) createMBean
initializeNode()
// =======================================
// Node
// =======================================
def isRunning: Boolean = isConnected.isOn
def start(): ClusterNode = {
isConnected.switchOn {
initializeNode()
}
this
}
private[cluster] def initializeNode() {
EventHandler.info(this,
("\nCreating cluster node with" +
@ -478,6 +474,7 @@ class DefaultClusterNode private[akka] (
joinCluster()
joinLeaderElection()
fetchMembershipNodes()
//EventHandler.info(this, "Connected to nodes [\n\t%s]".format(nodeConnections.toList.map({ case (_, (address, _)) address }).mkString("\n\t")))
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
@ -488,7 +485,7 @@ class DefaultClusterNode private[akka] (
locallyCachedMembershipNodes.clear()
nodeConnections.toList.foreach({
nodeConnections.get.connections.toList.foreach({
case (_, (address, _))
Actor.remote.shutdownClientConnection(address) // shut down client connections
})
@ -501,15 +498,13 @@ class DefaultClusterNode private[akka] (
// for monitoring remote listener
registry.local.actors.filter(remoteService.hasListener).foreach(_.stop())
nodeConnections.clear()
nodeConnections.set(VersionedConnectionState(0, Map.empty[String, Tuple2[InetSocketAddress, ActorRef]]))
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
}
isConnected.switchOff {
shutdownNode()
}
shutdownNode()
}
def disconnect(): ClusterNode = {
@ -707,7 +702,7 @@ class DefaultClusterNode private[akka] (
replicationFactor: Int,
replicationScheme: ReplicationScheme,
serializeMailbox: Boolean,
serializer: Serializer): ClusterNode = if (isConnected.isOn) {
serializer: Serializer): ClusterNode = {
EventHandler.debug(this,
"Storing actor with address [%s] in cluster".format(actorAddress))
@ -757,7 +752,7 @@ class DefaultClusterNode private[akka] (
useActorOnNodes(nodesForReplicationFactor(replicationFactor, Some(actorAddress)).toArray, actorAddress)
this
} else throw new ClusterException("Not connected to cluster")
}
/**
* Removes actor from the cluster.
@ -781,9 +776,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid clustered or not?
*/
def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) {
zkClient.exists(actorAddressRegistryPathFor(actorAddress))
} else false
def isClustered(actorAddress: String): Boolean = zkClient.exists(actorAddressRegistryPathFor(actorAddress))
/**
* Is the actor with uuid in use on 'this' node or not?
@ -793,9 +786,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid in use or not?
*/
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) {
zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName))
} else false
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName))
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
@ -807,7 +798,7 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.isOn) {
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = {
val nodeName = nodeAddress.nodeName
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))
@ -883,7 +874,7 @@ class DefaultClusterNode private[akka] (
actorRef.start()
actorRef
}
} else None
}
/**
* Using (checking out) actor on a specific set of nodes.
@ -892,22 +883,19 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
if (isConnected.isOn) {
val builder = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorAddress(actorAddress)
val builder = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorAddress(actorAddress)
// set the UUID to replicated from - if available
replicateFromUuid foreach (uuid builder.setReplicateActorFromUuid(uuidToUuidProtocol(uuid)))
// set the UUID to replicated from - if available
replicateFromUuid foreach (uuid builder.setReplicateActorFromUuid(uuidToUuidProtocol(uuid)))
val command = builder.build
val command = builder.build
nodes foreach { node
nodeConnections.get(node) foreach {
case (_, connection)
sendCommandToNode(connection, command, async = false)
}
nodes foreach { node
nodeConnections.get.connections(node) foreach {
case (address, connection)
sendCommandToNode(connection, command, async = false)
}
}
}
@ -941,16 +929,14 @@ class DefaultClusterNode private[akka] (
// FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no
// longer available. Then what to do? Should we even remove this method?
if (isConnected.isOn) {
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
uuidsForActorAddress(actorAddress) foreach { uuid
EventHandler.debug(this,
"Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
uuidsForActorAddress(actorAddress) foreach { uuid
EventHandler.debug(this,
"Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid)))
}
ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid)))
}
}
@ -958,19 +944,17 @@ class DefaultClusterNode private[akka] (
* Releases (checking in) all actors with a specific address on all nodes in the cluster where the actor is in 'use'.
*/
private[akka] def releaseActorOnAllNodes(actorAddress: String) {
if (isConnected.isOn) {
EventHandler.debug(this,
"Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))
EventHandler.debug(this,
"Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(RELEASE)
.setActorAddress(actorAddress)
.build
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(RELEASE)
.setActorAddress(actorAddress)
.build
nodesForActorsInUseWithAddress(actorAddress) foreach { node
nodeConnections.get(node) foreach {
case (_, connection) sendCommandToNode(connection, command, async = true)
}
nodesForActorsInUseWithAddress(actorAddress) foreach { node
nodeConnections.get.connections(node) foreach {
case (_, connection) sendCommandToNode(connection, command, async = true)
}
}
}
@ -978,7 +962,7 @@ class DefaultClusterNode private[akka] (
/**
* Creates an ActorRef with a Router to a set of clustered actors.
*/
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
def ref(actorAddress: String, router: RouterType): ActorRef = {
val addresses = addressesForActor(actorAddress)
EventHandler.debug(this,
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
@ -989,8 +973,7 @@ class DefaultClusterNode private[akka] (
case (_, address) clusterActorRefs.put(address, actorRef)
}
actorRef.start()
} else throw new ClusterException("Not connected to cluster")
}
/**
* Returns the UUIDs of all actors checked out on this node.
@ -1005,9 +988,8 @@ class DefaultClusterNode private[akka] (
/**
* Returns the UUIDs of all actors registered in this cluster.
*/
private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) {
private[akka] def uuidsForClusteredActors: Array[UUID] =
zkClient.getChildren(ACTOR_UUID_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
} else Array.empty[UUID]
/**
* Returns the addresses of all actors registered in this cluster.
@ -1017,13 +999,13 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor id for the actor with a specific UUID.
*/
private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) {
private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = {
try {
Some(zkClient.readData(actorUuidRegistryAddressPathFor(uuid)).asInstanceOf[String])
} catch {
case e: ZkNoNodeException None
}
} else None
}
/**
* Returns the actor ids for all the actors with a specific UUID.
@ -1034,7 +1016,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor UUIDs for actor ID.
*/
private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) {
private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = {
try {
zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.toArray map {
case c: CharSequence new UUID(c)
@ -1042,23 +1024,23 @@ class DefaultClusterNode private[akka] (
} catch {
case e: ZkNoNodeException Array[UUID]()
}
} else Array.empty[UUID]
}
/**
* Returns the node names of all actors in use with UUID.
*/
private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.isOn) {
private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = {
try {
zkClient.getChildren(actorAddressToNodesPathFor(actorAddress)).toList.toArray.asInstanceOf[Array[String]]
} catch {
case e: ZkNoNodeException Array[String]()
}
} else Array.empty[String]
}
/**
* Returns the UUIDs of all actors in use registered on a specific node.
*/
private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) {
private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = {
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
case c: CharSequence new UUID(c)
@ -1066,12 +1048,12 @@ class DefaultClusterNode private[akka] (
} catch {
case e: ZkNoNodeException Array[UUID]()
}
} else Array.empty[UUID]
}
/**
* Returns the addresses of all actors in use registered on a specific node.
*/
def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) {
def addressesForActorsInUseOnNode(nodeName: String): Array[String] = {
val uuids =
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
@ -1081,7 +1063,7 @@ class DefaultClusterNode private[akka] (
case e: ZkNoNodeException Array[UUID]()
}
actorAddressForUuids(uuids)
} else Array.empty[String]
}
/**
* Returns Serializer for actor with specific address.
@ -1248,18 +1230,26 @@ class DefaultClusterNode private[akka] (
if (async) {
connection ! command
} else {
(connection ? (command, remoteDaemonAckTimeout)).as[Status] match {
try {
(connection ? (command, remoteDaemonAckTimeout)).as[Status] match {
case Some(Success)
EventHandler.debug(this, "Replica for [%s] successfully created".format(connection.address))
case Some(Success(status))
EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(status))
case Some(Failure(cause))
EventHandler.error(cause, this, cause.toString)
throw cause
case Some(Failure(cause))
EventHandler.error(cause, this, cause.toString)
throw cause
case None
case None
val error = new ClusterException(
"Remote command to [%s] timed out".format(connection.address))
EventHandler.error(error, this, error.toString)
throw error
}
} catch {
case e: Exception
val error = new ClusterException(
"Operation to instantiate replicas throughout the cluster timed out")
"Remote command to [%s] timed out".format(connection.address))
EventHandler.error(error, this, error.toString)
throw error
}
@ -1302,7 +1292,7 @@ class DefaultClusterNode private[akka] (
*/
private def nodesForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Set[String] = {
var replicaNames = Set.empty[String]
val nrOfClusterNodes = nodeConnections.size
val nrOfClusterNodes = nodeConnections.get.connections.size
if (replicationFactor < 1) return replicaNames
if (nrOfClusterNodes < replicationFactor) throw new IllegalArgumentException(
@ -1322,7 +1312,7 @@ class DefaultClusterNode private[akka] (
for {
nodeName preferredNodes
key nodeConnections.keys
key nodeConnections.get.connections.keys
if key == nodeName
} replicaNames = replicaNames + nodeName
@ -1352,56 +1342,73 @@ class DefaultClusterNode private[akka] (
private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Set[ActorRef] = {
for {
node nodesForReplicationFactor(replicationFactor, actorAddress)
connectionOption nodeConnections.get(node)
connectionOption nodeConnections.get.connections(node)
connection connectionOption
actorRef connection._2
} yield actorRef
}
private val connectToAllNewlyArrivedMembershipNodesInClusterLock = new AtomicBoolean(false)
/**
* Update the list of connections to other nodes in the cluster.
*
* @return a Map with the remote socket addresses of the disconnected node connections
*/
private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
@tailrec
final private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
newlyConnectedMembershipNodes: Traversable[String],
newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = {
var change = false
val oldState = nodeConnections.get
var newConnections = oldState.connections //Map.empty[String, Tuple2[InetSocketAddress, ActorRef]]
// cache the disconnected connections in a map, needed for fail-over of these connections later
var disconnectedConnections = Map.empty[String, InetSocketAddress]
newlyDisconnectedMembershipNodes foreach { node
disconnectedConnections += (node -> (nodeConnections(node) match {
disconnectedConnections = disconnectedConnections + (node -> (oldState.connections(node) match {
case (address, _) address
}))
}
if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) {
try {
// remove connections to failed nodes
newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
// remove connections to failed nodes
newlyDisconnectedMembershipNodes foreach { node
newConnections = newConnections - node
change = true
}
// add connections to newly arrived nodes
newlyConnectedMembershipNodes foreach { node
if (!nodeConnections.contains(node)) {
// only connect to each replica once
// add connections to newly arrived nodes
newlyConnectedMembershipNodes foreach { node
if (!newConnections.contains(node)) {
remoteSocketAddressForNode(node) foreach { address
EventHandler.debug(this,
"Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
// only connect to each replica once
remoteSocketAddressForNode(node) foreach { address
EventHandler.debug(this, "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
nodeConnections.put(node, (address, clusterDaemon))
}
}
val clusterDaemon = remoteService.actorFor(
RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
newConnections = newConnections + (node -> (address, clusterDaemon))
change = true
}
} finally {
connectToAllNewlyArrivedMembershipNodesInClusterLock.set(false)
}
}
disconnectedConnections
// add the remote connection to 'this' node as well, but as a 'local' actor
if (includeRefNodeInReplicaSet)
newConnections = newConnections + (nodeAddress.nodeName -> (remoteServerAddress, remoteDaemon))
//there was a state change, so we are now going to update the state.
val newState = new VersionedConnectionState(oldState.version + 1, newConnections)
if (!nodeConnections.compareAndSet(oldState, newState)) {
// we failed to set the state, try again
connectToAllNewlyArrivedMembershipNodesInCluster(
newlyConnectedMembershipNodes, newlyDisconnectedMembershipNodes)
} else {
// we succeeded in setting the state, return
EventHandler.info(this, "Connected to nodes [\n\t%s]".format(newConnections.mkString("\n\t")))
disconnectedConnections
}
}
private[cluster] def joinCluster() {
@ -1437,7 +1444,7 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress) {
EventHandler.info(this, "failOverClusterActorRef from %s to %s".format(from, to))
EventHandler.info(this, "Failing over ClusterActorRef from %s to %s".format(from, to))
clusterActorRefs.values(from) foreach (_.failOver(from, to))
}
@ -1516,7 +1523,7 @@ class DefaultClusterNode private[akka] (
.build
// FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed?
nodeConnections.values foreach {
nodeConnections.get.connections.values foreach {
case (_, connection) sendCommandToNode(connection, command, async = true)
}
}
@ -1589,8 +1596,6 @@ class DefaultClusterNode private[akka] (
private def createMBean = {
val clusterMBean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
override def start() = self.start()
override def stop() = self.shutdown()
override def disconnect() = self.disconnect()
@ -1599,8 +1604,6 @@ class DefaultClusterNode private[akka] (
override def resign() = self.resign()
override def isConnected = self.isConnected.isOn
override def getNodeAddres = self.nodeAddress
override def getRemoteServerHostname = self.hostname
@ -1789,7 +1792,6 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
message.getMessageType match {
case USE handleUse(message)
case RELEASE handleRelease(message)
case START cluster.start()
case STOP cluster.shutdown()
case DISCONNECT cluster.disconnect()
case RECONNECT cluster.reconnect()
@ -1916,7 +1918,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
} else {
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
}
self.reply(Success)
self.reply(Success(cluster.remoteServerAddress.toString))
} catch {
case error: Throwable
self.reply(Failure(error))


@ -70,7 +70,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
def start(): this.type = synchronized[this.type] {
if (_status == ActorRefInternals.UNSTARTED) {
_status = ActorRefInternals.RUNNING
//TODO add this? Actor.registry.register(this)
//Actor.registry.register(this)
}
this
}
@ -78,7 +78,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
def stop() {
synchronized {
if (_status == ActorRefInternals.RUNNING) {
//TODO add this? Actor.registry.unregister(this)
//Actor.registry.unregister(this)
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
@ -104,7 +104,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
(s.version, s.connections.values)
}
def size(): Int = state.get().connections.size
def size: Int = state.get().connections.size
def stopAll() {
state.get().connections.values foreach (_.stop()) // shut down all remote connections
@ -119,7 +119,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
val newMap = oldState.connections map {
case (`from`, actorRef)
change = true
actorRef.stop()
// actorRef.stop()
(to, createRemoteActorRef(actorRef.address, to))
case other other
}
@ -156,8 +156,6 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
}
class State(val version: Long, val connections: Map[InetSocketAddress, ActorRef])
case class State(val version: Long, val connections: Map[InetSocketAddress, ActorRef])
}
}


@ -239,8 +239,10 @@ abstract class RemoteClient private[akka] (
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultPromise[T](request.getActorInfo.getTimeout)
val futureResult =
if (senderFuture.isDefined) senderFuture.get
else new DefaultPromise[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
@ -347,12 +349,20 @@ class ActiveRemoteClient private[akka] (
EventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress))
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
try {
connection = bootstrap.connect(remoteAddress)
} catch {
case e: Exception
EventHandler.error(e, this, "Remote client failed to connect to [%s]".format(remoteAddress))
throw e
}
openChannels.add(connection.awaitUninterruptibly.getChannel)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
EventHandler.error(connection.getCause, "Remote client connection to [%s] has failed".format(remoteAddress), this)
EventHandler.error(connection.getCause, this, "Remote client connection to [%s] has failed".format(remoteAddress))
false
} else {
//Send cookie
@ -389,7 +399,7 @@ class ActiveRemoteClient private[akka] (
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
EventHandler.error(connection.getCause, "Reconnection to [%s] has failed".format(remoteAddress), this)
EventHandler.error(connection.getCause, this, "Reconnection to [%s] has failed".format(remoteAddress))
false
} else {
@ -489,6 +499,7 @@ class ActiveRemoteClientHandler(
client.module.shutdownClientConnection(remoteAddress)
}
}
case arp: AkkaRemoteProtocol if arp.hasMessage
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
@ -507,6 +518,7 @@ class ActiveRemoteClientHandler(
future.completeWithException(parseException(reply, client.loader))
}
}
case other
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
}
@ -554,7 +566,7 @@ class ActiveRemoteClientHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
if (event.getCause ne null)
EventHandler.error(event.getCause, "Unexpected exception from downstream in remote client", this)
EventHandler.error(event.getCause, this, "Unexpected exception from downstream in remote client")
else
EventHandler.error(this, "Unexpected exception from downstream in remote client: %s".format(event))
@ -563,7 +575,7 @@ class ActiveRemoteClientHandler(
spawn {
client.module.shutdownClientConnection(remoteAddress)
}
case e
case e: Exception
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
event.getChannel.close //FIXME Is this the correct behavior?
}
@ -573,13 +585,14 @@ class ActiveRemoteClientHandler(
val exception = reply.getException
val classname = exception.getClassname
try {
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
val exceptionClass =
if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Throwable
case problem: Exception
EventHandler.error(problem, this, problem.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
@ -661,7 +674,6 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
if (RemoteClientSettings.SECURE_COOKIE.nonEmpty)
b.setCookie(RemoteClientSettings.SECURE_COOKIE.get)
b.build
}
openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly
@ -717,7 +729,6 @@ trait NettyRemoteServerModule extends RemoteServerModule {
_isRunning switchOff {
currentServer.getAndSet(None) foreach { instance
EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port))
instance.shutdown()
}
}
@ -920,7 +931,7 @@ class RemoteServerHandler(
try {
actor ! PoisonPill
} catch {
case e: Exception EventHandler.error(e, "Couldn't stop %s".format(actor), this)
case e: Exception EventHandler.error(e, this, "Couldn't stop %s".format(actor))
}
}
@ -946,7 +957,7 @@ class RemoteServerHandler(
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
EventHandler.error(event.getCause, "Unexpected exception from remote downstream", this)
EventHandler.error(event.getCause, this, "Unexpected exception from remote downstream")
event.getChannel.close
server.notifyListeners(RemoteServerError(event.getCause, server))


@ -12,7 +12,7 @@ trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAft
def testNodes: Int
override def beforeAll() = {
Cluster.startLocalCluster()
LocalCluster.startLocalCluster()
onReady()
ClusterTestNode.ready(getClass.getName)
}
@ -23,7 +23,7 @@ trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAft
ClusterTestNode.waitForExits(getClass.getName, testNodes - 1)
ClusterTestNode.cleanUp(getClass.getName)
onShutdown()
Cluster.shutdownLocalCluster()
LocalCluster.shutdownLocalCluster()
}
def onShutdown() = {}


@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.event-handler-level = "DEBUG"


@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.event-handler-level = "DEBUG"


@ -34,7 +34,7 @@ class NewLeaderChangeListenerMultiJvmNode1 extends MasterClusterTestNode {
})
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("start-node2", NrOfNodes) {
@ -57,7 +57,7 @@ class NewLeaderChangeListenerMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
node.shutdown()


@ -34,7 +34,7 @@ class NodeConnectedChangeListenerMultiJvmNode1 extends MasterClusterTestNode {
})
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("start-node2", NrOfNodes) {
@ -55,7 +55,7 @@ class NodeConnectedChangeListenerMultiJvmNode2 extends ClusterTestNode {
barrier("start-node1", NrOfNodes) {}
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
node.shutdown()


@ -34,7 +34,7 @@ class NodeDisconnectedChangeListenerMultiJvmNode1 extends MasterClusterTestNode
})
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("start-node2", NrOfNodes) {
@ -57,7 +57,7 @@ class NodeDisconnectedChangeListenerMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
node.shutdown()


@ -25,7 +25,7 @@ class ConfigurationStorageMultiJvmNode1 extends MasterClusterTestNode {
"be able to store, read and remove custom configuration data" in {
barrier("start-node-1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("start-node-2", NrOfNodes) {
@ -65,7 +65,7 @@ class ConfigurationStorageMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node-2", NrOfNodes) {
node.start()
Cluster.node
}
barrier("store-config-data-node-1", NrOfNodes) {


@ -28,7 +28,7 @@ class LeaderElectionMultiJvmNode1 extends MasterClusterTestNode {
"be able to elect a single leader in the cluster and perform re-election if leader resigns" in {
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
node.isLeader must be === true
@ -55,7 +55,7 @@ class LeaderElectionMultiJvmNode2 extends ClusterTestNode {
node.isLeader must be === false
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
node.isLeader must be === false


@ -50,7 +50,7 @@ class RegistryStoreMultiJvmNode1 extends MasterClusterTestNode {
"be able to store an ActorRef in the cluster without a replication strategy and retrieve it with 'use'" in {
barrier("start-node-1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("start-node-2", NrOfNodes) {
@ -86,7 +86,7 @@ class RegistryStoreMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node-2", NrOfNodes) {
node.start()
Cluster.node
}
barrier("store-1-in-node-1", NrOfNodes) {


@ -27,7 +27,7 @@ class DeploymentMultiJvmNode1 extends MasterClusterTestNode {
"be able to deploy deployments in akka.conf and lookup the deployments by 'address'" in {
barrier("start-node-1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("start-node-2", NrOfNodes) {
@ -56,7 +56,7 @@ class DeploymentMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node-2", NrOfNodes) {
node.start()
Cluster.node
}
barrier("perform-deployment-on-node-1", NrOfNodes) {


@ -33,7 +33,7 @@ class MigrationAutomaticMultiJvmNode1 extends ClusterTestNode {
"be able to migrate an actor from one node to another" in {
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("create-actor-on-node1", NrOfNodes) {
@ -67,7 +67,7 @@ class MigrationAutomaticMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
Thread.sleep(2000) // wait for fail-over from node1 to node2
@ -75,7 +75,8 @@ class MigrationAutomaticMultiJvmNode2 extends ClusterTestNode {
barrier("check-fail-over-to-node2", NrOfNodes - 1) {
// both remaining nodes should now have the replica
node.isInUseOnNode("hello-world") must be(true)
val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
val actorRef = Actor.registry.local.actorFor("hello-world")
.getOrElse(fail("Actor should have been in the local actor registry"))
actorRef.address must be("hello-world")
(actorRef ? "Hello").as[String].get must be("World from node [node2]")
}
@ -110,7 +111,7 @@ class MigrationAutomaticMultiJvmNode3 extends MasterClusterTestNode {
}
barrier("start-node3", NrOfNodes - 1) {
node.start()
Cluster.node
}
Thread.sleep(2000) // wait for fail-over from node2 to node3
@ -118,7 +119,8 @@ class MigrationAutomaticMultiJvmNode3 extends MasterClusterTestNode {
barrier("check-fail-over-to-node3", NrOfNodes - 2) {
// both remaining nodes should now have the replica
node.isInUseOnNode("hello-world") must be(true)
val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
val actorRef = Actor.registry.local.actorFor("hello-world")
.getOrElse(fail("Actor should have been in the local actor registry"))
actorRef.address must be("hello-world")
(actorRef ? "Hello").as[String].get must be("World from node [node3]")
}


@ -39,7 +39,7 @@
* "be able to migrate an actor from one node to another" in {
*
* barrier("start-node-1", NrOfNodes) {
* node.start()
* Cluster.node
* }
*
* barrier("start-node-2", NrOfNodes) {
@ -80,7 +80,7 @@
* }
*
* barrier("start-node-2", NrOfNodes) {
* node.start()
* Cluster.node
* }
*
* barrier("store-1-in-node-1", NrOfNodes) {


@ -1,6 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.event-handler-level = "ERROR"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2


@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2


@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2


@ -41,7 +41,7 @@ class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode {
"ClusterActorRef" must {
"cleanup itself" in {
node.start
Cluster.node
barrier("awaitStarted", NrOfNodes).await()
val ref = Actor.actorOf[ClusterActorRefCleanupMultiJvmSpec.TestActor]("service-test")
@ -127,7 +127,7 @@ class ClusterActorRefCleanupMultiJvmNode2 extends ClusterTestNode {
}
})
node.start()
Cluster.node
barrier("awaitStarted", NrOfNodes).await()
barrier("finished", NrOfNodes).await()
@ -151,7 +151,7 @@ class ClusterActorRefCleanupMultiJvmNode3 extends ClusterTestNode {
}
})
node.start()
Cluster.node
barrier("awaitStarted", NrOfNodes).await()
barrier("finished", NrOfNodes).await()


@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"


@ -37,7 +37,7 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends Cluste
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("create-actor-on-node1", NrOfNodes) {
@ -68,7 +68,7 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends Master
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
Thread.sleep(5000) // wait for fail-over from node1 to node2


@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"


@ -39,7 +39,7 @@ class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1 extends ClusterT
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("create-actor-on-node1", NrOfNodes) {
@ -89,7 +89,7 @@ class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2 extends MasterCl
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
Thread.sleep(5000) // wait for fail-over from node1 to node2


@ -1,6 +0,0 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 1000


@ -1,6 +0,0 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 1000


@ -1,119 +0,0 @@
/*
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.replication.transactionlog.writethrough.nosnapshot
import akka.actor._
import akka.cluster._
import Cluster._
import akka.config.Config
object ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec {
var NrOfNodes = 2
sealed trait TransactionLogMessage extends Serializable
case class Count(nr: Int) extends TransactionLogMessage
case class Log(full: String) extends TransactionLogMessage
case object GetLog extends TransactionLogMessage
class HelloWorld extends Actor with Serializable {
var log = ""
def receive = {
case Count(nr) ⇒
log += nr.toString
self.reply("World from node [" + Config.nodename + "]")
case GetLog ⇒
self.reply(Log(log))
}
}
}
/*
class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends ClusterTestNode {
import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._
"A cluster" must {
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" ignore {
barrier("start-node1", NrOfNodes) {
node.start()
}
barrier("create-actor-on-node1", NrOfNodes) {
val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
node.isInUseOnNode("hello-world") must be(true)
actorRef.address must be("hello-world")
var counter = 0
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
counter += 1
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
}
barrier("start-node2", NrOfNodes) {
}
node.shutdown()
}
}
}
class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends MasterClusterTestNode {
import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._
val testNodes = NrOfNodes
"A cluster" must {
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
}
barrier("create-actor-on-node1", NrOfNodes) {
}
barrier("start-node2", NrOfNodes) {
node.start()
}
Thread.sleep(5000) // wait for fail-over from node1 to node2
barrier("check-fail-over-to-node2", NrOfNodes - 1) {
// both remaining nodes should now have the replica
node.isInUseOnNode("hello-world") must be(true)
val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
actorRef.address must be("hello-world")
(actorRef ? GetLog).as[Log].get must be(Log("0123456789"))
}
node.shutdown()
}
}
override def onReady() {
LocalBookKeeperEnsemble.start()
}
override def onShutdown() {
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
}
}*/


@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"


@ -38,7 +38,7 @@ class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1 extends Clust
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("create-actor-on-node1", NrOfNodes) {
@ -70,7 +70,7 @@ class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2 extends Maste
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
Thread.sleep(5000) // wait for fail-over from node1 to node2


@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "ERROR"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"


@ -39,7 +39,7 @@ class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1 extends Cluster
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
barrier("create-actor-on-node1", NrOfNodes) {
@ -89,7 +89,7 @@ class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2 extends MasterC
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
Thread.sleep(5000) // wait for fail-over from node1 to node2


@ -46,7 +46,7 @@ class FailoverDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
val ignoreExceptions = Seq(EventFilter[NotYetConnectedException], EventFilter[ConnectException])
EventHandler.notify(TestEvent.Mute(ignoreExceptions))
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello").start().asInstanceOf[ClusterActorRef]
@ -71,7 +71,7 @@ class FailoverDirectRoutingMultiJvmNode2 extends ClusterTestNode {
"___" must {
"___" in {
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)


@ -27,7 +27,7 @@ class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
"___" must {
"___" in {
node.start()
Cluster.node
barrier("waiting-for-begin", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
@ -41,7 +41,7 @@ class HomeNodeMultiJvmNode2 extends ClusterTestNode {
"Direct Router: A Direct Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
Cluster.node
barrier("waiting-for-begin", NrOfNodes).await()
val actorNode1 = Actor.actorOf[SomeActor]("service-node1").start()


@ -29,7 +29,7 @@ class SingleReplicaDirectRoutingMultiJvmNode1 extends MasterClusterTestNode {
"___" must {
"___" in {
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Cluster.barrier("waiting-to-end", NrOfNodes).await()
@ -44,7 +44,7 @@ class SingleReplicaDirectRoutingMultiJvmNode2 extends ClusterTestNode {
"Direct Router: when node send message to existing node it" must {
"communicate with that node" in {
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
val actor = Actor.actorOf[SomeActor]("service-hello").start().asInstanceOf[ClusterActorRef]


@ -49,7 +49,7 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
val ignoreExceptions = Seq(EventFilter[NotYetConnectedException], EventFilter[ConnectException])
EventHandler.notify(TestEvent.Mute(ignoreExceptions))
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
// ============= the real testing =================
@ -102,7 +102,7 @@ class RandomFailoverMultiJvmNode2 extends ClusterTestNode {
"___" must {
"___" in {
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)
@ -116,7 +116,7 @@ class RandomFailoverMultiJvmNode3 extends ClusterTestNode {
"___" must {
"___" in {
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Thread.sleep(30 * 1000)


@ -27,7 +27,7 @@ class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
"___" must {
"___" in {
node.start()
Cluster.node
barrier("waiting-for-begin", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
node.shutdown()
@ -41,7 +41,7 @@ class HomeNodeMultiJvmNode2 extends ClusterTestNode {
"Random Router: A Random Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
Cluster.node
barrier("waiting-for-begin", NrOfNodes).await()
val actorNode1 = Actor.actorOf[SomeActor]("service-node1")


@ -33,7 +33,7 @@ class Random1ReplicaMultiJvmNode1 extends MasterClusterTestNode {
"Random Router: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
node.start()
Cluster.node
var hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)


@ -35,7 +35,7 @@ class Random3ReplicasMultiJvmNode1 extends MasterClusterTestNode {
"___" must {
"___" in {
node.start()
Cluster.node
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()
@ -55,7 +55,7 @@ class Random3ReplicasMultiJvmNode2 extends ClusterTestNode {
"Random: A cluster" must {
"distribute requests randomly" in {
node.start()
Cluster.node
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()
@ -103,7 +103,7 @@ class Random3ReplicasMultiJvmNode3 extends ClusterTestNode {
"___" must {
"___" in {
node.start()
Cluster.node
//wait till node 1 has started.
barrier("begin", NrOfNodes).await()


@ -2,6 +2,7 @@ akka.enabled-modules = ["cluster"]
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node3"]
akka.cluster.include-ref-node-in-replica-set = on
akka.actor.timeout = 30


@ -1,6 +1,8 @@
akka.enabled-modules = ["cluster"]
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node3"]
akka.cluster.include-ref-node-in-replica-set = on
akka.actor.timeout = 30


@ -1,6 +1,8 @@
akka.enabled-modules = ["cluster"]
akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node2"]
akka.actor.deployment.service-hello.clustered.replication-factor = 2
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node3"]
akka.cluster.include-ref-node-in-replica-set = on
akka.actor.timeout = 30


@ -15,18 +15,12 @@ object RoundRobinFailoverMultiJvmSpec {
val NrOfNodes = 3
class SomeActor extends Actor with Serializable {
//println("---------------------------------------------------------------------------")
//println("SomeActor has been created on node [" + Config.nodename + "]")
//println("---------------------------------------------------------------------------")
def receive = {
case "identify" {
//println("The node received the 'identify' command")
self.reply(Config.nodename)
}
case "shutdown" {
//println("The node received the 'shutdown' command")
new Thread() {
override def run() {
Thread.sleep(2000)
@ -45,12 +39,6 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
def testNodes = NrOfNodes
def sleepSome() {
//println("Starting sleep")
Thread.sleep(10000) //nasty.. but ok for now.
//println("Finished doing sleep")
}
"Round Robin: when round robin fails" must {
"jump to another replica" in {
val ignoreExceptions = Seq(
@ -60,38 +48,42 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
EventHandler.notify(TestEvent.Mute(ignoreExceptions))
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
var oldFoundConnections: JSet[String] = null
var actor: ActorRef = null
// ============= the real testing =================
val actor = Actor.actorOf[SomeActor]("service-hello").asInstanceOf[ClusterActorRef]
Cluster.barrier("node-start", NrOfNodes) {
Cluster.node
}
val oldFoundConnections = identifyConnections(actor)
//println("---------------------------- oldFoundConnections ------------------------")
//println(oldFoundConnections)
Cluster.barrier("actor-creation", NrOfNodes) {
actor = Actor.actorOf[SomeActor]("service-hello")
actor.isInstanceOf[ClusterActorRef] must be(true)
//since we have replication factor 2
oldFoundConnections.size() must be(2)
val actor2 = Actor.registry.local.actorFor("service-hello")
.getOrElse(fail("Actor should have been in the local actor registry"))
// actor2.isInstanceOf[ClusterActorRef] must be(true)
//terminate a node
actor ! "shutdown"
Cluster.node.isInUseOnNode("service-hello") must be(true)
oldFoundConnections = identifyConnections(actor)
sleepSome()
//since we have replication factor 2
oldFoundConnections.size() must be(2)
}
//this is where the system behaves unpredictably. From time to time it works... and from time to time there
//are all kinds of connection timeouts. So this test shows that there are problems. For the time being
//the test code has been deactivated to prevent causing problems.
Thread.sleep(5000) // wait for fail-over from node3
//val newFoundConnections = identifyConnections(actor)
//println("---------------------------- newFoundConnections ------------------------")
//println(newFoundConnections)
Cluster.barrier("verify-fail-over", NrOfNodes - 1) {
val newFoundConnections = identifyConnections(actor)
//it still must be 2, since a different node should have been used to fail over to
//newFoundConnections.size() must be(2)
//they are not disjoint, since there must be a single element that is in both
//Collections.disjoint(newFoundConnections, oldFoundConnections) must be(false)
//but they should not be equal, since the shut-down node has been replaced by another one.
//newFoundConnections.equals(oldFoundConnections) must be(false)
//it still must be 2, since a different node should have been used to fail over to
newFoundConnections.size() must be(2)
//they are not disjoint, since there must be a single element that is in both
Collections.disjoint(newFoundConnections, oldFoundConnections) must be(false)
//but they should not be equal, since the shut-down node has been replaced by another one.
newFoundConnections.equals(oldFoundConnections) must be(false)
}
Cluster.node.shutdown()
}
@ -113,10 +105,19 @@ class RoundRobinFailoverMultiJvmNode2 extends ClusterTestNode {
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Cluster.barrier("node-start", NrOfNodes) {
Cluster.node
}
Thread.sleep(30 * 1000)
Cluster.barrier("actor-creation", NrOfNodes) {
}
Cluster.node.isInUseOnNode("service-hello") must be(false)
Thread.sleep(5000) // wait for fail-over from node3
Cluster.barrier("verify-fail-over", NrOfNodes - 1) {
}
}
}
}
@ -127,10 +128,16 @@ class RoundRobinFailoverMultiJvmNode3 extends ClusterTestNode {
"___" must {
"___" in {
Cluster.node.start()
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
Cluster.barrier("node-start", NrOfNodes) {
Cluster.node
}
Thread.sleep(30 * 1000)
Cluster.barrier("actor-creation", NrOfNodes) {
}
Cluster.node.isInUseOnNode("service-hello") must be(true)
Cluster.node.shutdown()
}
}
}
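
Pieced together from the interleaved hunks above, the rewritten RoundRobinFailoverMultiJvmSpec drives all three JVMs through the same barrier sequence and boots each node by touching Cluster.node inside the first barrier. Below is a condensed, hedged sketch of that choreography (not code from this commit; NrOfNodes and SomeActor come from the spec object above, and identifyConnections is an assumed helper shape):

import akka.actor.{ Actor, ActorRef }
import akka.cluster.Cluster
import RoundRobinFailoverMultiJvmSpec._

// Condensed barrier choreography of the rewritten spec: every JVM passes
// the same barriers in the same order; evaluating Cluster.node inside the
// first barrier boots that JVM's cluster node.
object RoundRobinFailoverChoreographySketch {

  def node1(): Unit = {
    Cluster.barrier("node-start", NrOfNodes) { Cluster.node }

    var actor: ActorRef = null
    Cluster.barrier("actor-creation", NrOfNodes) {
      actor = Actor.actorOf[SomeActor]("service-hello") // clustered, round-robin routed
    }

    val oldConnections = identifyConnections(actor) // replication factor 2, so 2 node names
    actor ! "shutdown" // one hosting node terminates itself ~2 seconds later
    Thread.sleep(5000) // wait for the fail-over before verifying

    Cluster.barrier("verify-fail-over", NrOfNodes - 1) {
      val newConnections = identifyConnections(actor)
      // still two connections, overlapping with, but not equal to, the old set
      assert(newConnections.size == 2 && !newConnections.equals(oldConnections))
    }
    Cluster.node.shutdown()
  }

  // Plausible shape of the spec's identifyConnections helper (assumed, not
  // shown in the diff): round-robin "identify" and collect the repliers.
  def identifyConnections(actor: ActorRef): java.util.Set[String] = {
    val connections = new java.util.HashSet[String]
    for (i <- 0 until NrOfNodes * 2)
      (actor ? "identify").as[String] foreach { connections.add(_) }
    connections
  }
}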


@ -28,7 +28,7 @@ class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
"___" must {
"___" in {
node.start()
Cluster.node
barrier("waiting-for-begin", NrOfNodes).await()
barrier("waiting-for-end", NrOfNodes).await()
@ -44,7 +44,7 @@ class HomeNodeMultiJvmNode2 extends ClusterTestNode {
"Round Robin: A Router" must {
"obey 'home-node' config option when instantiated actor in cluster" in {
node.start()
Cluster.node
barrier("waiting-for-begin", NrOfNodes).await()
val actorNode1 = Actor.actorOf[SomeActor]("service-node1").start()


@ -32,7 +32,7 @@ class RoundRobin1ReplicaMultiJvmNode1 extends MasterClusterTestNode {
"Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
node.start()
Cluster.node
var hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)


@ -10,6 +10,7 @@ import org.scalatest.BeforeAndAfterAll
import akka.cluster._
import Cluster._
import LocalCluster._
import akka.actor._
import akka.actor.Actor._
import akka.config.Config
@ -47,7 +48,7 @@ class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
//wait till node 1 has started.
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
//wait till node 2 has started.
@ -89,7 +90,7 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
//wait till node 2 has started.
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
//wait till node 3 has started.


@ -12,6 +12,7 @@ import akka.cluster._
import akka.actor._
import akka.actor.Actor._
import akka.config.Config
import LocalCluster._
import Cluster._
/**
@ -41,7 +42,7 @@ class RoundRobin3ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
//wait till node 1 has started.
barrier("start-node1", NrOfNodes) {
node.start()
Cluster.node
}
//wait till node 2 has started.
@ -82,7 +83,7 @@ class RoundRobin3ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
//wait till node 2 has started.
barrier("start-node2", NrOfNodes) {
node.start()
Cluster.node
}
//wait till node 3 has started.
@ -142,7 +143,7 @@ class RoundRobin3ReplicasMultiJvmNode3 extends WordSpec with MustMatchers {
barrier("start-node2", NrOfNodes).await()
barrier("start-node3", NrOfNodes) {
node.start()
Cluster.node
}
barrier("get-ref-to-actor-on-node2", NrOfNodes).await()


@ -56,7 +56,7 @@ class ScatterGatherFailoverMultiJvmNode1 extends MasterClusterTestNode {
EventHandler.notify(TestEvent.Mute(ignoreExceptions))
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
/*
@ -97,7 +97,7 @@ class ScatterGatherFailoverMultiJvmNode2 extends ClusterTestNode {
"___" must {
"___" in {
Cluster.node.start()
Cluster.node
Cluster.barrier("waiting-for-begin", NrOfNodes).await()
/*


@ -103,7 +103,7 @@ object PingPongMultiJvmNode1 {
pause("start", "Ready to start all nodes")
println("Starting nodes ...")
node.start
Cluster.node
node.barrier("start", NrOfNodes) {
// wait for others to start
@ -205,7 +205,7 @@ class PongNode(number: Int) {
pause("start")
node.barrier("start", NrOfNodes) {
node.start
Cluster.node
}
pause("create")


@ -218,7 +218,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef
override def afterAll() = {
Cluster.node.shutdown()
Cluster.shutdownLocalCluster()
LocalCluster.shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
Actor.registry.local.shutdownAll()


@ -175,13 +175,13 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo
}
override def beforeAll() = {
Cluster.startLocalCluster()
LocalCluster.startLocalCluster()
LocalBookKeeperEnsemble.start()
}
override def afterAll() = {
Cluster.node.shutdown()
Cluster.shutdownLocalCluster()
LocalCluster.shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
Actor.registry.local.shutdownAll()
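
The two transaction-log hunks above show the lifecycle move named in the commit message: local-cluster operations now live on LocalCluster rather than Cluster. A minimal, hedged sketch of the resulting setup/teardown ordering (trait name illustrative; imports assumed to follow the specs above):

import akka.actor.Actor
import akka.cluster._ // Cluster, LocalCluster, LocalBookKeeperEnsemble, TransactionLog assumed importable from here

// Post-commit test lifecycle: bring up the local cluster and the
// BookKeeper ensemble first, then tear everything down in reverse.
trait TransactionLogSpecLifecycleSketch {

  def beforeAll(): Unit = {
    LocalCluster.startLocalCluster()    // was Cluster.startLocalCluster()
    LocalBookKeeperEnsemble.start()
  }

  def afterAll(): Unit = {
    Cluster.node.shutdown()
    LocalCluster.shutdownLocalCluster() // was Cluster.shutdownLocalCluster()
    TransactionLog.shutdown()
    LocalBookKeeperEnsemble.shutdown()
    Actor.registry.local.shutdownAll()
  }
}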


@ -284,7 +284,7 @@ something in coordination::
"be able to start all nodes" in {
Cluster.barrier("start", NrOfNodes) {
Cluster.node.start()
Cluster.node
}
Cluster.node.isRunning must be(true)
Cluster.node.shutdown()
@ -305,7 +305,7 @@ something in coordination::
"be able to start all nodes" in {
Cluster.barrier("start", NrOfNodes) {
Cluster.node.start()
Cluster.node
}
Cluster.node.isRunning must be(true)
Cluster.node.shutdown()
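
The recurring one-line change in this commit, node.start() becoming a bare Cluster.node, reaches the docs here as well: the node is now bootstrapped by its first access, so no explicit start() call remains. A minimal, hedged sketch of the resulting idiom (object name, barrier name, and node count are illustrative):

import akka.cluster.Cluster

// Post-commit startup idiom: the first access to Cluster.node bootstraps
// this JVM's ClusterNode as a side effect; shutdown stays explicit.
object StartAllNodesSketch {
  val NrOfNodes = 2 // illustrative

  def main(args: Array[String]): Unit = {
    Cluster.barrier("start", NrOfNodes) {
      Cluster.node // boots the node on first access
    }
    // ... run the test body against the booted node ...
    Cluster.node.shutdown()
  }
}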


@ -131,11 +131,12 @@ akka {
mongodb {
# Any specified collection name will be used as a prefix for collections that use durable mongo mailboxes
uri = "mongodb://localhost/akka.mailbox" # Follow Mongo URI Spec - http://www.mongodb.org/display/DOCS/Connections
uri = "mongodb://localhost/akka.mailbox" # Follow Mongo URI Spec - http://www.mongodb.org/display/DOCS/Connections
# Configurable timeouts for certain ops
timeout {
read = 3000 # number of milliseconds to wait for a read to succeed before timing out the future
write = 3000 # number of milliseconds to wait for a write to succeed before timing out the future
read = 3000 # number of milliseconds to wait for a read to succeed before timing out the future
write = 3000 # number of milliseconds to wait for a write to succeed before timing out the future
}
}


@ -6,5 +6,5 @@ include "akka-reference.conf"
akka {
event-handlers = ["akka.testkit.TestEventListener"]
event-handler-level = "WARNING"
event-handler-level = "INFO"
}