diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 1dc422a148..9a51553f81 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -115,6 +115,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exc
override def getMessage =
if (ref ne null) "Actor %s does not handle [%s]".format(ref, msg)
else "Actor does not handle [%s]".format(msg)
+
override def fillInStackTrace() = this //Don't waste cycles generating stack trace
}
@@ -123,7 +124,7 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef = null) extends Exc
*/
object Status {
sealed trait Status extends Serializable
- case object Success extends Status
+ case class Success(status: AnyRef) extends Status
case class Failure(cause: Throwable) extends Status
}
@@ -131,6 +132,7 @@ case class Timeout(duration: Duration) {
def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
}
+
object Timeout {
/**
* The default timeout, based on the config setting 'akka.actor.timeout'
@@ -185,11 +187,7 @@ object Actor {
/**
* Handle to the ClusterNode. API for the cluster client.
*/
- lazy val cluster: ClusterNode = {
- val node = ClusterModule.node
- node.start()
- node
- }
+ lazy val cluster: ClusterNode = ClusterModule.node
/**
* Handle to the RemoteSupport. API for the remote client/server.
@@ -782,4 +780,4 @@ trait Actor {
}
private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior
-}
\ No newline at end of file
+}
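Note: Status.Success is now a case class carrying a payload, so replies and their pattern matches change shape. A minimal sketch of both sides, assuming `import akka.actor.Status._` and illustrative values `result`, `connection`, `command` and `timeout` (not from this patch):

    // replying: the success value now travels with the ack
    self.reply(Success(result))

    // asking: destructure the payload at the call site
    (connection ? (command, timeout)).as[Status] match {
      case Some(Success(status)) ⇒ EventHandler.debug(this, "command acked by [%s]".format(status))
      case Some(Failure(cause))  ⇒ throw cause
      case None                  ⇒ // the ask timed out
    }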
diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
index e36492879c..eb470600d0 100644
--- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
+++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
@@ -129,10 +129,6 @@ trait ClusterNode {
def remoteServerAddress: InetSocketAddress
- def isRunning: Boolean
-
- def start(): ClusterNode
-
def shutdown()
def disconnect(): ClusterNode
@@ -424,6 +420,7 @@ trait ClusterNode {
// FIXME consider moving all these private[cluster] methods to a separate trait to get them out of the user's view
private[cluster] def remoteClientLifeCycleListener: ActorRef
+
private[cluster] def remoteDaemon: ActorRef
/**
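Note: with isRunning and start() gone from the ClusterNode contract, a node initializes itself when it is constructed, and the explicit lifecycle call disappears at the call sites. A minimal sketch (mirroring the multi-jvm test changes later in this patch):

    // before: val node = Cluster.node.start()
    // after: the first dereference of the handle creates and initializes the node
    val node = Cluster.node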
diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala
index c6ea69dcc0..99f27313ff 100644
--- a/akka-actor/src/main/scala/akka/event/EventHandler.scala
+++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala
@@ -89,7 +89,7 @@ object EventHandler extends ListenerManagement {
class EventHandlerException extends AkkaException
- lazy val EventHandlerDispatcher = Dispatchers.newDispatcher("event:handler").build
+ lazy val EventHandlerDispatcher = Dispatchers.newDispatcher("akka:event:handler").build
implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener]
diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala
index 5fb5986b25..66febb36b8 100644
--- a/akka-actor/src/main/scala/akka/routing/Routing.scala
+++ b/akka-actor/src/main/scala/akka/routing/Routing.scala
@@ -12,6 +12,7 @@ import akka.actor._
import akka.dispatch.Futures
import akka.event.EventHandler
import akka.actor.UntypedChannel._
+
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
/**
@@ -57,6 +58,11 @@ object RouterType {
*/
object LeastMessages extends RouterType
+ /**
+ * A user-defined custom RouterType.
+ */
+ object Custom extends RouterType
+
}
/**
@@ -106,6 +112,11 @@ trait RouterConnections {
*/
def version: Long
+ /**
+ * Returns the number of connections.
+ */
+ def size: Int
+
/**
* Returns a tuple containing the version and Iterable of all connected ActorRefs this Router uses to send messages to.
*
@@ -151,12 +162,14 @@ object Routing {
case RouterType.Direct ⇒
if (connections.size > 1)
throw new IllegalArgumentException("A direct router can't have more than 1 connection")
-
actorOf(actorAddress, connections, new DirectRouter())
+
case RouterType.Random ⇒
actorOf(actorAddress, connections, new RandomRouter())
+
case RouterType.RoundRobin ⇒
actorOf(actorAddress, connections, new RoundRobinRouter())
+
case _ ⇒ throw new IllegalArgumentException("Unsupported routerType " + routerType)
}
@@ -191,9 +204,8 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
router.route(message)(sender)
}
- override def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any,
- timeout: Timeout,
- channel: UntypedChannel): Future[Any] = {
+ override def postMessageToMailboxAndCreateFutureResultWithTimeout(
+ message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = {
val sender = channel match {
case ref: ActorRef ⇒ Some(ref)
case _ ⇒ None
@@ -230,9 +242,11 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
def version: Long = state.get().version
+ def size: Int = state.get().connections.size
+
def versionedIterator = {
val s = state.get
- (s.version, s.connectionIterable)
+ (s.version, s.connections)
}
@tailrec
@@ -240,9 +254,9 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
val oldState = state.get()
//remove the ref from the connections.
- var newList = oldState.connectionIterable.filter(currentActorRef ⇒ currentActorRef ne ref)
+ var newList = oldState.connections.filter(currentActorRef ⇒ currentActorRef ne ref)
- if (newList.size != oldState.connectionIterable.size) {
+ if (newList.size != oldState.connections.size) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
val newState = new State(oldState.version + 1, newList)
@@ -251,9 +265,8 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
}
}
- class State(val version: Long, val connectionIterable: Iterable[ActorRef])
+ case class State(version: Long, connections: Iterable[ActorRef])
}
-
}
/**
@@ -293,7 +306,6 @@ trait BasicRouter extends Router {
case e: Exception ⇒
connections.signalDeadActor(actor)
throw e
-
}
case None ⇒
throwNoConnectionsError()
@@ -302,7 +314,7 @@ trait BasicRouter extends Router {
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
case Routing.Broadcast(message) ⇒
- throw new RoutingException("Broadcasting using a future for the time being is not supported")
+ throw new RoutingException("Broadcasting using '?' is not supported for the time being. Use ScatterGatherRouter.")
case _ ⇒
//not a broadcast message, so we select an actor from the connections and send the message to it.
next match {
@@ -313,7 +325,6 @@ trait BasicRouter extends Router {
case e: Exception ⇒
connections.signalDeadActor(actor)
throw e
-
}
case None ⇒
throwNoConnectionsError()
@@ -330,7 +341,7 @@ trait BasicRouter extends Router {
}
/**
- * A DirectRouter is
+ * A DirectRouter routes all messages to its one and only connection.
*
* @author Jonas Bonér
*/
@@ -369,7 +380,7 @@ class DirectRouter extends BasicRouter {
}
}
- private class DirectRouterState(val ref: ActorRef, val version: Long)
+ private case class DirectRouterState(ref: ActorRef, version: Long)
}
@@ -404,9 +415,9 @@ class RandomRouter extends BasicRouter {
currentState
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
-
val (version, connectionIterable) = connections.versionedIterator
val newState = new RandomRouterState(connectionIterable.toArray[ActorRef], version)
+
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
@@ -417,8 +428,7 @@ class RandomRouter extends BasicRouter {
}
}
- private class RandomRouterState(val array: Array[ActorRef], val version: Long)
-
+ private case class RandomRouterState(array: Array[ActorRef], version: Long)
}
/**
@@ -441,9 +451,9 @@ class RoundRobinRouter extends BasicRouter {
currentState
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
-
val (version, connectionIterable) = connections.versionedIterator
val newState = new RoundRobinState(connectionIterable.toArray[ActorRef], version)
+
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
@@ -454,7 +464,7 @@ class RoundRobinRouter extends BasicRouter {
}
}
- private class RoundRobinState(val array: Array[ActorRef], val version: Long) {
+ private case class RoundRobinState(array: Array[ActorRef], version: Long) {
private val index = new AtomicInteger(0)
@@ -469,7 +479,6 @@ class RoundRobinRouter extends BasicRouter {
else oldIndex
}
}
-
}
/*
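Note: RouterConnections now requires a size member; a minimal sketch of an implementation using the same AtomicReference-plus-case-class State pattern as RoutedActorRef above (SimpleConnections and its details are illustrative, and only the members visible in this patch are shown):

    import java.util.concurrent.atomic.AtomicReference
    import annotation.tailrec
    import akka.actor.ActorRef

    class SimpleConnections(initial: Iterable[ActorRef]) extends RouterConnections {
      private case class State(version: Long, connections: Iterable[ActorRef])
      private val state = new AtomicReference(State(0, initial))

      def version: Long = state.get().version
      def size: Int = state.get().connections.size // the newly added member
      def versionedIterator = { val s = state.get(); (s.version, s.connections) }

      @tailrec
      final def signalDeadActor(ref: ActorRef) {
        val oldState = state.get()
        val newList = oldState.connections.filter(_ ne ref)
        if (newList.size != oldState.connections.size) {
          // the ref was present: bump the version so routers rebuild their cached state
          if (!state.compareAndSet(oldState, State(oldState.version + 1, newList)))
            signalDeadActor(ref) // lost the CAS race, retry
        }
      }
    }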
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 4a82884f03..f7a2f2a249 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -31,7 +31,7 @@ object ReflectiveAccess {
* @author Jonas Bonér
*/
object ClusterModule {
- lazy val isEnabled = Config.isClusterEnabled && clusterInstance.isDefined
+ lazy val isEnabled = Config.isClusterEnabled //&& clusterInstance.isDefined
def ensureEnabled() {
if (!isEnabled) {
diff --git a/akka-cluster/src/main/java/akka/cluster/ClusterProtocol.java b/akka-cluster/src/main/java/akka/cluster/ClusterProtocol.java
index a55ad42536..a5d4016cc7 100644
--- a/akka-cluster/src/main/java/akka/cluster/ClusterProtocol.java
+++ b/akka-cluster/src/main/java/akka/cluster/ClusterProtocol.java
@@ -10,56 +10,53 @@ public final class ClusterProtocol {
}
public enum RemoteDaemonMessageType
implements com.google.protobuf.ProtocolMessageEnum {
- START(0, 1),
- STOP(1, 2),
- USE(2, 3),
- RELEASE(3, 4),
- MAKE_AVAILABLE(4, 5),
- MAKE_UNAVAILABLE(5, 6),
- DISCONNECT(6, 7),
- RECONNECT(7, 8),
- RESIGN(8, 9),
- FAIL_OVER_CONNECTIONS(9, 10),
- FUNCTION_FUN0_UNIT(10, 11),
- FUNCTION_FUN0_ANY(11, 12),
- FUNCTION_FUN1_ARG_UNIT(12, 13),
- FUNCTION_FUN1_ARG_ANY(13, 14),
+ STOP(0, 1),
+ USE(1, 2),
+ RELEASE(2, 3),
+ MAKE_AVAILABLE(3, 4),
+ MAKE_UNAVAILABLE(4, 5),
+ DISCONNECT(5, 6),
+ RECONNECT(6, 7),
+ RESIGN(7, 8),
+ FAIL_OVER_CONNECTIONS(8, 9),
+ FUNCTION_FUN0_UNIT(9, 10),
+ FUNCTION_FUN0_ANY(10, 11),
+ FUNCTION_FUN1_ARG_UNIT(11, 12),
+ FUNCTION_FUN1_ARG_ANY(12, 13),
;
- public static final int START_VALUE = 1;
- public static final int STOP_VALUE = 2;
- public static final int USE_VALUE = 3;
- public static final int RELEASE_VALUE = 4;
- public static final int MAKE_AVAILABLE_VALUE = 5;
- public static final int MAKE_UNAVAILABLE_VALUE = 6;
- public static final int DISCONNECT_VALUE = 7;
- public static final int RECONNECT_VALUE = 8;
- public static final int RESIGN_VALUE = 9;
- public static final int FAIL_OVER_CONNECTIONS_VALUE = 10;
- public static final int FUNCTION_FUN0_UNIT_VALUE = 11;
- public static final int FUNCTION_FUN0_ANY_VALUE = 12;
- public static final int FUNCTION_FUN1_ARG_UNIT_VALUE = 13;
- public static final int FUNCTION_FUN1_ARG_ANY_VALUE = 14;
+ public static final int STOP_VALUE = 1;
+ public static final int USE_VALUE = 2;
+ public static final int RELEASE_VALUE = 3;
+ public static final int MAKE_AVAILABLE_VALUE = 4;
+ public static final int MAKE_UNAVAILABLE_VALUE = 5;
+ public static final int DISCONNECT_VALUE = 6;
+ public static final int RECONNECT_VALUE = 7;
+ public static final int RESIGN_VALUE = 8;
+ public static final int FAIL_OVER_CONNECTIONS_VALUE = 9;
+ public static final int FUNCTION_FUN0_UNIT_VALUE = 10;
+ public static final int FUNCTION_FUN0_ANY_VALUE = 11;
+ public static final int FUNCTION_FUN1_ARG_UNIT_VALUE = 12;
+ public static final int FUNCTION_FUN1_ARG_ANY_VALUE = 13;
public final int getNumber() { return value; }
public static RemoteDaemonMessageType valueOf(int value) {
switch (value) {
- case 1: return START;
- case 2: return STOP;
- case 3: return USE;
- case 4: return RELEASE;
- case 5: return MAKE_AVAILABLE;
- case 6: return MAKE_UNAVAILABLE;
- case 7: return DISCONNECT;
- case 8: return RECONNECT;
- case 9: return RESIGN;
- case 10: return FAIL_OVER_CONNECTIONS;
- case 11: return FUNCTION_FUN0_UNIT;
- case 12: return FUNCTION_FUN0_ANY;
- case 13: return FUNCTION_FUN1_ARG_UNIT;
- case 14: return FUNCTION_FUN1_ARG_ANY;
+ case 1: return STOP;
+ case 2: return USE;
+ case 3: return RELEASE;
+ case 4: return MAKE_AVAILABLE;
+ case 5: return MAKE_UNAVAILABLE;
+ case 6: return DISCONNECT;
+ case 7: return RECONNECT;
+ case 8: return RESIGN;
+ case 9: return FAIL_OVER_CONNECTIONS;
+ case 10: return FUNCTION_FUN0_UNIT;
+ case 11: return FUNCTION_FUN0_ANY;
+ case 12: return FUNCTION_FUN1_ARG_UNIT;
+ case 13: return FUNCTION_FUN1_ARG_ANY;
default: return null;
}
}
@@ -90,7 +87,7 @@ public final class ClusterProtocol {
}
private static final RemoteDaemonMessageType[] VALUES = {
- START, STOP, USE, RELEASE, MAKE_AVAILABLE, MAKE_UNAVAILABLE, DISCONNECT, RECONNECT, RESIGN, FAIL_OVER_CONNECTIONS, FUNCTION_FUN0_UNIT, FUNCTION_FUN0_ANY, FUNCTION_FUN1_ARG_UNIT, FUNCTION_FUN1_ARG_ANY,
+ STOP, USE, RELEASE, MAKE_AVAILABLE, MAKE_UNAVAILABLE, DISCONNECT, RECONNECT, RESIGN, FAIL_OVER_CONNECTIONS, FUNCTION_FUN0_UNIT, FUNCTION_FUN0_ANY, FUNCTION_FUN1_ARG_UNIT, FUNCTION_FUN1_ARG_ANY,
};
public static RemoteDaemonMessageType valueOf(
@@ -246,7 +243,7 @@ public final class ClusterProtocol {
}
private void initFields() {
- messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
+ messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.STOP;
actorUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
actorAddress_ = "";
payload_ = com.google.protobuf.ByteString.EMPTY;
@@ -434,7 +431,7 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
- private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@@ -450,7 +447,7 @@ public final class ClusterProtocol {
public Builder clear() {
super.clear();
- messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
+ messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.STOP;
bitField0_ = (bitField0_ & ~0x00000001);
if (actorUuidBuilder_ == null) {
actorUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
@@ -658,7 +655,7 @@ public final class ClusterProtocol {
private int bitField0_;
// required .RemoteDaemonMessageType messageType = 1;
- private akka.cluster.ClusterProtocol.RemoteDaemonMessageType messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
+ private akka.cluster.ClusterProtocol.RemoteDaemonMessageType messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.STOP;
public boolean hasMessageType() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
@@ -676,7 +673,7 @@ public final class ClusterProtocol {
}
public Builder clearMessageType() {
bitField0_ = (bitField0_ & ~0x00000001);
- messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
+ messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.STOP;
onChanged();
return this;
}
@@ -1247,7 +1244,7 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
- private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@@ -1849,7 +1846,7 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
- private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@@ -2077,14 +2074,14 @@ public final class ClusterProtocol {
"\002(\t\022\032\n\022senderActorAddress\030\002 \001(\t\022!\n\nfutur" +
"eUuid\030\003 \001(\0132\r.UuidProtocol\022\017\n\007message\030\004 " +
"\002(\014\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low",
- "\030\002 \002(\004*\232\002\n\027RemoteDaemonMessageType\022\t\n\005ST" +
- "ART\020\001\022\010\n\004STOP\020\002\022\007\n\003USE\020\003\022\013\n\007RELEASE\020\004\022\022\n" +
- "\016MAKE_AVAILABLE\020\005\022\024\n\020MAKE_UNAVAILABLE\020\006\022" +
- "\016\n\nDISCONNECT\020\007\022\r\n\tRECONNECT\020\010\022\n\n\006RESIGN" +
- "\020\t\022\031\n\025FAIL_OVER_CONNECTIONS\020\n\022\026\n\022FUNCTIO" +
- "N_FUN0_UNIT\020\013\022\025\n\021FUNCTION_FUN0_ANY\020\014\022\032\n\026" +
- "FUNCTION_FUN1_ARG_UNIT\020\r\022\031\n\025FUNCTION_FUN" +
- "1_ARG_ANY\020\016B\020\n\014akka.clusterH\001"
+ "\030\002 \002(\004*\217\002\n\027RemoteDaemonMessageType\022\010\n\004ST" +
+ "OP\020\001\022\007\n\003USE\020\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAIL" +
+ "ABLE\020\004\022\024\n\020MAKE_UNAVAILABLE\020\005\022\016\n\nDISCONNE" +
+ "CT\020\006\022\r\n\tRECONNECT\020\007\022\n\n\006RESIGN\020\010\022\031\n\025FAIL_" +
+ "OVER_CONNECTIONS\020\t\022\026\n\022FUNCTION_FUN0_UNIT" +
+ "\020\n\022\025\n\021FUNCTION_FUN0_ANY\020\013\022\032\n\026FUNCTION_FU" +
+ "N1_ARG_UNIT\020\014\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\r" +
+ "B\020\n\014akka.clusterH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/akka-cluster/src/main/protocol/ClusterProtocol.proto b/akka-cluster/src/main/protocol/ClusterProtocol.proto
index 476548b4ac..9220a9e997 100644
--- a/akka-cluster/src/main/protocol/ClusterProtocol.proto
+++ b/akka-cluster/src/main/protocol/ClusterProtocol.proto
@@ -26,20 +26,19 @@ message RemoteDaemonMessageProtocol {
* Defines the remote daemon message type.
*/
enum RemoteDaemonMessageType {
- START = 1;
- STOP = 2;
- USE = 3;
- RELEASE = 4;
- MAKE_AVAILABLE = 5;
- MAKE_UNAVAILABLE = 6;
- DISCONNECT = 7;
- RECONNECT = 8;
- RESIGN = 9;
- FAIL_OVER_CONNECTIONS = 10;
- FUNCTION_FUN0_UNIT = 11;
- FUNCTION_FUN0_ANY = 12;
- FUNCTION_FUN1_ARG_UNIT = 13;
- FUNCTION_FUN1_ARG_ANY = 14;
+ STOP = 1;
+ USE = 2;
+ RELEASE = 3;
+ MAKE_AVAILABLE = 4;
+ MAKE_UNAVAILABLE = 5;
+ DISCONNECT = 6;
+ RECONNECT = 7;
+ RESIGN = 8;
+ FAIL_OVER_CONNECTIONS = 9;
+ FUNCTION_FUN0_UNIT = 10;
+ FUNCTION_FUN0_ANY = 11;
+ FUNCTION_FUN1_ARG_UNIT = 12;
+ FUNCTION_FUN1_ARG_ANY = 13;
}
/**
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index d28f4e91e4..3ff416f2a7 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -49,8 +49,11 @@ import RemoteDaemonMessageType._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
+
import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap }
+import annotation.tailrec
+
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
/**
@@ -60,8 +63,6 @@ import java.util.concurrent.{ CopyOnWriteArrayList, Callable, ConcurrentHashMap
*/
trait ClusterNodeMBean {
- def start()
-
def stop()
def disconnect()
@@ -70,8 +71,6 @@ trait ClusterNodeMBean {
def resign()
- def isConnected: Boolean
-
def getRemoteServerHostname: String
def getRemoteServerPort: Int
@@ -139,78 +138,13 @@ trait ClusterNodeMBean {
def getActorAddressToUuidsPathForWithNodeName(actorAddress: String, uuid: UUID): String
}
-/**
- * Module for the Cluster. Also holds global state such as configuration data etc.
- *
- * @author Jonas Bonér
- */
-object Cluster {
- val EMPTY_STRING = "".intern
-
- // config options
- val name = Config.clusterName
- val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
- val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
- val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
- val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
- val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
- val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
- val enableJMX = config.getBool("akka.enable-jmx", true)
- val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
- val includeRefNodeInReplicaSet = config.getBool("akka.cluster.include-ref-node-in-replica-set", true)
+object LocalCluster {
val clusterDirectory = config.getString("akka.cluster.log-directory", "_akka_cluster")
-
val clusterDataDirectory = clusterDirectory + "/data"
val clusterLogDirectory = clusterDirectory + "/log"
- @volatile
- private var properties = Map.empty[String, String]
-
- /**
- * Use to override JVM options such as -Dakka.cluster.nodename=node1 etc.
- * Currently supported options are:
- *
- * Cluster setProperty ("akka.cluster.nodename", "node1")
- * Cluster setProperty ("akka.cluster.hostname", "darkstar.lan")
- * Cluster setProperty ("akka.cluster.port", "1234")
- *
- */
- def setProperty(property: (String, String)) {
- properties = properties + property
- }
-
- private def nodename: String = properties.get("akka.cluster.nodename") match {
- case Some(uberride) ⇒ uberride
- case None ⇒ Config.nodename
- }
-
- private def hostname: String = properties.get("akka.cluster.hostname") match {
- case Some(uberride) ⇒ uberride
- case None ⇒ Config.hostname
- }
-
- private def port: Int = properties.get("akka.cluster.port") match {
- case Some(uberride) ⇒ uberride.toInt
- case None ⇒ Config.remoteServerPort
- }
-
- val defaultZooKeeperSerializer = new SerializableSerializer
-
private val _zkServer = new AtomicReference[Option[ZkServer]](None)
- /**
- * The node address.
- */
- val nodeAddress = NodeAddress(name, nodename)
-
- /**
- * The reference to the running ClusterNode.
- */
- val node = {
- if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
- new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultZooKeeperSerializer)
- }
-
/**
* Looks up the local hostname.
*/
@@ -265,6 +199,73 @@ object Cluster {
_zkServer.set(None)
}
}
+}
+
+/**
+ * Module for the Cluster. Also holds global state such as configuration data etc.
+ *
+ * @author Jonas Bonér
+ */
+object Cluster {
+ val EMPTY_STRING = "".intern
+
+ // config options
+ val name = Config.clusterName
+ val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
+ val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
+ val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
+ val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
+ val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
+ val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
+ val enableJMX = config.getBool("akka.enable-jmx", true)
+ val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
+ val includeRefNodeInReplicaSet = config.getBool("akka.cluster.include-ref-node-in-replica-set", true)
+
+ @volatile
+ private var properties = Map.empty[String, String]
+
+ /**
+ * Use to override JVM options such as -Dakka.cluster.nodename=node1 etc.
+ * Currently supported options are:
+ *
+ * Cluster setProperty ("akka.cluster.nodename", "node1")
+ * Cluster setProperty ("akka.cluster.hostname", "darkstar.lan")
+ * Cluster setProperty ("akka.cluster.port", "1234")
+ *
+ */
+ def setProperty(property: (String, String)) {
+ properties = properties + property
+ }
+
+ private def nodename: String = properties.get("akka.cluster.nodename") match {
+ case Some(uberride) ⇒ uberride
+ case None ⇒ Config.nodename
+ }
+
+ private def hostname: String = properties.get("akka.cluster.hostname") match {
+ case Some(uberride) ⇒ uberride
+ case None ⇒ Config.hostname
+ }
+
+ private def port: Int = properties.get("akka.cluster.port") match {
+ case Some(uberride) ⇒ uberride.toInt
+ case None ⇒ Config.remoteServerPort
+ }
+
+ val defaultZooKeeperSerializer = new SerializableSerializer
+
+ /**
+ * The node address.
+ */
+ val nodeAddress = NodeAddress(name, nodename)
+
+ /**
+ * The reference to the running ClusterNode.
+ */
+ val node = {
+ if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
+ new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultZooKeeperSerializer)
+ }
/**
* Creates a new AkkaZkClient.
@@ -345,6 +346,8 @@ class DefaultClusterNode private[akka] (
import Cluster._
+ // private val connectToAllNewlyArrivedMembershipNodesInClusterLock = new AtomicBoolean(false)
+
private[cluster] lazy val remoteClientLifeCycleListener = localActorOf(new Actor {
def receive = {
case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule()
@@ -373,8 +376,6 @@ class DefaultClusterNode private[akka] (
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
- val isConnected = new Switch(false)
-
// static nodes
val CLUSTER_PATH = "/" + nodeAddress.clusterName
val MEMBERSHIP_PATH = CLUSTER_PATH + "/members"
@@ -413,22 +414,25 @@ class DefaultClusterNode private[akka] (
// Address -> ClusterActorRef
private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
+ case class VersionedConnectionState(version: Long, connections: Map[String, Tuple2[InetSocketAddress, ActorRef]])
+
+ // all the connections to other nodes
+ private[akka] val nodeConnections = {
+ var conns = Map.empty[String, Tuple2[InetSocketAddress, ActorRef]]
+ // add the remote connection to 'this' node as well, but as a 'local' actor
+ if (includeRefNodeInReplicaSet) conns += (nodeAddress.nodeName -> (remoteServerAddress, remoteDaemon))
+ new AtomicReference[VersionedConnectionState](VersionedConnectionState(0, conns))
+ }
+
// ============================================================================================================
// ========== WARNING: THESE FIELDS AND EVERYTHING USING THEM IN THE CONSTRUCTOR NEEDS TO BE LAZY =============
// ============================================================================================================
- lazy private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = {
- val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
- if (includeRefNodeInReplicaSet)
- conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
- conns
- }
-
// ZooKeeper client
- lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
+ private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
// leader election listener, registered to the 'leaderLock' below
- lazy private[cluster] val leaderElectionCallback = new LockListener {
+ private[cluster] val leaderElectionCallback = new LockListener {
override def lockAcquired() {
EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName))
self.publish(NewLeader(self.nodeAddress.nodeName))
@@ -440,7 +444,7 @@ class DefaultClusterNode private[akka] (
}
// leader election lock in ZooKeeper
- lazy private[cluster] val leaderLock = new WriteLock(
+ private[cluster] val leaderLock = new WriteLock(
zkClient.connection.getZookeeper,
LEADER_ELECTION_PATH, null,
leaderElectionCallback)
@@ -449,20 +453,12 @@ class DefaultClusterNode private[akka] (
if (enableJMX) createMBean
+ initializeNode()
+
// =======================================
// Node
// =======================================
- def isRunning: Boolean = isConnected.isOn
-
- def start(): ClusterNode = {
- isConnected.switchOn {
- initializeNode()
- }
-
- this
- }
-
private[cluster] def initializeNode() {
EventHandler.info(this,
("\nCreating cluster node with" +
@@ -478,6 +474,7 @@ class DefaultClusterNode private[akka] (
joinCluster()
joinLeaderElection()
fetchMembershipNodes()
+ //EventHandler.info(this, "Connected to nodes [\n\t%s]".format(nodeConnections.toList.map({ case (_, (address, _)) ⇒ address }).mkString("\n\t")))
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
@@ -488,7 +485,7 @@ class DefaultClusterNode private[akka] (
locallyCachedMembershipNodes.clear()
- nodeConnections.toList.foreach({
+ nodeConnections.get.connections.toList.foreach({
case (_, (address, _)) ⇒
Actor.remote.shutdownClientConnection(address) // shut down client connections
})
@@ -501,15 +498,13 @@ class DefaultClusterNode private[akka] (
// for monitoring remote listener
registry.local.actors.filter(remoteService.hasListener).foreach(_.stop())
- nodeConnections.clear()
+ nodeConnections.set(VersionedConnectionState(0, Map.empty[String, Tuple2[InetSocketAddress, ActorRef]]))
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
}
- isConnected.switchOff {
- shutdownNode()
- }
+ shutdownNode()
}
def disconnect(): ClusterNode = {
@@ -707,7 +702,7 @@ class DefaultClusterNode private[akka] (
replicationFactor: Int,
replicationScheme: ReplicationScheme,
serializeMailbox: Boolean,
- serializer: Serializer): ClusterNode = if (isConnected.isOn) {
+ serializer: Serializer): ClusterNode = {
EventHandler.debug(this,
"Storing actor with address [%s] in cluster".format(actorAddress))
@@ -757,7 +752,7 @@ class DefaultClusterNode private[akka] (
useActorOnNodes(nodesForReplicationFactor(replicationFactor, Some(actorAddress)).toArray, actorAddress)
this
- } else throw new ClusterException("Not connected to cluster")
+ }
/**
* Removes actor from the cluster.
@@ -781,9 +776,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid clustered or not?
*/
- def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) {
- zkClient.exists(actorAddressRegistryPathFor(actorAddress))
- } else false
+ def isClustered(actorAddress: String): Boolean = zkClient.exists(actorAddressRegistryPathFor(actorAddress))
/**
* Is the actor with uuid in use on 'this' node or not?
@@ -793,9 +786,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid in use or not?
*/
- def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) {
- zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName))
- } else false
+ def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName))
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
@@ -807,7 +798,7 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
- def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.isOn) {
+ def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = {
val nodeName = nodeAddress.nodeName
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))
@@ -883,7 +874,7 @@ class DefaultClusterNode private[akka] (
actorRef.start()
actorRef
}
- } else None
+ }
/**
* Using (checking out) actor on a specific set of nodes.
@@ -892,22 +883,19 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
- if (isConnected.isOn) {
+ val builder = RemoteDaemonMessageProtocol.newBuilder
+ .setMessageType(USE)
+ .setActorAddress(actorAddress)
- val builder = RemoteDaemonMessageProtocol.newBuilder
- .setMessageType(USE)
- .setActorAddress(actorAddress)
+ // set the UUID to replicated from - if available
+ replicateFromUuid foreach (uuid ⇒ builder.setReplicateActorFromUuid(uuidToUuidProtocol(uuid)))
- // set the UUID to replicated from - if available
- replicateFromUuid foreach (uuid ⇒ builder.setReplicateActorFromUuid(uuidToUuidProtocol(uuid)))
+ val command = builder.build
- val command = builder.build
-
- nodes foreach { node ⇒
- nodeConnections.get(node) foreach {
- case (_, connection) ⇒
- sendCommandToNode(connection, command, async = false)
- }
+ nodes foreach { node ⇒
+ nodeConnections.get.connections.get(node) foreach {
+ case (address, connection) ⇒
+ sendCommandToNode(connection, command, async = false)
}
}
}
@@ -941,16 +929,14 @@ class DefaultClusterNode private[akka] (
// FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no
// longer available. Then what to do? Should we even remove this method?
- if (isConnected.isOn) {
- ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
+ ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
- uuidsForActorAddress(actorAddress) foreach { uuid ⇒
- EventHandler.debug(this,
- "Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
+ uuidsForActorAddress(actorAddress) foreach { uuid ⇒
+ EventHandler.debug(this,
+ "Releasing actor [%s] with UUID [%s] after usage".format(actorAddress, uuid))
- ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid)))
- ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid)))
- }
+ ignore[ZkNoNodeException](zkClient.deleteRecursive(nodeToUuidsPathFor(nodeAddress.nodeName, uuid)))
+ ignore[ZkNoNodeException](zkClient.delete(actorUuidRegistryRemoteAddressPathFor(uuid)))
}
}
@@ -958,19 +944,17 @@ class DefaultClusterNode private[akka] (
* Releases (checking in) all actors with a specific address on all nodes in the cluster where the actor is in 'use'.
*/
private[akka] def releaseActorOnAllNodes(actorAddress: String) {
- if (isConnected.isOn) {
- EventHandler.debug(this,
- "Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))
+ EventHandler.debug(this,
+ "Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))
- val command = RemoteDaemonMessageProtocol.newBuilder
- .setMessageType(RELEASE)
- .setActorAddress(actorAddress)
- .build
+ val command = RemoteDaemonMessageProtocol.newBuilder
+ .setMessageType(RELEASE)
+ .setActorAddress(actorAddress)
+ .build
- nodesForActorsInUseWithAddress(actorAddress) foreach { node ⇒
- nodeConnections.get(node) foreach {
- case (_, connection) ⇒ sendCommandToNode(connection, command, async = true)
- }
+ nodesForActorsInUseWithAddress(actorAddress) foreach { node ⇒
+ nodeConnections.get.connections.get(node) foreach {
+ case (_, connection) ⇒ sendCommandToNode(connection, command, async = true)
}
}
}
@@ -978,7 +962,7 @@ class DefaultClusterNode private[akka] (
/**
* Creates an ActorRef with a Router to a set of clustered actors.
*/
- def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
+ def ref(actorAddress: String, router: RouterType): ActorRef = {
val addresses = addressesForActor(actorAddress)
EventHandler.debug(this,
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
@@ -989,8 +973,7 @@ class DefaultClusterNode private[akka] (
case (_, address) ⇒ clusterActorRefs.put(address, actorRef)
}
actorRef.start()
-
- } else throw new ClusterException("Not connected to cluster")
+ }
/**
* Returns the UUIDs of all actors checked out on this node.
@@ -1005,9 +988,8 @@ class DefaultClusterNode private[akka] (
/**
* Returns the UUIDs of all actors registered in this cluster.
*/
- private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) {
+ private[akka] def uuidsForClusteredActors: Array[UUID] =
zkClient.getChildren(ACTOR_UUID_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
- } else Array.empty[UUID]
/**
* Returns the addresses of all actors registered in this cluster.
@@ -1017,13 +999,13 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor id for the actor with a specific UUID.
*/
- private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) {
+ private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = {
try {
Some(zkClient.readData(actorUuidRegistryAddressPathFor(uuid)).asInstanceOf[String])
} catch {
case e: ZkNoNodeException ⇒ None
}
- } else None
+ }
/**
* Returns the actor ids for all the actors with a specific UUID.
@@ -1034,7 +1016,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor UUIDs for actor ID.
*/
- private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) {
+ private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = {
try {
zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.toArray map {
case c: CharSequence ⇒ new UUID(c)
@@ -1042,23 +1024,23 @@ class DefaultClusterNode private[akka] (
} catch {
case e: ZkNoNodeException ⇒ Array[UUID]()
}
- } else Array.empty[UUID]
+ }
/**
* Returns the node names of all actors in use with UUID.
*/
- private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.isOn) {
+ private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = {
try {
zkClient.getChildren(actorAddressToNodesPathFor(actorAddress)).toList.toArray.asInstanceOf[Array[String]]
} catch {
case e: ZkNoNodeException ⇒ Array[String]()
}
- } else Array.empty[String]
+ }
/**
* Returns the UUIDs of all actors in use registered on a specific node.
*/
- private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) {
+ private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = {
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
case c: CharSequence ⇒ new UUID(c)
@@ -1066,12 +1048,12 @@ class DefaultClusterNode private[akka] (
} catch {
case e: ZkNoNodeException ⇒ Array[UUID]()
}
- } else Array.empty[UUID]
+ }
/**
* Returns the addresses of all actors in use registered on a specific node.
*/
- def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) {
+ def addressesForActorsInUseOnNode(nodeName: String): Array[String] = {
val uuids =
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
@@ -1081,7 +1063,7 @@ class DefaultClusterNode private[akka] (
case e: ZkNoNodeException ⇒ Array[UUID]()
}
actorAddressForUuids(uuids)
- } else Array.empty[String]
+ }
/**
* Returns Serializer for actor with specific address.
@@ -1248,18 +1230,26 @@ class DefaultClusterNode private[akka] (
if (async) {
connection ! command
} else {
- (connection ? (command, remoteDaemonAckTimeout)).as[Status] match {
+ try {
+ (connection ? (command, remoteDaemonAckTimeout)).as[Status] match {
- case Some(Success) ⇒
- EventHandler.debug(this, "Replica for [%s] successfully created".format(connection.address))
+ case Some(Success(status)) ⇒
+ EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(status))
- case Some(Failure(cause)) ⇒
- EventHandler.error(cause, this, cause.toString)
- throw cause
+ case Some(Failure(cause)) ⇒
+ EventHandler.error(cause, this, cause.toString)
+ throw cause
- case None ⇒
+ case None ⇒
+ val error = new ClusterException(
+ "Remote command to [%s] timed out".format(connection.address))
+ EventHandler.error(error, this, error.toString)
+ throw error
+ }
+ } catch {
+ case e: Exception ⇒
val error = new ClusterException(
- "Operation to instantiate replicas throughout the cluster timed out")
+ "Remote command to [%s] timed out".format(connection.address))
EventHandler.error(error, this, error.toString)
throw error
}
@@ -1302,7 +1292,7 @@ class DefaultClusterNode private[akka] (
*/
private def nodesForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Set[String] = {
var replicaNames = Set.empty[String]
- val nrOfClusterNodes = nodeConnections.size
+ val nrOfClusterNodes = nodeConnections.get.connections.size
if (replicationFactor < 1) return replicaNames
if (nrOfClusterNodes < replicationFactor) throw new IllegalArgumentException(
@@ -1322,7 +1312,7 @@ class DefaultClusterNode private[akka] (
for {
nodeName ← preferredNodes
- key ← nodeConnections.keys
+ key ← nodeConnections.get.connections.keys
if key == nodeName
} replicaNames = replicaNames + nodeName
@@ -1352,56 +1342,73 @@ class DefaultClusterNode private[akka] (
private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Set[ActorRef] = {
for {
node ← nodesForReplicationFactor(replicationFactor, actorAddress)
- connectionOption ← nodeConnections.get(node)
+ connectionOption ← nodeConnections.get.connections.get(node)
connection ← connectionOption
actorRef ← connection._2
} yield actorRef
}
- private val connectToAllNewlyArrivedMembershipNodesInClusterLock = new AtomicBoolean(false)
-
/**
* Update the list of connections to other nodes in the cluster.
*
* @return a Map with the remote socket addresses of the disconnected node connections
*/
- private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
+ @tailrec
+ final private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
newlyConnectedMembershipNodes: Traversable[String],
newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = {
+ var change = false
+ val oldState = nodeConnections.get
+
+ var newConnections = oldState.connections
+
// cache the disconnected connections in a map, needed for fail-over of these connections later
var disconnectedConnections = Map.empty[String, InetSocketAddress]
newlyDisconnectedMembershipNodes foreach { node ⇒
- disconnectedConnections += (node -> (nodeConnections(node) match {
+ disconnectedConnections = disconnectedConnections + (node -> (oldState.connections(node) match {
case (address, _) ⇒ address
}))
}
- if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) {
- try {
- // remove connections to failed nodes
- newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
+ // remove connections to failed nodes
+ newlyDisconnectedMembershipNodes foreach { node ⇒
+ newConnections = newConnections - node
+ change = true
+ }
- // add connections newly arrived nodes
- newlyConnectedMembershipNodes foreach { node ⇒
- if (!nodeConnections.contains(node)) {
- // only connect to each replica once
+ // add connections newly arrived nodes
+ newlyConnectedMembershipNodes foreach { node ⇒
+ if (!newConnections.contains(node)) {
- remoteSocketAddressForNode(node) foreach { address ⇒
- EventHandler.debug(this,
- "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
+ // only connect to each replica once
+ remoteSocketAddressForNode(node) foreach { address ⇒
+ EventHandler.debug(this, "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
- val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
- nodeConnections.put(node, (address, clusterDaemon))
- }
- }
+ val clusterDaemon = remoteService.actorFor(
+ RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
+ newConnections = newConnections + (node -> (address, clusterDaemon))
+ change = true
}
- } finally {
- connectToAllNewlyArrivedMembershipNodesInClusterLock.set(false)
}
}
- disconnectedConnections
+ // add the remote connection to 'this' node as well, but as a 'local' actor
+ if (includeRefNodeInReplicaSet)
+ newConnections = newConnections + (nodeAddress.nodeName -> (remoteServerAddress, remoteDaemon))
+
+ //publish the connections under a new version of the state
+ val newState = new VersionedConnectionState(oldState.version + 1, newConnections)
+
+ if (!nodeConnections.compareAndSet(oldState, newState)) {
+ // we failed to set the state, try again
+ connectToAllNewlyArrivedMembershipNodesInCluster(
+ newlyConnectedMembershipNodes, newlyDisconnectedMembershipNodes)
+ } else {
+ // we succeeded in setting the state, return
+ EventHandler.info(this, "Connected to nodes [\n\t%s]".format(newConnections.mkString("\n\t")))
+ disconnectedConnections
+ }
}
private[cluster] def joinCluster() {
@@ -1437,7 +1444,7 @@ class DefaultClusterNode private[akka] (
}
private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress) {
- EventHandler.info(this, "failOverClusterActorRef from %s to %s".format(from, to))
+ EventHandler.info(this, "Failing over ClusterActorRef from %s to %s".format(from, to))
clusterActorRefs.values(from) foreach (_.failOver(from, to))
}
@@ -1516,7 +1523,7 @@ class DefaultClusterNode private[akka] (
.build
// FIXME now we are broadcasting to ALL nodes in the cluster even though a fraction might have a reference to the actors - should that be fixed?
- nodeConnections.values foreach {
+ nodeConnections.get.connections.values foreach {
case (_, connection) ⇒ sendCommandToNode(connection, command, async = true)
}
}
@@ -1589,8 +1596,6 @@ class DefaultClusterNode private[akka] (
private def createMBean = {
val clusterMBean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
- override def start() = self.start()
-
override def stop() = self.shutdown()
override def disconnect() = self.disconnect()
@@ -1599,8 +1604,6 @@ class DefaultClusterNode private[akka] (
override def resign() = self.resign()
- override def isConnected = self.isConnected.isOn
-
override def getNodeAddres = self.nodeAddress
override def getRemoteServerHostname = self.hostname
@@ -1789,7 +1792,6 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
message.getMessageType match {
case USE ⇒ handleUse(message)
case RELEASE ⇒ handleRelease(message)
- case START ⇒ cluster.start()
case STOP ⇒ cluster.shutdown()
case DISCONNECT ⇒ cluster.disconnect()
case RECONNECT ⇒ cluster.reconnect()
@@ -1916,7 +1918,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
} else {
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
}
- self.reply(Success)
+ self.reply(Success(cluster.remoteServerAddress.toString))
} catch {
case error: Throwable ⇒
self.reply(Failure(error))
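Note: nodeConnections is now an AtomicReference over immutable state rather than a ConcurrentMap, so call sites snapshot the state once and use an Option-returning lookup. A minimal sketch of the read path (`node` and `command` stand in for the surrounding method's values):

    // snapshot the immutable map, then look up the node's connection
    val connections = nodeConnections.get.connections
    connections.get(node) foreach {
      case (_, connection) ⇒ sendCommandToNode(connection, command, async = true)
    }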
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index 918ec92fe5..05283de2c2 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -70,7 +70,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
def start(): this.type = synchronized[this.type] {
if (_status == ActorRefInternals.UNSTARTED) {
_status = ActorRefInternals.RUNNING
- //TODO add this? Actor.registry.register(this)
+ //Actor.registry.register(this)
}
this
}
@@ -78,7 +78,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
def stop() {
synchronized {
if (_status == ActorRefInternals.RUNNING) {
- //TODO add this? Actor.registry.unregister(this)
+ //Actor.registry.unregister(this)
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
@@ -104,7 +104,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
(s.version, s.connections.values)
}
- def size(): Int = state.get().connections.size
+ def size: Int = state.get().connections.size
def stopAll() {
state.get().connections.values foreach (_.stop()) // shut down all remote connections
@@ -119,7 +119,7 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
val newMap = oldState.connections map {
case (`from`, actorRef) ⇒
change = true
- actorRef.stop()
+ // actorRef.stop()
(to, createRemoteActorRef(actorRef.address, to))
case other ⇒ other
}
@@ -156,8 +156,6 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
}
}
- class State(val version: Long, val connections: Map[InetSocketAddress, ActorRef])
-
+ case class State(version: Long, connections: Map[InetSocketAddress, ActorRef])
}
-
}
diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index 43331d1022..c67e627191 100644
--- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -239,8 +239,10 @@ abstract class RemoteClient private[akka] (
None
} else {
- val futureResult = if (senderFuture.isDefined) senderFuture.get
- else new DefaultPromise[T](request.getActorInfo.getTimeout)
+ val futureResult =
+ if (senderFuture.isDefined) senderFuture.get
+ else new DefaultPromise[T](request.getActorInfo.getTimeout)
+
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
@@ -347,12 +349,20 @@ class ActiveRemoteClient private[akka] (
EventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress))
// Wait until the connection attempt succeeds or fails.
- connection = bootstrap.connect(remoteAddress)
+
+ try {
+ connection = bootstrap.connect(remoteAddress)
+ } catch {
+ case e: Exception ⇒
+ EventHandler.error(e, this, "Remote client failed to connect to [%s]".format(remoteAddress))
+ throw e
+ }
+
openChannels.add(connection.awaitUninterruptibly.getChannel)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
- EventHandler.error(connection.getCause, "Remote client connection to [%s] has failed".format(remoteAddress), this)
+ EventHandler.error(connection.getCause, this, "Remote client connection to [%s] has failed".format(remoteAddress))
false
} else {
//Send cookie
@@ -389,7 +399,7 @@ class ActiveRemoteClient private[akka] (
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
- EventHandler.error(connection.getCause, "Reconnection to [%s] has failed".format(remoteAddress), this)
+ EventHandler.error(connection.getCause, this, "Reconnection to [%s] has failed".format(remoteAddress))
false
} else {
@@ -489,6 +499,7 @@ class ActiveRemoteClientHandler(
client.module.shutdownClientConnection(remoteAddress)
}
}
+
case arp: AkkaRemoteProtocol if arp.hasMessage ⇒
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
@@ -507,6 +518,7 @@ class ActiveRemoteClientHandler(
future.completeWithException(parseException(reply, client.loader))
}
}
+
case other ⇒
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
}
@@ -554,7 +566,7 @@ class ActiveRemoteClientHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
if (event.getCause ne null)
- EventHandler.error(event.getCause, "Unexpected exception from downstream in remote client", this)
+ EventHandler.error(event.getCause, this, "Unexpected exception from downstream in remote client")
else
EventHandler.error(this, "Unexpected exception from downstream in remote client: %s".format(event))
@@ -563,7 +575,7 @@ class ActiveRemoteClientHandler(
spawn {
client.module.shutdownClientConnection(remoteAddress)
}
- case e ⇒
+ case e: Exception ⇒
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
event.getChannel.close //FIXME Is this the correct behavior?
}
@@ -573,13 +585,14 @@ class ActiveRemoteClientHandler(
val exception = reply.getException
val classname = exception.getClassname
try {
- val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
- else Class.forName(classname)
+ val exceptionClass =
+ if (loader.isDefined) loader.get.loadClass(classname)
+ else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
- case problem: Throwable ⇒
+ case problem: Exception ⇒
EventHandler.error(problem, this, problem.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
@@ -661,7 +674,6 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
if (RemoteClientSettings.SECURE_COOKIE.nonEmpty)
b.setCookie(RemoteClientSettings.SECURE_COOKIE.get)
-
b.build
}
openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly
@@ -717,7 +729,6 @@ trait NettyRemoteServerModule extends RemoteServerModule {
_isRunning switchOff {
currentServer.getAndSet(None) foreach { instance ⇒
EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port))
-
instance.shutdown()
}
}
@@ -920,7 +931,7 @@ class RemoteServerHandler(
try {
actor ! PoisonPill
} catch {
- case e: Exception ⇒ EventHandler.error(e, "Couldn't stop %s".format(actor), this)
+ case e: Exception ⇒ EventHandler.error(e, this, "Couldn't stop %s".format(actor))
}
}
@@ -946,7 +957,7 @@ class RemoteServerHandler(
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
- EventHandler.error(event.getCause, "Unexpected exception from remote downstream", this)
+ EventHandler.error(event.getCause, this, "Unexpected exception from remote downstream")
event.getChannel.close
server.notifyListeners(RemoteServerError(event.getCause, server))
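Note: the EventHandler.error call sites in this file are normalized to the (cause, instance, message) parameter order. A sketch of the two overloads side by side, using values from the surrounding handlers:

    // with a Throwable cause available
    EventHandler.error(connection.getCause, this, "Remote client connection to [%s] has failed".format(remoteAddress))
    // without a cause
    EventHandler.error(this, "Unexpected exception from downstream in remote client: %s".format(event))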
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MasterClusterTestNode.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MasterClusterTestNode.scala
index 0d2b078d11..e805c9156e 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MasterClusterTestNode.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MasterClusterTestNode.scala
@@ -12,7 +12,7 @@ trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAft
def testNodes: Int
override def beforeAll() = {
- Cluster.startLocalCluster()
+ LocalCluster.startLocalCluster()
onReady()
ClusterTestNode.ready(getClass.getName)
}
@@ -23,7 +23,7 @@ trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAft
ClusterTestNode.waitForExits(getClass.getName, testNodes - 1)
ClusterTestNode.cleanUp(getClass.getName)
onShutdown()
- Cluster.shutdownLocalCluster()
+ LocalCluster.shutdownLocalCluster()
}
def onShutdown() = {}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode1.conf
index 2f642a20f0..762f32d92a 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode1.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode1.conf
@@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "WARNING"
+akka.event-handler-level = "DEBUG"
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode2.conf
index 2f642a20f0..762f32d92a 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode2.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmNode2.conf
@@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "WARNING"
+akka.event-handler-level = "DEBUG"
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmSpec.scala
index 43f128e4d9..20b7c538da 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/newleader/NewLeaderChangeListenerMultiJvmSpec.scala
@@ -34,7 +34,7 @@ class NewLeaderChangeListenerMultiJvmNode1 extends MasterClusterTestNode {
})
barrier("start-node1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("start-node2", NrOfNodes) {
@@ -57,7 +57,7 @@ class NewLeaderChangeListenerMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node2", NrOfNodes) {
- node.start()
+ Cluster.node
}
node.shutdown()
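Note: this substitution — node.start() replaced by a bare Cluster.node reference inside the start barrier — repeats in every multi-JVM spec below. Since the patch removes the explicit start() step, evaluating the accessor is presumably what now boots the node. A self-contained sketch of that lazy-initialization idea (the member shape here is assumed, not shown in this patch):

    object ClusterSketch {
      trait Node { def shutdown(): Unit }
      // First dereference constructs, and thereby starts, the node; later
      // references return the same instance. That is why the specs can simply
      // mention Cluster.node inside their "start-node" barriers.
      lazy val node: Node = new Node { def shutdown(): Unit = () }
    }

In the specs this reads as barrier("start-node1", NrOfNodes) { Cluster.node }, while shutdown remains an explicit node.shutdown() call.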
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmSpec.scala
index 3ba61bbd26..d14c56fcc4 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodeconnected/NodeConnectedChangeListenerMultiJvmSpec.scala
@@ -34,7 +34,7 @@ class NodeConnectedChangeListenerMultiJvmNode1 extends MasterClusterTestNode {
})
barrier("start-node1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("start-node2", NrOfNodes) {
@@ -55,7 +55,7 @@ class NodeConnectedChangeListenerMultiJvmNode2 extends ClusterTestNode {
barrier("start-node1", NrOfNodes) {}
barrier("start-node2", NrOfNodes) {
- node.start()
+ Cluster.node
}
node.shutdown()
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmSpec.scala
index 8eb7a272f7..667a4e3972 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/changelisteners/nodedisconnected/NodeDisconnectedChangeListenerMultiJvmSpec.scala
@@ -34,7 +34,7 @@ class NodeDisconnectedChangeListenerMultiJvmNode1 extends MasterClusterTestNode
})
barrier("start-node1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("start-node2", NrOfNodes) {
@@ -57,7 +57,7 @@ class NodeDisconnectedChangeListenerMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node2", NrOfNodes) {
- node.start()
+ Cluster.node
}
node.shutdown()
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmSpec.scala
index 314562be4d..d20bd286e3 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/configuration/ConfigurationStorageMultiJvmSpec.scala
@@ -25,7 +25,7 @@ class ConfigurationStorageMultiJvmNode1 extends MasterClusterTestNode {
"be able to store, read and remove custom configuration data" in {
barrier("start-node-1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("start-node-2", NrOfNodes) {
@@ -65,7 +65,7 @@ class ConfigurationStorageMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node-2", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("store-config-data-node-1", NrOfNodes) {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmSpec.scala
index 28aaaf656a..a652ea023a 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/leader/election/LeaderElectionMultiJvmSpec.scala
@@ -28,7 +28,7 @@ class LeaderElectionMultiJvmNode1 extends MasterClusterTestNode {
"be able to elect a single leader in the cluster and perform re-election if leader resigns" in {
barrier("start-node1", NrOfNodes) {
- node.start()
+ Cluster.node
}
node.isLeader must be === true
@@ -55,7 +55,7 @@ class LeaderElectionMultiJvmNode2 extends ClusterTestNode {
node.isLeader must be === false
barrier("start-node2", NrOfNodes) {
- node.start()
+ Cluster.node
}
node.isLeader must be === false
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala
index 349626445c..c80e0fe492 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala
@@ -50,7 +50,7 @@ class RegistryStoreMultiJvmNode1 extends MasterClusterTestNode {
"be able to store an ActorRef in the cluster without a replication strategy and retrieve it with 'use'" in {
barrier("start-node-1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("start-node-2", NrOfNodes) {
@@ -86,7 +86,7 @@ class RegistryStoreMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node-2", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("store-1-in-node-1", NrOfNodes) {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmSpec.scala
index f90693baad..49ab4c5d48 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/deployment/DeploymentMultiJvmSpec.scala
@@ -27,7 +27,7 @@ class DeploymentMultiJvmNode1 extends MasterClusterTestNode {
"be able to deploy deployments in akka.conf and lookup the deployments by 'address'" in {
barrier("start-node-1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("start-node-2", NrOfNodes) {
@@ -56,7 +56,7 @@ class DeploymentMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node-2", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("perform-deployment-on-node-1", NrOfNodes) {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmSpec.scala
index 68e49a51fb..b60bee0c45 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmSpec.scala
@@ -33,7 +33,7 @@ class MigrationAutomaticMultiJvmNode1 extends ClusterTestNode {
"be able to migrate an actor from one node to another" in {
barrier("start-node1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("create-actor-on-node1", NrOfNodes) {
@@ -67,7 +67,7 @@ class MigrationAutomaticMultiJvmNode2 extends ClusterTestNode {
}
barrier("start-node2", NrOfNodes) {
- node.start()
+ Cluster.node
}
Thread.sleep(2000) // wait for fail-over from node1 to node2
@@ -75,7 +75,8 @@ class MigrationAutomaticMultiJvmNode2 extends ClusterTestNode {
barrier("check-fail-over-to-node2", NrOfNodes - 1) {
// both remaining nodes should now have the replica
node.isInUseOnNode("hello-world") must be(true)
- val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
+ val actorRef = Actor.registry.local.actorFor("hello-world")
+ .getOrElse(fail("Actor should have been in the local actor registry"))
actorRef.address must be("hello-world")
(actorRef ? "Hello").as[String].get must be("World from node [node2]")
}
@@ -110,7 +111,7 @@ class MigrationAutomaticMultiJvmNode3 extends MasterClusterTestNode {
}
barrier("start-node3", NrOfNodes - 1) {
- node.start()
+ Cluster.node
}
Thread.sleep(2000) // wait for fail-over from node2 to node3
@@ -118,7 +119,8 @@ class MigrationAutomaticMultiJvmNode3 extends MasterClusterTestNode {
barrier("check-fail-over-to-node3", NrOfNodes - 2) {
// both remaining nodes should now have the replica
node.isInUseOnNode("hello-world") must be(true)
- val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
+ val actorRef = Actor.registry.local.actorFor("hello-world")
+ .getOrElse(fail("Actor should have been in the local actor registry"))
actorRef.address must be("hello-world")
(actorRef ? "Hello").as[String].get must be("World from node [node3]")
}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmSpec.scala
index a88c29694f..2435b9a788 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmSpec.scala
@@ -39,7 +39,7 @@
* "be able to migrate an actor from one node to another" in {
*
* barrier("start-node-1", NrOfNodes) {
- * node.start()
+ * Cluster.node
* }
*
* barrier("start-node-2", NrOfNodes) {
@@ -80,7 +80,7 @@
* }
*
* barrier("start-node-2", NrOfNodes) {
- * node.start()
+ * Cluster.node
* }
*
* barrier("store-1-in-node-1", NrOfNodes) {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf
index 65bd1e0e07..426599fede 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode1.conf
@@ -1,6 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handlers = ["akka.testkit.TestEventListener"]
-akka.event-handler-level = "ERROR"
+akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf
index 518aed1cd0..bfef5ab2c0 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode2.conf
@@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "ERROR"
+akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf
index 882d9cb7db..18abff13ce 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmNode3.conf
@@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "ERROR"
+akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.clustered.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.clustered.replication-factor = 2
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala
index e8ed5f2992..4eda57acbf 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala
@@ -41,7 +41,7 @@ class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode {
"ClusterActorRef" must {
"cleanup itself" in {
- node.start
+ Cluster.node
barrier("awaitStarted", NrOfNodes).await()
val ref = Actor.actorOf[ClusterActorRefCleanupMultiJvmSpec.TestActor]("service-test")
@@ -127,7 +127,7 @@ class ClusterActorRefCleanupMultiJvmNode2 extends ClusterTestNode {
}
})
- node.start()
+ Cluster.node
barrier("awaitStarted", NrOfNodes).await()
barrier("finished", NrOfNodes).await()
@@ -151,7 +151,7 @@ class ClusterActorRefCleanupMultiJvmNode3 extends ClusterTestNode {
}
})
- node.start()
+ Cluster.node
barrier("awaitStarted", NrOfNodes).await()
barrier("finished", NrOfNodes).await()
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
index 7e6054a1f8..370167dcad 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
@@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "ERROR"
+akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
index 1138cb7f46..2916577457 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
@@ -37,7 +37,7 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends Cluste
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("create-actor-on-node1", NrOfNodes) {
@@ -68,7 +68,7 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends Master
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
- node.start()
+ Cluster.node
}
Thread.sleep(5000) // wait for fail-over from node1 to node2
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf
index a118d2fcf2..97a0fb3687 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf
@@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "ERROR"
+akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala
index cdcd6cf3e2..1b18dac466 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala
@@ -39,7 +39,7 @@ class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1 extends ClusterT
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
- node.start()
+ Cluster.node
}
barrier("create-actor-on-node1", NrOfNodes) {
@@ -89,7 +89,7 @@ class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2 extends MasterCl
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
- node.start()
+ Cluster.node
}
Thread.sleep(5000) // wait for fail-over from node1 to node2
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
deleted file mode 100644
index 74957902ed..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
+++ /dev/null
@@ -1,6 +0,0 @@
-akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "WARNING"
-akka.actor.deployment.hello-world.router = "direct"
-akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
-akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
-akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts
deleted file mode 100644
index a88c260d8c..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts
+++ /dev/null
@@ -1 +0,0 @@
--Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
deleted file mode 100644
index 74957902ed..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
+++ /dev/null
@@ -1,6 +0,0 @@
-akka.enabled-modules = ["cluster"]
-akka.event-handler-level = "WARNING"
-akka.actor.deployment.hello-world.router = "direct"
-akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
-akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
-akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts
deleted file mode 100644
index f1e01f253d..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts
+++ /dev/null
@@ -1 +0,0 @@
--Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
deleted file mode 100644
index d408f59df7..0000000000
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (C) 2009-2011 Typesafe Inc.