diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
index 7149c6c984..5fbf2dceaa 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
@@ -18,7 +18,6 @@ class DeployerSpec extends WordSpec with MustMatchers {
Deploy(
"service-ping",
LeastCPU,
- "akka.serialization.Format$Default$",
Clustered(
Vector(Node("node1")),
Replicate(3),
diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
index 3544767453..5f4ceedabb 100644
--- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala
@@ -119,7 +119,7 @@ class RoutingSpec extends WordSpec with MustMatchers {
for (i ← 1 to 500) d ! i
try {
- latch.await(10 seconds)
+ latch.await(20 seconds)
} finally {
// because t1 is much slower and thus has a bigger mailbox all the time
t1Count.get must be < (t2Count.get)
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 0f51f11080..4a9b07a222 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -326,7 +326,7 @@ object Actor extends ListenerManagement {
*
*/
def actorOf[T <: Actor](creator: ⇒ T, address: String): ActorRef = {
- createActor(address, () ⇒ new LocalActorRef(() ⇒ creator, address, Transient))
+ createActor(address, () ⇒ new LocalActorRef(() ⇒ creator, address))
}
/**
@@ -349,7 +349,7 @@ object Actor extends ListenerManagement {
* JAVA API
*/
def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = {
- createActor(address, () ⇒ new LocalActorRef(() ⇒ creator.create, address, Transient))
+ createActor(address, () ⇒ new LocalActorRef(() ⇒ creator.create, address))
}
def localActorOf[T <: Actor: Manifest]: ActorRef = {
@@ -369,11 +369,11 @@ object Actor extends ListenerManagement {
}
def localActorOf[T <: Actor](factory: ⇒ T): ActorRef = {
- new LocalActorRef(() ⇒ factory, new UUID().toString, Transient)
+ new LocalActorRef(() ⇒ factory, new UUID().toString)
}
def localActorOf[T <: Actor](factory: ⇒ T, address: String): ActorRef = {
- new LocalActorRef(() ⇒ factory, address, Transient)
+ new LocalActorRef(() ⇒ factory, address)
}
/**
@@ -410,12 +410,12 @@ object Actor extends ListenerManagement {
private[akka] def createActor(address: String, actorFactory: () ⇒ ActorRef): ActorRef = {
Address.validate(address)
registry.actorFor(address) match { // check if the actor for the address is already in the registry
- case Some(actorRef) ⇒ actorRef // it is -> return it
- case None ⇒ // it is not -> create it
+ case Some(actorRef) ⇒ actorRef // it is -> return it
+ case None ⇒ // it is not -> create it
try {
Deployer.deploymentFor(address) match {
- case Deploy(_, router, _, Local) ⇒ actorFactory() // create a local actor
- case deploy ⇒ newClusterActorRef(actorFactory, address, deploy)
+ case Deploy(_, router, Local) ⇒ actorFactory() // create a local actor
+ case deploy ⇒ newClusterActorRef(actorFactory, address, deploy)
}
} catch {
case e: DeploymentException ⇒
@@ -438,17 +438,17 @@ object Actor extends ListenerManagement {
throw new ActorInitializationException(
"Could not instantiate Actor of " + clazz +
- "\nMake sure Actor is NOT defined inside a class/trait," +
- "\nif so put it outside the class/trait, f.e. in a companion object," +
- "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", cause)
+ "\nMake sure Actor is NOT defined inside a class/trait," +
+ "\nif so put it outside the class/trait, f.e. in a companion object," +
+ "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", cause)
}
- }, address, Transient)
+ }, address)
}
private def newClusterActorRef(factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef = {
deploy match {
case Deploy(
- configAdress, router, serializerClassName,
+ configAdress, router,
Clustered(
preferredHomeNodes,
replicas,
@@ -461,14 +461,11 @@ object Actor extends ListenerManagement {
if (!Actor.remote.isRunning) throw new IllegalStateException(
"Remote server is not running")
- val isHomeNode = preferredHomeNodes exists (home ⇒ DeploymentConfig.isHomeNode(home))
+ val isHomeNode = DeploymentConfig.isHomeNode(preferredHomeNodes)
val nrOfReplicas = DeploymentConfig.replicaValueFor(replicas)
- def serializerErrorDueTo(reason: String) =
- throw new akka.config.ConfigurationException(
- "Could not create Serializer object [" + serializerClassName +
- "] for serialization of actor [" + address +
- "] since " + reason)
+ def serializerErrorDueTo(reason: String) = throw new akka.config.ConfigurationException(
+ "Could not create Serializer for actor [" + address + "] due to: " + reason)
val serializer: Serializer =
Serialization.serializerFor(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s)
@@ -487,13 +484,16 @@ object Actor extends ListenerManagement {
storeActorAndGetClusterRef(Transient, serializer)
case replication: Replication ⇒
+ if (DeploymentConfig.routerTypeFor(router) != akka.routing.RouterType.Direct) throw new ConfigurationException(
+ "Can't replicate an actor [" + address + "] configured with another router than \"direct\" - found [" + router + "]")
+
if (isHomeNode) { // stateful actor's home node
cluster
.use(address, serializer)
.getOrElse(throw new ConfigurationException(
"Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
+
} else {
- // FIXME later manage different 'storage' (data grid) as well
storeActorAndGetClusterRef(replication, serializer)
}
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 6550a13a7e..0fea09723b 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -9,10 +9,10 @@ import akka.dispatch._
import akka.config._
import akka.config.Supervision._
import akka.util._
-import akka.serialization.{ Format, Serializer }
+import akka.serialization.{ Format, Serializer, Serialization }
import ReflectiveAccess._
import ClusterModule._
-import DeploymentConfig.{ ReplicationScheme, Replication, Transient, WriteThrough, WriteBehind }
+import DeploymentConfig.{ TransactionLog ⇒ TransactionLogConfig, _ }
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
@@ -416,10 +416,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*
* @author Jonas Bonér
*/
-class LocalActorRef private[akka] (
- private[this] val actorFactory: () ⇒ Actor,
- val address: String,
- replicationScheme: ReplicationScheme)
+class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, val address: String)
extends ActorRef with ScalaActorRef {
protected[akka] val guard = new ReentrantGuard
@@ -447,49 +444,38 @@ class LocalActorRef private[akka] (
protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
- private val isReplicated: Boolean = replicationScheme match {
- case _: Transient | Transient ⇒ false
- case _ ⇒ true
- }
-
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
"Could not create Serializer object [" + this.getClass.getName +
"]")
private val serializer: Serializer =
- akka.serialization.Serialization.serializerFor(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s)
+ Serialization.serializerFor(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s)
+
+ private lazy val replicationScheme: ReplicationScheme =
+ DeploymentConfig.replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient)
+
+ private lazy val isReplicated: Boolean = DeploymentConfig.isReplicated(replicationScheme)
+
+ private lazy val isWriteBehindReplication: Boolean = DeploymentConfig.isWriteBehindReplication(replicationScheme)
private lazy val replicationStorage: Either[TransactionLog, AnyRef] = {
- replicationScheme match {
- case _: Transient | Transient ⇒
- throw new IllegalStateException("Can not replicate 'transient' actor [" + toString + "]")
+ if (DeploymentConfig.isReplicatedWithTransactionLog(replicationScheme)) {
+ EventHandler.debug(this,
+ "Creating a transaction log for Actor [%s] with replication strategy [%s]"
+ .format(address, replicationScheme))
- case Replication(storage, strategy) ⇒
- val isWriteBehind = strategy match {
- case _: WriteBehind | WriteBehind ⇒ true
- case _: WriteThrough | WriteThrough ⇒ false
- }
+ Left(transactionLog.newLogFor(_uuid.toString, isWriteBehindReplication, replicationScheme))
- storage match {
- case _: DeploymentConfig.TransactionLog | DeploymentConfig.TransactionLog ⇒
- EventHandler.debug(this,
- "Creating a transaction log for Actor [%s] with replication strategy [%s]"
- .format(address, replicationScheme))
- // Left(transactionLog.newLogFor(_uuid.toString, isWriteBehind, replicationScheme, serializer))
- // to fix null
- Left(transactionLog.newLogFor(_uuid.toString, isWriteBehind, replicationScheme, null))
+ } else if (DeploymentConfig.isReplicatedWithDataGrid(replicationScheme)) {
+ throw new ConfigurationException("Replication storage type \"data-grid\" is not yet supported")
- case _: DeploymentConfig.DataGrid | DeploymentConfig.DataGrid ⇒
- throw new ConfigurationException("Replication storage type \"data-grid\" is not yet supported")
-
- case unknown ⇒
- throw new ConfigurationException("Unknown replication storage type [" + unknown + "]")
- }
+ } else {
+ throw new ConfigurationException("Unknown replication storage type [" + replicationScheme + "]")
}
}
- //If it was started inside "newActor", initialize it
+ // If it was started inside "newActor", initialize it
if (isRunning) initializeActorInstance
// used only for deserialization
@@ -501,10 +487,9 @@ class LocalActorRef private[akka] (
__lifeCycle: LifeCycle,
__supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]],
- __factory: () ⇒ Actor,
- __replicationStrategy: ReplicationScheme) = {
+ __factory: () ⇒ Actor) = {
- this(__factory, __address, __replicationStrategy)
+ this(__factory, __address)
_uuid = __uuid
timeout = __timeout
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index 6a202ab572..f681e8ab50 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -16,172 +16,6 @@ import akka.util.ReflectiveAccess._
import akka.serialization._
import akka.AkkaException
-/**
- * Module holding the programmatic deployment configuration classes.
- * Defines the deployment specification.
- * Most values have defaults and can be left out.
- *
- * @author Jonas Bonér
- */
-object DeploymentConfig {
-
- // --------------------------------
- // --- Deploy
- // --------------------------------
- case class Deploy(
- address: String,
- routing: Routing = Direct,
- format: String = Serializer.defaultSerializerName,
- scope: Scope = Local)
-
- // --------------------------------
- // --- Routing
- // --------------------------------
- sealed trait Routing
- case class CustomRouter(router: AnyRef) extends Routing
-
- // For Java API
- case class Direct() extends Routing
- case class RoundRobin() extends Routing
- case class Random() extends Routing
- case class LeastCPU() extends Routing
- case class LeastRAM() extends Routing
- case class LeastMessages() extends Routing
-
- // For Scala API
- case object Direct extends Routing
- case object RoundRobin extends Routing
- case object Random extends Routing
- case object LeastCPU extends Routing
- case object LeastRAM extends Routing
- case object LeastMessages extends Routing
-
- // --------------------------------
- // --- Scope
- // --------------------------------
- sealed trait Scope
- case class Clustered(
- preferredNodes: Iterable[Home] = Vector(Host("localhost")),
- replicas: Replicas = NoReplicas,
- replication: ReplicationScheme = Transient) extends Scope
-
- // For Java API
- case class Local() extends Scope
-
- // For Scala API
- case object Local extends Scope
-
- // --------------------------------
- // --- Home
- // --------------------------------
- sealed trait Home
- case class Host(hostName: String) extends Home
- case class Node(nodeName: String) extends Home
- case class IP(ipAddress: String) extends Home
-
- // --------------------------------
- // --- Replicas
- // --------------------------------
- sealed trait Replicas
- case class Replicate(factor: Int) extends Replicas {
- if (factor < 1) throw new IllegalArgumentException("Replicas factor can not be negative or zero")
- }
-
- // For Java API
- case class AutoReplicate() extends Replicas
- case class NoReplicas() extends Replicas
-
- // For Scala API
- case object AutoReplicate extends Replicas
- case object NoReplicas extends Replicas
-
- // --------------------------------
- // --- Replication
- // --------------------------------
- sealed trait ReplicationScheme
-
- // For Java API
- case class Transient() extends ReplicationScheme
-
- // For Scala API
- case object Transient extends ReplicationScheme
- case class Replication(
- storage: ReplicationStorage,
- strategy: ReplicationStrategy) extends ReplicationScheme
-
- // --------------------------------
- // --- ReplicationStorage
- // --------------------------------
- sealed trait ReplicationStorage
-
- // For Java API
- case class TransactionLog() extends ReplicationStorage
- case class DataGrid() extends ReplicationStorage
-
- // For Scala API
- case object TransactionLog extends ReplicationStorage
- case object DataGrid extends ReplicationStorage
-
- // --------------------------------
- // --- ReplicationStrategy
- // --------------------------------
- sealed trait ReplicationStrategy
-
- // For Java API
- case class WriteBehind() extends ReplicationStrategy
- case class WriteThrough() extends ReplicationStrategy
-
- // For Scala API
- case object WriteBehind extends ReplicationStrategy
- case object WriteThrough extends ReplicationStrategy
-
- // --------------------------------
- // --- Helper methods for parsing
- // --------------------------------
-
- def nodeNameFor(home: Home): String = {
- home match {
- case Node(nodename) ⇒ nodename
- case Host("localhost") ⇒ Config.nodename
- case IP("0.0.0.0") ⇒ Config.nodename
- case IP("127.0.0.1") ⇒ Config.nodename
- case Host(hostname) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
- case IP(address) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
- }
- }
-
- def isHomeNode(home: Home): Boolean = nodeNameFor(home) == Config.nodename
-
- def replicaValueFor(replicas: Replicas): Int = replicas match {
- case Replicate(replicas) ⇒ replicas
- case AutoReplicate ⇒ -1
- case AutoReplicate() ⇒ -1
- case NoReplicas ⇒ 0
- case NoReplicas() ⇒ 0
- }
-
- def routerTypeFor(routing: Routing): RouterType = routing match {
- case Direct ⇒ RouterType.Direct
- case Direct() ⇒ RouterType.Direct
- case RoundRobin ⇒ RouterType.RoundRobin
- case RoundRobin() ⇒ RouterType.RoundRobin
- case Random ⇒ RouterType.Random
- case Random() ⇒ RouterType.Random
- case LeastCPU ⇒ RouterType.LeastCPU
- case LeastCPU() ⇒ RouterType.LeastCPU
- case LeastRAM ⇒ RouterType.LeastRAM
- case LeastRAM() ⇒ RouterType.LeastRAM
- case LeastMessages ⇒ RouterType.LeastMessages
- case LeastMessages() ⇒ RouterType.LeastMessages
- case c: CustomRouter ⇒ throw new UnsupportedOperationException("Unknown Router [" + c + "]")
- }
-
- def isReplicationAsync(strategy: ReplicationStrategy): Boolean = strategy match {
- case _: WriteBehind | WriteBehind ⇒ true
- case _: WriteThrough | WriteThrough ⇒ false
- }
-}
-
/**
* Deployer maps actor deployments to actor addresses.
*
@@ -230,8 +64,8 @@ object Deployer {
}
def isLocal(deployment: Deploy): Boolean = deployment match {
- case Deploy(_, _, _, Local) ⇒ true
- case _ ⇒ false
+ case Deploy(_, _, Local) ⇒ true
+ case _ ⇒ false
}
def isClustered(deployment: Deploy): Boolean = isLocal(deployment)
@@ -306,7 +140,7 @@ object Deployer {
// --------------------------------
val addressPath = "akka.actor.deployment." + address
Config.config.getSection(addressPath) match {
- case None ⇒ Some(Deploy(address, Direct, Serializer.defaultSerializerName, Local))
+ case None ⇒ Some(Deploy(address, Direct, Local))
case Some(addressConfig) ⇒
// --------------------------------
@@ -330,17 +164,12 @@ object Deployer {
CustomRouter(customRouter)
}
- // --------------------------------
- // akka.actor.deployment.
.format
- // --------------------------------
- val format = addressConfig.getString("format", Serializer.defaultSerializerName)
-
// --------------------------------
// akka.actor.deployment..clustered
// --------------------------------
addressConfig.getSection("clustered") match {
case None ⇒
- Some(Deploy(address, router, Serializer.defaultSerializerName, Local)) // deploy locally
+ Some(Deploy(address, router, Local)) // deploy locally
case Some(clusteredConfig) ⇒
@@ -349,7 +178,7 @@ object Deployer {
// --------------------------------
val preferredNodes = clusteredConfig.getList("preferred-nodes") match {
- case Nil ⇒ Vector(Host("localhost"))
+ case Nil ⇒ Nil
case homes ⇒
def raiseHomeConfigError() = throw new ConfigurationException(
"Config option [" + addressPath +
@@ -375,19 +204,24 @@ object Deployer {
// --------------------------------
// akka.actor.deployment..clustered.replicas
// --------------------------------
- val replicas = clusteredConfig.getAny("replicas", "0") match {
- case "auto" ⇒ AutoReplicate
- case "0" ⇒ NoReplicas
- case nrOfReplicas: String ⇒
- try {
- Replicate(nrOfReplicas.toInt)
- } catch {
- case e: NumberFormatException ⇒
- throw new ConfigurationException(
- "Config option [" + addressPath +
- ".clustered.replicas] needs to be either [\"auto\"] or [0-N] - was [" +
- nrOfReplicas + "]")
+ val replicas = {
+ if (router == Direct) Replicate(1)
+ else {
+ clusteredConfig.getAny("replicas", "0") match {
+ case "auto" ⇒ AutoReplicate
+ case "0" ⇒ NoReplicas
+ case nrOfReplicas: String ⇒
+ try {
+ Replicate(nrOfReplicas.toInt)
+ } catch {
+ case e: NumberFormatException ⇒
+ throw new ConfigurationException(
+ "Config option [" + addressPath +
+ ".clustered.replicas] needs to be either [\"auto\"] or [0-N] - was [" +
+ nrOfReplicas + "]")
+ }
}
+ }
}
// --------------------------------
@@ -395,7 +229,7 @@ object Deployer {
// --------------------------------
clusteredConfig.getSection("replication") match {
case None ⇒
- Some(Deploy(address, router, format, Clustered(preferredNodes, replicas, Transient)))
+ Some(Deploy(address, router, Clustered(preferredNodes, replicas, Transient)))
case Some(replicationConfig) ⇒
val storage = replicationConfig.getString("storage", "transaction-log") match {
@@ -414,7 +248,7 @@ object Deployer {
".clustered.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
unknown + "]")
}
- Some(Deploy(address, router, format, Clustered(preferredNodes, replicas, Replication(storage, strategy))))
+ Some(Deploy(address, router, Clustered(preferredNodes, replicas, Replication(storage, strategy))))
}
}
}
diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
new file mode 100644
index 0000000000..1d4f23e545
--- /dev/null
+++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
@@ -0,0 +1,217 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.actor
+
+import akka.config.Config
+import akka.routing.RouterType
+import akka.serialization.Serializer
+
+/**
+ * Module holding the programmatic deployment configuration classes.
+ * Defines the deployment specification.
+ * Most values have defaults and can be left out.
+ *
+ * @author Jonas Bonér
+ */
+object DeploymentConfig {
+
+ // --------------------------------
+ // --- Deploy
+ // --------------------------------
+ case class Deploy(
+ address: String,
+ routing: Routing = Direct,
+ scope: Scope = Local)
+
+ // --------------------------------
+ // --- Routing
+ // --------------------------------
+ sealed trait Routing
+ case class CustomRouter(router: AnyRef) extends Routing
+
+ // For Java API
+ case class Direct() extends Routing
+ case class RoundRobin() extends Routing
+ case class Random() extends Routing
+ case class LeastCPU() extends Routing
+ case class LeastRAM() extends Routing
+ case class LeastMessages() extends Routing
+
+ // For Scala API
+ case object Direct extends Routing
+ case object RoundRobin extends Routing
+ case object Random extends Routing
+ case object LeastCPU extends Routing
+ case object LeastRAM extends Routing
+ case object LeastMessages extends Routing
+
+ // --------------------------------
+ // --- Scope
+ // --------------------------------
+ sealed trait Scope
+ case class Clustered(
+ preferredNodes: Iterable[Home] = Vector(Host("localhost")),
+ replicas: Replicas = NoReplicas,
+ replication: ReplicationScheme = Transient) extends Scope
+
+ // For Java API
+ case class Local() extends Scope
+
+ // For Scala API
+ case object Local extends Scope
+
+ // --------------------------------
+ // --- Home
+ // --------------------------------
+ sealed trait Home
+ case class Host(hostName: String) extends Home
+ case class Node(nodeName: String) extends Home
+ case class IP(ipAddress: String) extends Home
+
+ // --------------------------------
+ // --- Replicas
+ // --------------------------------
+ sealed trait Replicas
+ case class Replicate(factor: Int) extends Replicas {
+ if (factor < 1) throw new IllegalArgumentException("Replicas factor can not be negative or zero")
+ }
+
+ // For Java API
+ case class AutoReplicate() extends Replicas
+ case class NoReplicas() extends Replicas
+
+ // For Scala API
+ case object AutoReplicate extends Replicas
+ case object NoReplicas extends Replicas
+
+ // --------------------------------
+ // --- Replication
+ // --------------------------------
+ sealed trait ReplicationScheme
+
+ // For Java API
+ case class Transient() extends ReplicationScheme
+
+ // For Scala API
+ case object Transient extends ReplicationScheme
+ case class Replication(
+ storage: ReplicationStorage,
+ strategy: ReplicationStrategy) extends ReplicationScheme
+
+ // --------------------------------
+ // --- ReplicationStorage
+ // --------------------------------
+ sealed trait ReplicationStorage
+
+ // For Java API
+ case class TransactionLog() extends ReplicationStorage
+ case class DataGrid() extends ReplicationStorage
+
+ // For Scala API
+ case object TransactionLog extends ReplicationStorage
+ case object DataGrid extends ReplicationStorage
+
+ // --------------------------------
+ // --- ReplicationStrategy
+ // --------------------------------
+ sealed trait ReplicationStrategy
+
+ // For Java API
+ case class WriteBehind() extends ReplicationStrategy
+ case class WriteThrough() extends ReplicationStrategy
+
+ // For Scala API
+ case object WriteBehind extends ReplicationStrategy
+ case object WriteThrough extends ReplicationStrategy
+
+ // --------------------------------
+ // --- Helper methods for parsing
+ // --------------------------------
+
+ def nodeNameFor(home: Home): String = home match {
+ case Node(nodename) ⇒ nodename
+ case Host("localhost") ⇒ Config.nodename
+ case IP("0.0.0.0") ⇒ Config.nodename
+ case IP("127.0.0.1") ⇒ Config.nodename
+ case Host(hostname) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
+ case IP(address) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
+ }
+
+ def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == Config.nodename)
+
+ def replicaValueFor(replicas: Replicas): Int = replicas match {
+ case Replicate(replicas) ⇒ replicas
+ case AutoReplicate ⇒ -1
+ case AutoReplicate() ⇒ -1
+ case NoReplicas ⇒ 0
+ case NoReplicas() ⇒ 0
+ }
+
+ def routerTypeFor(routing: Routing): RouterType = routing match {
+ case Direct ⇒ RouterType.Direct
+ case Direct() ⇒ RouterType.Direct
+ case RoundRobin ⇒ RouterType.RoundRobin
+ case RoundRobin() ⇒ RouterType.RoundRobin
+ case Random ⇒ RouterType.Random
+ case Random() ⇒ RouterType.Random
+ case LeastCPU ⇒ RouterType.LeastCPU
+ case LeastCPU() ⇒ RouterType.LeastCPU
+ case LeastRAM ⇒ RouterType.LeastRAM
+ case LeastRAM() ⇒ RouterType.LeastRAM
+ case LeastMessages ⇒ RouterType.LeastMessages
+ case LeastMessages() ⇒ RouterType.LeastMessages
+ case c: CustomRouter ⇒ throw new UnsupportedOperationException("Unknown Router [" + c + "]")
+ }
+
+ def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
+ case Deploy(_, _, Clustered(_, _, replicationScheme)) ⇒ Some(replicationScheme)
+ case _ ⇒ None
+ }
+
+ def isReplicated(deployment: Deploy): Boolean = replicationSchemeFor(deployment) match {
+ case Some(replicationScheme) ⇒ isReplicated(replicationScheme)
+ case _ ⇒ false
+ }
+
+ def isReplicated(replicationScheme: ReplicationScheme): Boolean =
+ isReplicatedWithTransactionLog(replicationScheme) ||
+ isReplicatedWithDataGrid(replicationScheme)
+
+ def isWriteBehindReplication(replicationScheme: ReplicationScheme): Boolean = replicationScheme match {
+ case _: Transient | Transient ⇒ false
+ case Replication(_, strategy) ⇒
+ strategy match {
+ case _: WriteBehind | WriteBehind ⇒ true
+ case _: WriteThrough | WriteThrough ⇒ false
+ }
+ }
+
+ def isWriteThroughReplication(replicationScheme: ReplicationScheme): Boolean = replicationScheme match {
+ case _: Transient | Transient ⇒ false
+ case Replication(_, strategy) ⇒
+ strategy match {
+ case _: WriteBehind | WriteBehind ⇒ true
+ case _: WriteThrough | WriteThrough ⇒ false
+ }
+ }
+
+ def isReplicatedWithTransactionLog(replicationScheme: ReplicationScheme): Boolean = replicationScheme match {
+ case _: Transient | Transient ⇒ false
+ case Replication(storage, _) ⇒
+ storage match {
+ case _: TransactionLog | TransactionLog ⇒ true
+ case _: DataGrid | DataGrid ⇒ throw new UnsupportedOperationException("ReplicationStorage 'DataGrid' is no supported yet")
+ }
+ }
+
+ def isReplicatedWithDataGrid(replicationScheme: ReplicationScheme): Boolean = replicationScheme match {
+ case _: Transient | Transient ⇒ false
+ case Replication(storage, _) ⇒
+ storage match {
+ case _: TransactionLog | TransactionLog ⇒ false
+ case _: DataGrid | DataGrid ⇒ throw new UnsupportedOperationException("ReplicationStorage 'DataGrid' is no supported yet")
+ }
+ }
+}
diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
index 346aa06c62..714207458c 100644
--- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
+++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
@@ -316,28 +316,28 @@ trait ClusterNode {
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
- def use[T <: Actor](actorAddress: String): Option[ActorRef]
+ def use[T <: Actor](actorAddress: String): Option[LocalActorRef]
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
- def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef]
+ def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef]
/**
* Using (checking out) actor on a specific set of nodes.
*/
- def useActorOnNodes(nodes: Array[String], actorAddress: String)
+ def useActorOnNodes(nodes: Array[String], actorAddress: String, replicateFromUuid: Option[UUID])
/**
* Using (checking out) actor on all nodes in the cluster.
*/
- def useActorOnAllNodes(actorAddress: String)
+ def useActorOnAllNodes(actorAddress: String, replicateFromUuid: Option[UUID])
/**
* Using (checking out) actor on a specific node.
*/
- def useActorOnNode(node: String, actorAddress: String)
+ def useActorOnNode(node: String, actorAddress: String, replicateFromUuid: Option[UUID])
/**
* Checks in an actor after done using it on this node.
@@ -354,16 +354,6 @@ trait ClusterNode {
*/
def ref(actorAddress: String, router: RouterType): ActorRef
- /**
- * Migrate the actor from 'this' node to node 'to'.
- */
- def migrate(to: NodeAddress, actorAddress: String)
-
- /**
- * Migrate the actor from node 'from' to node 'to'.
- */
- def migrate(from: NodeAddress, to: NodeAddress, actorAddress: String)
-
/**
* Returns the addresses of all actors checked out on this node.
*/
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 26bd2ca21e..b1bfe83466 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -112,25 +112,24 @@ object ReflectiveAccess {
def newLogFor(
id: String,
isAsync: Boolean,
- replicationScheme: ReplicationScheme,
- format: Serializer): TransactionLog
+ replicationScheme: ReplicationScheme): TransactionLog
def logFor(
id: String,
isAsync: Boolean,
- replicationScheme: ReplicationScheme,
- format: Serializer): TransactionLog
+ replicationScheme: ReplicationScheme): TransactionLog
def shutdown()
}
type TransactionLog = {
- def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef)
+ def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef)
def recordEntry(entry: Array[Byte])
def recordSnapshot(snapshot: Array[Byte])
def entries: Vector[Array[Byte]]
def entriesFromLatestSnapshot: Tuple2[Array[Byte], Vector[Array[Byte]]]
def entriesInRange(from: Long, to: Long): Vector[Array[Byte]]
+ def latestSnapshotAndSubsequentEntries: (Option[Array[Byte]], Vector[Array[Byte]])
def latestEntryId: Long
def latestSnapshotId: Long
def delete()
diff --git a/akka-cluster/src/main/java/akka/cluster/ClusterProtocol.java b/akka-cluster/src/main/java/akka/cluster/ClusterProtocol.java
index 54ca02a15f..8d18fc319b 100644
--- a/akka-cluster/src/main/java/akka/cluster/ClusterProtocol.java
+++ b/akka-cluster/src/main/java/akka/cluster/ClusterProtocol.java
@@ -132,6 +132,11 @@ public final class ClusterProtocol {
// optional bytes payload = 5;
boolean hasPayload();
com.google.protobuf.ByteString getPayload();
+
+ // optional .UuidProtocol replicateActorFromUuid = 6;
+ boolean hasReplicateActorFromUuid();
+ akka.cluster.ClusterProtocol.UuidProtocol getReplicateActorFromUuid();
+ akka.cluster.ClusterProtocol.UuidProtocolOrBuilder getReplicateActorFromUuidOrBuilder();
}
public static final class RemoteDaemonMessageProtocol extends
com.google.protobuf.GeneratedMessage
@@ -227,11 +232,25 @@ public final class ClusterProtocol {
return payload_;
}
+ // optional .UuidProtocol replicateActorFromUuid = 6;
+ public static final int REPLICATEACTORFROMUUID_FIELD_NUMBER = 6;
+ private akka.cluster.ClusterProtocol.UuidProtocol replicateActorFromUuid_;
+ public boolean hasReplicateActorFromUuid() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public akka.cluster.ClusterProtocol.UuidProtocol getReplicateActorFromUuid() {
+ return replicateActorFromUuid_;
+ }
+ public akka.cluster.ClusterProtocol.UuidProtocolOrBuilder getReplicateActorFromUuidOrBuilder() {
+ return replicateActorFromUuid_;
+ }
+
private void initFields() {
messageType_ = akka.cluster.ClusterProtocol.RemoteDaemonMessageType.START;
actorUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
actorAddress_ = "";
payload_ = com.google.protobuf.ByteString.EMPTY;
+ replicateActorFromUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -248,6 +267,12 @@ public final class ClusterProtocol {
return false;
}
}
+ if (hasReplicateActorFromUuid()) {
+ if (!getReplicateActorFromUuid().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -267,6 +292,9 @@ public final class ClusterProtocol {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(5, payload_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeMessage(6, replicateActorFromUuid_);
+ }
getUnknownFields().writeTo(output);
}
@@ -292,6 +320,10 @@ public final class ClusterProtocol {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(5, payload_);
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(6, replicateActorFromUuid_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -402,13 +434,14 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
- private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getActorUuidFieldBuilder();
+ getReplicateActorFromUuidFieldBuilder();
}
}
private static Builder create() {
@@ -429,6 +462,12 @@ public final class ClusterProtocol {
bitField0_ = (bitField0_ & ~0x00000004);
payload_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000008);
+ if (replicateActorFromUuidBuilder_ == null) {
+ replicateActorFromUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
+ } else {
+ replicateActorFromUuidBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -487,6 +526,14 @@ public final class ClusterProtocol {
to_bitField0_ |= 0x00000008;
}
result.payload_ = payload_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ if (replicateActorFromUuidBuilder_ == null) {
+ result.replicateActorFromUuid_ = replicateActorFromUuid_;
+ } else {
+ result.replicateActorFromUuid_ = replicateActorFromUuidBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -515,6 +562,9 @@ public final class ClusterProtocol {
if (other.hasPayload()) {
setPayload(other.getPayload());
}
+ if (other.hasReplicateActorFromUuid()) {
+ mergeReplicateActorFromUuid(other.getReplicateActorFromUuid());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -530,6 +580,12 @@ public final class ClusterProtocol {
return false;
}
}
+ if (hasReplicateActorFromUuid()) {
+ if (!getReplicateActorFromUuid().isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -586,6 +642,15 @@ public final class ClusterProtocol {
payload_ = input.readBytes();
break;
}
+ case 50: {
+ akka.cluster.ClusterProtocol.UuidProtocol.Builder subBuilder = akka.cluster.ClusterProtocol.UuidProtocol.newBuilder();
+ if (hasReplicateActorFromUuid()) {
+ subBuilder.mergeFrom(getReplicateActorFromUuid());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setReplicateActorFromUuid(subBuilder.buildPartial());
+ break;
+ }
}
}
}
@@ -766,6 +831,96 @@ public final class ClusterProtocol {
return this;
}
+ // optional .UuidProtocol replicateActorFromUuid = 6;
+ private akka.cluster.ClusterProtocol.UuidProtocol replicateActorFromUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ akka.cluster.ClusterProtocol.UuidProtocol, akka.cluster.ClusterProtocol.UuidProtocol.Builder, akka.cluster.ClusterProtocol.UuidProtocolOrBuilder> replicateActorFromUuidBuilder_;
+ public boolean hasReplicateActorFromUuid() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public akka.cluster.ClusterProtocol.UuidProtocol getReplicateActorFromUuid() {
+ if (replicateActorFromUuidBuilder_ == null) {
+ return replicateActorFromUuid_;
+ } else {
+ return replicateActorFromUuidBuilder_.getMessage();
+ }
+ }
+ public Builder setReplicateActorFromUuid(akka.cluster.ClusterProtocol.UuidProtocol value) {
+ if (replicateActorFromUuidBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ replicateActorFromUuid_ = value;
+ onChanged();
+ } else {
+ replicateActorFromUuidBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000010;
+ return this;
+ }
+ public Builder setReplicateActorFromUuid(
+ akka.cluster.ClusterProtocol.UuidProtocol.Builder builderForValue) {
+ if (replicateActorFromUuidBuilder_ == null) {
+ replicateActorFromUuid_ = builderForValue.build();
+ onChanged();
+ } else {
+ replicateActorFromUuidBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000010;
+ return this;
+ }
+ public Builder mergeReplicateActorFromUuid(akka.cluster.ClusterProtocol.UuidProtocol value) {
+ if (replicateActorFromUuidBuilder_ == null) {
+ if (((bitField0_ & 0x00000010) == 0x00000010) &&
+ replicateActorFromUuid_ != akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance()) {
+ replicateActorFromUuid_ =
+ akka.cluster.ClusterProtocol.UuidProtocol.newBuilder(replicateActorFromUuid_).mergeFrom(value).buildPartial();
+ } else {
+ replicateActorFromUuid_ = value;
+ }
+ onChanged();
+ } else {
+ replicateActorFromUuidBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000010;
+ return this;
+ }
+ public Builder clearReplicateActorFromUuid() {
+ if (replicateActorFromUuidBuilder_ == null) {
+ replicateActorFromUuid_ = akka.cluster.ClusterProtocol.UuidProtocol.getDefaultInstance();
+ onChanged();
+ } else {
+ replicateActorFromUuidBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000010);
+ return this;
+ }
+ public akka.cluster.ClusterProtocol.UuidProtocol.Builder getReplicateActorFromUuidBuilder() {
+ bitField0_ |= 0x00000010;
+ onChanged();
+ return getReplicateActorFromUuidFieldBuilder().getBuilder();
+ }
+ public akka.cluster.ClusterProtocol.UuidProtocolOrBuilder getReplicateActorFromUuidOrBuilder() {
+ if (replicateActorFromUuidBuilder_ != null) {
+ return replicateActorFromUuidBuilder_.getMessageOrBuilder();
+ } else {
+ return replicateActorFromUuid_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder<
+ akka.cluster.ClusterProtocol.UuidProtocol, akka.cluster.ClusterProtocol.UuidProtocol.Builder, akka.cluster.ClusterProtocol.UuidProtocolOrBuilder>
+ getReplicateActorFromUuidFieldBuilder() {
+ if (replicateActorFromUuidBuilder_ == null) {
+ replicateActorFromUuidBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ akka.cluster.ClusterProtocol.UuidProtocol, akka.cluster.ClusterProtocol.UuidProtocol.Builder, akka.cluster.ClusterProtocol.UuidProtocolOrBuilder>(
+ replicateActorFromUuid_,
+ getParentForChildren(),
+ isClean());
+ replicateActorFromUuid_ = null;
+ }
+ return replicateActorFromUuidBuilder_;
+ }
+
// @@protoc_insertion_point(builder_scope:RemoteDaemonMessageProtocol)
}
@@ -1092,7 +1247,7 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
- private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@@ -1694,7 +1849,7 @@ public final class ClusterProtocol {
maybeForceBuilderInitialization();
}
- private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@@ -1912,23 +2067,24 @@ public final class ClusterProtocol {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\025ClusterProtocol.proto\"\225\001\n\033RemoteDaemon" +
+ "\n\025ClusterProtocol.proto\"\304\001\n\033RemoteDaemon" +
"MessageProtocol\022-\n\013messageType\030\001 \002(\0162\030.R" +
"emoteDaemonMessageType\022 \n\tactorUuid\030\002 \001(" +
"\0132\r.UuidProtocol\022\024\n\014actorAddress\030\003 \001(\t\022\017" +
- "\n\007payload\030\005 \001(\014\"\212\001\n\035DurableMailboxMessag" +
- "eProtocol\022\031\n\021ownerActorAddress\030\001 \002(\t\022\032\n\022" +
- "senderActorAddress\030\002 \001(\t\022!\n\nfutureUuid\030\003" +
- " \001(\0132\r.UuidProtocol\022\017\n\007message\030\004 \002(\014\")\n\014" +
- "UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004*" +
- "\232\002\n\027RemoteDaemonMessageType\022\t\n\005START\020\001\022\010",
- "\n\004STOP\020\002\022\007\n\003USE\020\003\022\013\n\007RELEASE\020\004\022\022\n\016MAKE_A" +
- "VAILABLE\020\005\022\024\n\020MAKE_UNAVAILABLE\020\006\022\016\n\nDISC" +
- "ONNECT\020\007\022\r\n\tRECONNECT\020\010\022\n\n\006RESIGN\020\t\022\031\n\025F" +
- "AIL_OVER_CONNECTIONS\020\n\022\026\n\022FUNCTION_FUN0_" +
- "UNIT\020\013\022\025\n\021FUNCTION_FUN0_ANY\020\014\022\032\n\026FUNCTIO" +
- "N_FUN1_ARG_UNIT\020\r\022\031\n\025FUNCTION_FUN1_ARG_A" +
- "NY\020\016B\020\n\014akka.clusterH\001"
+ "\n\007payload\030\005 \001(\014\022-\n\026replicateActorFromUui" +
+ "d\030\006 \001(\0132\r.UuidProtocol\"\212\001\n\035DurableMailbo" +
+ "xMessageProtocol\022\031\n\021ownerActorAddress\030\001 " +
+ "\002(\t\022\032\n\022senderActorAddress\030\002 \001(\t\022!\n\nfutur" +
+ "eUuid\030\003 \001(\0132\r.UuidProtocol\022\017\n\007message\030\004 " +
+ "\002(\014\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low",
+ "\030\002 \002(\004*\232\002\n\027RemoteDaemonMessageType\022\t\n\005ST" +
+ "ART\020\001\022\010\n\004STOP\020\002\022\007\n\003USE\020\003\022\013\n\007RELEASE\020\004\022\022\n" +
+ "\016MAKE_AVAILABLE\020\005\022\024\n\020MAKE_UNAVAILABLE\020\006\022" +
+ "\016\n\nDISCONNECT\020\007\022\r\n\tRECONNECT\020\010\022\n\n\006RESIGN" +
+ "\020\t\022\031\n\025FAIL_OVER_CONNECTIONS\020\n\022\026\n\022FUNCTIO" +
+ "N_FUN0_UNIT\020\013\022\025\n\021FUNCTION_FUN0_ANY\020\014\022\032\n\026" +
+ "FUNCTION_FUN1_ARG_UNIT\020\r\022\031\n\025FUNCTION_FUN" +
+ "1_ARG_ANY\020\016B\020\n\014akka.clusterH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1940,7 +2096,7 @@ public final class ClusterProtocol {
internal_static_RemoteDaemonMessageProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RemoteDaemonMessageProtocol_descriptor,
- new java.lang.String[] { "MessageType", "ActorUuid", "ActorAddress", "Payload", },
+ new java.lang.String[] { "MessageType", "ActorUuid", "ActorAddress", "Payload", "ReplicateActorFromUuid", },
akka.cluster.ClusterProtocol.RemoteDaemonMessageProtocol.class,
akka.cluster.ClusterProtocol.RemoteDaemonMessageProtocol.Builder.class);
internal_static_DurableMailboxMessageProtocol_descriptor =
diff --git a/akka-cluster/src/main/protocol/ClusterProtocol.proto b/akka-cluster/src/main/protocol/ClusterProtocol.proto
index 1287c1d9f0..e5d2b5ebf0 100644
--- a/akka-cluster/src/main/protocol/ClusterProtocol.proto
+++ b/akka-cluster/src/main/protocol/ClusterProtocol.proto
@@ -19,6 +19,7 @@ message RemoteDaemonMessageProtocol {
optional UuidProtocol actorUuid = 2;
optional string actorAddress = 3;
optional bytes payload = 5;
+ optional UuidProtocol replicateActorFromUuid = 6;
}
/**
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 5b8a72a66e..772d614264 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -22,9 +22,6 @@ import scala.collection.immutable.{ HashMap, HashSet }
import scala.collection.mutable.ConcurrentMap
import scala.collection.JavaConversions._
-import ClusterProtocol._
-import RemoteDaemonMessageType._
-
import akka.util._
import Helpers._
@@ -42,12 +39,16 @@ import akka.config.{ Config, Supervision }
import Supervision._
import Config._
-import akka.serialization.{ Serialization, Serializer, Compression }
+import akka.serialization.{ Serialization, Serializer, Compression, ActorSerialization }
+import ActorSerialization._
import Compression.LZF
-import akka.AkkaException
import akka.cluster.zookeeper._
-import akka.cluster.ChangeListener._
+import ChangeListener._
+import ClusterProtocol._
+import RemoteDaemonMessageType._
+
+import akka.AkkaException
import com.eaio.uuid.UUID
@@ -113,7 +114,7 @@ trait ClusterNodeMBean {
}
/**
- * Module for the ClusterNode. Also holds global state such as configuration data etc.
+ * Module for the Cluster. Also holds global state such as configuration data etc.
*
* @author Jonas Bonér
*/
@@ -131,6 +132,10 @@ object Cluster {
val enableJMX = config.getBool("akka.enable-jmx", true)
val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
val includeRefNodeInReplicaSet = config.getBool("akka.cluster.include-ref-node-in-replica-set", true)
+ val clusterDirectory = config.getString("akka.cluster.log-directory", "_akka_cluster")
+
+ val clusterDataDirectory = clusterDirectory + "/data"
+ val clusterLogDirectory = clusterDirectory + "/log"
@volatile
private var properties = Map.empty[String, String]
@@ -189,19 +194,19 @@ object Cluster {
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
*/
def startLocalCluster(): ZkServer =
- startLocalCluster("_akka_cluster/data", "_akka_cluster/log", 2181, 5000)
+ startLocalCluster(clusterDataDirectory, clusterLogDirectory, 2181, 5000)
/**
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
*/
def startLocalCluster(port: Int, tickTime: Int): ZkServer =
- startLocalCluster("_akka_cluster/data", "_akka_cluster/log", port, tickTime)
+ startLocalCluster(clusterDataDirectory, clusterLogDirectory, port, tickTime)
/**
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
*/
def startLocalCluster(tickTime: Int): ZkServer =
- startLocalCluster("_akka_cluster/data", "_akka_cluster/log", 2181, tickTime)
+ startLocalCluster(clusterDataDirectory, clusterLogDirectory, 2181, tickTime)
/**
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
@@ -322,7 +327,7 @@ class DefaultClusterNode private[akka] (
}
}, "akka.cluster.RemoteClientLifeCycleListener").start()
- private[cluster] lazy val remoteDaemon = localActorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
+ private[cluster] lazy val remoteDaemon = localActorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.Address).start()
private[cluster] lazy val remoteDaemonSupervisor = Supervisor(
SupervisorConfig(
@@ -335,7 +340,7 @@ class DefaultClusterNode private[akka] (
lazy val remoteService: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport
remote.start(hostname, port)
- remote.register(RemoteClusterDaemon.ADDRESS, remoteDaemon)
+ remote.register(RemoteClusterDaemon.Address, remoteDaemon)
remote.addListener(remoteClientLifeCycleListener)
remote
}
@@ -676,21 +681,21 @@ class DefaultClusterNode private[akka] (
case Left(path) ⇒ path
case Right(exception) ⇒ actorAddressRegistryPath
}
-
- // create ADDRESS -> SERIALIZER CLASS NAME mapping
- try {
- zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
- } catch {
- case e: ZkNodeExistsException ⇒ zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
- }
-
- // create ADDRESS -> NODE mapping
- ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToNodesPathFor(actorAddress)))
-
- // create ADDRESS -> UUIDs mapping
- ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress)))
}
+ // create ADDRESS -> SERIALIZER CLASS NAME mapping
+ try {
+ zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
+ } catch {
+ case e: ZkNodeExistsException ⇒ zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
+ }
+
+ // create ADDRESS -> NODE mapping
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToNodesPathFor(actorAddress)))
+
+ // create ADDRESS -> UUIDs mapping
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress)))
+
useActorOnNodes(nodesForReplicationFactor(replicationFactor, Some(actorAddress)).toArray, actorAddress)
this
@@ -738,20 +743,20 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
- def use[T <: Actor](actorAddress: String): Option[ActorRef] = use(actorAddress, serializerForActor(actorAddress))
+ def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, serializerForActor(actorAddress))
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
- def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.get) {
+ def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.get) {
val nodeName = nodeAddress.nodeName
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))
val actorFactoryPath = actorAddressRegistryPathFor(actorAddress)
- zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ ActorRef]]() {
- def call: Either[Exception, () ⇒ ActorRef] = {
+ zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ LocalActorRef]]() {
+ def call: Either[Exception, () ⇒ LocalActorRef] = {
try {
val actorFactoryBytes =
@@ -759,9 +764,9 @@ class DefaultClusterNode private[akka] (
else zkClient.connection.readData(actorFactoryPath, new Stat, false)
val actorFactory =
- Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ ActorRef], None) match {
+ Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ LocalActorRef], None) match {
case Left(error) ⇒ throw error
- case Right(instance) ⇒ instance.asInstanceOf[() ⇒ ActorRef]
+ case Right(instance) ⇒ instance.asInstanceOf[() ⇒ LocalActorRef]
}
Right(actorFactory)
@@ -825,16 +830,20 @@ class DefaultClusterNode private[akka] (
/**
* Using (checking out) actor on a specific set of nodes.
*/
- def useActorOnNodes(nodes: Array[String], actorAddress: String) {
+ def useActorOnNodes(nodes: Array[String], actorAddress: String, replicateFromUuid: Option[UUID] = None) {
EventHandler.debug(this,
"Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
if (isConnected.get) {
- val command = RemoteDaemonMessageProtocol.newBuilder
+ val builder = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorAddress(actorAddress)
- .build
+
+ // set the UUID to replicated from - if available
+ replicateFromUuid foreach (uuid ⇒ builder.setReplicateActorFromUuid(uuidToUuidProtocol(uuid)))
+
+ val command = builder.build
nodes foreach { node ⇒
nodeConnections.get(node) foreach {
@@ -848,15 +857,15 @@ class DefaultClusterNode private[akka] (
/**
* Using (checking out) actor on all nodes in the cluster.
*/
- def useActorOnAllNodes(actorAddress: String) {
- useActorOnNodes(membershipNodes, actorAddress)
+ def useActorOnAllNodes(actorAddress: String, replicateFromUuid: Option[UUID] = None) {
+ useActorOnNodes(membershipNodes, actorAddress, replicateFromUuid)
}
/**
* Using (checking out) actor on a specific node.
*/
- def useActorOnNode(node: String, actorAddress: String) {
- useActorOnNodes(Array(node), actorAddress)
+ def useActorOnNode(node: String, actorAddress: String, replicateFromUuid: Option[UUID] = None) {
+ useActorOnNodes(Array(node), actorAddress, replicateFromUuid)
}
/**
@@ -922,29 +931,6 @@ class DefaultClusterNode private[akka] (
} else throw new ClusterException("Not connected to cluster")
- /**
- * Migrate the actor from 'this' node to node 'to'.
- */
- def migrate(to: NodeAddress, actorAddress: String) {
- migrate(nodeAddress, to, actorAddress)
- }
-
- /**
- * Migrate the actor from node 'from' to node 'to'.
- */
- def migrate(
- from: NodeAddress, to: NodeAddress, actorAddress: String) {
- if (isConnected.get) {
- if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
- if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
- if (isInUseOnNode(actorAddress, from)) {
- migrateWithoutCheckingThatActorResidesOnItsHomeNode(from, to, actorAddress)
- } else {
- throw new ClusterException("Can't move actor from node [" + from + "] since it does not exist on this node")
- }
- }
- }
-
/**
* Returns the UUIDs of all actors checked out on this node.
*/
@@ -1285,7 +1271,7 @@ class DefaultClusterNode private[akka] (
val preferredNodes =
if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor
Deployer.deploymentFor(actorAddress.get) match {
- case Deploy(_, _, _, Clustered(nodes, _, _)) ⇒
+ case Deploy(_, _, Clustered(nodes, _, _)) ⇒
nodes map (node ⇒ DeploymentConfig.nodeNameFor(node)) take replicationFactor
case _ ⇒
throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered")
@@ -1360,7 +1346,7 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
- val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort).start()
+ val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
nodeConnections.put(node, (address, clusterDaemon))
}
}
@@ -1457,7 +1443,16 @@ class DefaultClusterNode private[akka] (
nodeAddress
}
- migrateWithoutCheckingThatActorResidesOnItsHomeNode(failedNodeAddress, migrateToNodeAddress, actorAddress) // since the ephemeral node is already gone, so can't check
+ // if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.)
+ val replicateFromUuid =
+ if (isReplicated(actorAddress)) Some(uuid)
+ else None
+
+ migrateWithoutCheckingThatActorResidesOnItsHomeNode(
+ failedNodeAddress,
+ migrateToNodeAddress,
+ actorAddress,
+ replicateFromUuid)
}
// notify all available nodes that they should fail-over all connections from 'from' to 'to'
@@ -1486,7 +1481,7 @@ class DefaultClusterNode private[akka] (
* Used when the ephemeral "home" node is already gone, so we can't check if it is available.
*/
private def migrateWithoutCheckingThatActorResidesOnItsHomeNode(
- from: NodeAddress, to: NodeAddress, actorAddress: String) {
+ from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) {
EventHandler.debug(this, "Migrating actor [%s] from node [%s] to node [%s]".format(actorAddress, from, to))
if (!isInUseOnNode(actorAddress, to)) {
@@ -1502,7 +1497,7 @@ class DefaultClusterNode private[akka] (
//ignore[ZkNoNodeException](zkClient.delete(nodeToUuidsPathFor(from.nodeName, uuid)))
// 'use' (check out) actor on the remote 'to' node
- useActorOnNode(to.nodeName, actorAddress)
+ useActorOnNode(to.nodeName, actorAddress, replicateFromUuid)
}
}
@@ -1542,6 +1537,8 @@ class DefaultClusterNode private[akka] (
connectToAllNewlyArrivedMembershipNodesInCluster(membershipNodes, Nil)
}
+ private def isReplicated(actorAddress: String): Boolean = DeploymentConfig.isReplicated(Deployer.deploymentFor(actorAddress))
+
private def createMBean = {
val clusterMBean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
@@ -1672,7 +1669,7 @@ class StateListener(self: ClusterNode) extends IZkStateListener {
trait ErrorHandler {
def withErrorHandler[T](body: ⇒ T) = {
try {
- ignore[ZkInterruptedException](body)
+ ignore[ZkInterruptedException](body) // FIXME Is it good to ignore ZkInterruptedException? If not, how should we handle it?
} catch {
case e: Throwable ⇒
EventHandler.error(e, this, e.toString)
@@ -1685,13 +1682,15 @@ trait ErrorHandler {
* @author Jonas Bonér
*/
object RemoteClusterDaemon {
- val ADDRESS = "akka-cluster-daemon".intern
+ val Address = "akka-cluster-daemon".intern
// FIXME configure computeGridDispatcher to what?
val computeGridDispatcher = Dispatchers.newDispatcher("akka:compute-grid").build
}
/**
+ * Internal "daemon" actor for cluster internal communication.
+ *
* @author Jonas Bonér
*/
class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
@@ -1718,14 +1717,83 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
if (message.hasActorAddress) {
val actorAddress = message.getActorAddress
cluster.serializerForActor(actorAddress) foreach { serializer ⇒
- cluster.use(actorAddress, serializer) foreach { actor ⇒
- cluster.remoteService.register(actorAddress, actor)
+ cluster.use(actorAddress, serializer) foreach { newActorRef ⇒
+ cluster.remoteService.register(actorAddress, newActorRef)
+
+ if (message.hasReplicateActorFromUuid) {
+ // replication is used - fetch the messages and replay them
+ import akka.remote.protocol.RemoteProtocol._
+ import akka.remote.MessageSerializer
+
+ val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
+ val deployment = Deployer.deploymentFor(actorAddress)
+ val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
+ throw new IllegalStateException(
+ "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
+ val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
+
+ try {
+ // get the transaction log for the actor UUID
+ val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
+
+ // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
+ val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries
+
+ // deserialize and restore actor snapshot
+ val actorRefToUseForReplay =
+ snapshotAsBytes match {
+
+ // we have a new actor ref - the snapshot
+ case Some(bytes) ⇒
+ // stop the new actor ref and use the snapshot instead
+ cluster.remoteService.unregister(actorAddress)
+
+ // deserialize the snapshot actor ref and register it as remote actor
+ val uncompressedBytes =
+ if (Cluster.shouldCompressData) LZF.uncompress(bytes)
+ else bytes
+
+ val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
+ cluster.remoteService.register(actorAddress, snapshotActorRef)
+
+ // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should)
+ //newActorRef.stop()
+
+ snapshotActorRef
+
+ // we have no snapshot - use the new actor ref
+ case None ⇒
+ newActorRef
+ }
+
+ // deserialize the messages
+ val messages: Vector[AnyRef] = entriesAsBytes map { bytes ⇒
+ val messageBytes =
+ if (Cluster.shouldCompressData) LZF.uncompress(bytes)
+ else bytes
+ MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
+ }
+
+ EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
+
+ // replay all messages
+ messages foreach { message ⇒
+ EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
+
+ // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
+ actorRefToUseForReplay ! message
+ }
+
+ } catch {
+ case e: Throwable ⇒
+ EventHandler.error(e, this, e.toString)
+ throw e
+ }
+ }
}
}
} else {
- EventHandler.error(this,
- "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]"
- .format(message))
+ EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
}
self.reply(Success)
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
index 55e1fb2c33..6f251eb593 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
@@ -167,7 +167,7 @@ object ClusterDeployer {
ensureRunning {
LocalDeployer.deploy(deployment)
deployment match {
- case Deploy(_, _, _, Local) ⇒ {} // local deployment, do nothing here
+ case Deploy(_, _, Local) ⇒ {} // local deployment, do nothing here
case _ ⇒ // cluster deployment
val path = deploymentAddressPath.format(deployment.address)
try {
diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
index 510fd9415e..7a15673754 100644
--- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
@@ -18,9 +18,8 @@ import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, Wri
import akka.event.EventHandler
import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation }
import akka.remote.MessageSerializer
-import akka.serialization.ActorSerialization._
import akka.cluster.zookeeper._
-import akka.serialization.{ Serializer, Compression }
+import akka.serialization.{ Serializer, Serialization, Compression }
import Compression.LZF
import akka.serialization.ActorSerialization._
@@ -33,15 +32,11 @@ import java.util.concurrent.atomic.AtomicLong
// FIXME delete tx log after migration of actor has been made and create a new one
/**
- * TODO: Improved documentation,
- *
* @author Jonas Bonér
*/
class ReplicationException(message: String) extends AkkaException(message)
/**
- * TODO: Improved documentation.
- *
* TODO: Explain something about threadsafety.
*
* A TransactionLog makes chunks of data durable.
@@ -52,39 +47,34 @@ class TransactionLog private (
ledger: LedgerHandle,
val id: String,
val isAsync: Boolean,
- replicationScheme: ReplicationScheme,
- format: Serializer) {
+ replicationScheme: ReplicationScheme) {
import TransactionLog._
val logId = ledger.getId
val txLogPath = transactionLogNode + "/" + id
val snapshotPath = txLogPath + "/snapshot"
- val nrOfEntries = new AtomicLong(0)
private val isOpen = new Switch(true)
/**
- * TODO document method
+ * Record an Actor message invocation.
*/
- def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) {
- if (nrOfEntries.incrementAndGet % snapshotFrequency == 0) {
- val snapshot =
- // FIXME ReplicationStrategy Transient is always used
- if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationScheme))
- else toBinary(actorRef, false, replicationScheme)
- recordSnapshot(snapshot)
- }
- recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray)
+ def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef) {
+ val entryId = ledger.getLastAddPushed + 1
+ if (entryId != 0 && (entryId % snapshotFrequency) == 0) {
+ recordSnapshot(toBinary(actorRef, false, replicationScheme))
+ } else recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray)
}
/**
- * TODO document method
+ * Record an entry.
*/
def recordEntry(entry: Array[Byte]) {
if (isOpen.isOn) {
- val bytes = if (Cluster.shouldCompressData) LZF.compress(entry)
- else entry
+ val bytes =
+ if (Cluster.shouldCompressData) LZF.compress(entry)
+ else entry
try {
if (isAsync) {
ledger.asyncAddEntry(
@@ -96,8 +86,7 @@ class TransactionLog private (
entryId: Long,
ctx: AnyRef) {
handleReturnCode(returnCode)
- EventHandler.debug(this,
- "Writing entry [%s] to log [%s]".format(entryId, logId))
+ EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId))
}
},
null)
@@ -113,12 +102,13 @@ class TransactionLog private (
}
/**
- * TODO document method
+ * Record a snapshot.
*/
def recordSnapshot(snapshot: Array[Byte]) {
if (isOpen.isOn) {
- val bytes = if (Cluster.shouldCompressData) LZF.compress(snapshot)
- else snapshot
+ val bytes =
+ if (Cluster.shouldCompressData) LZF.compress(snapshot)
+ else snapshot
try {
if (isAsync) {
ledger.asyncAddEntry(
@@ -127,16 +117,20 @@ class TransactionLog private (
def addComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
- entryId: Long,
+ snapshotId: Long,
ctx: AnyRef) {
handleReturnCode(returnCode)
- storeSnapshotMetaDataInZooKeeper(entryId)
+ EventHandler.debug(this, "Writing snapshot to log [%s]".format(snapshotId))
+ storeSnapshotMetaDataInZooKeeper(snapshotId)
}
},
null)
} else {
handleReturnCode(ledger.addEntry(bytes))
- storeSnapshotMetaDataInZooKeeper(ledger.getLastAddPushed)
+ val snapshotId = ledger.getLastAddPushed
+
+ EventHandler.debug(this, "Writing snapshot to log [%s]".format(snapshotId))
+ storeSnapshotMetaDataInZooKeeper(snapshotId)
}
} catch {
case e ⇒ handleError(e)
@@ -145,22 +139,36 @@ class TransactionLog private (
}
/**
- * TODO document method
+ * Get all the entries for this transaction log.
*/
def entries: Vector[Array[Byte]] = entriesInRange(0, ledger.getLastAddConfirmed)
/**
- * TODO document method
+ * Get the latest snapshot and all subsequent entries from this snapshot.
*/
- def toByteArraysLatestSnapshot: (Array[Byte], Vector[Array[Byte]]) = {
- val snapshotId = latestSnapshotId
- EventHandler.debug(this,
- "Reading entries from snapshot id [%s] for log [%s]".format(snapshotId, logId))
- (entriesInRange(snapshotId, snapshotId).head, entriesInRange(snapshotId + 1, ledger.getLastAddConfirmed))
+ def latestSnapshotAndSubsequentEntries: (Option[Array[Byte]], Vector[Array[Byte]]) = {
+ latestSnapshotId match {
+ case Some(snapshotId) ⇒
+ EventHandler.debug(this, "Reading entries from snapshot id [%s] for log [%s]".format(snapshotId, logId))
+
+ val cursor = snapshotId + 1
+ val lastIndex = ledger.getLastAddConfirmed
+
+ val snapshot = Some(entriesInRange(snapshotId, snapshotId).head)
+
+ val entries =
+ if ((cursor - lastIndex) == 0) Vector.empty[Array[Byte]]
+ else entriesInRange(cursor, lastIndex)
+
+ (snapshot, entries)
+
+ case None ⇒
+ (None, entries)
+ }
}
/**
- * TODO document method
+ * Get a range of entries from 'from' to 'to' for this transaction log.
*/
def entriesInRange(from: Long, to: Long): Vector[Array[Byte]] = if (isOpen.isOn) {
try {
@@ -180,8 +188,10 @@ class TransactionLog private (
ledgerHandle: LedgerHandle,
enumeration: Enumeration[LedgerEntry],
ctx: AnyRef) {
+
val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]]
val entries = toByteArrays(enumeration)
+
if (returnCode == BKException.Code.OK) future.completeWithResult(entries)
else future.completeWithException(BKException.create(returnCode))
}
@@ -197,29 +207,27 @@ class TransactionLog private (
} else transactionClosedError
/**
- * TODO document method
+ * Get the last entry written to this transaction log.
*/
def latestEntryId: Long = ledger.getLastAddConfirmed
/**
- * TODO document method
+ * Get the id for the last snapshot written to this transaction log.
*/
- def latestSnapshotId: Long = {
+ def latestSnapshotId: Option[Long] = {
try {
val snapshotId = zkClient.readData(snapshotPath).asInstanceOf[Long]
EventHandler.debug(this,
"Retrieved latest snapshot id [%s] from transaction log [%s]".format(snapshotId, logId))
- snapshotId
+ Some(snapshotId)
} catch {
- case e: ZkNoNodeException ⇒
- handleError(new ReplicationException(
- "Transaction log for UUID [" + id + "] does not have a snapshot recorded in ZooKeeper"))
- case e ⇒ handleError(e)
+ case e: ZkNoNodeException ⇒ None
+ case e ⇒ handleError(e)
}
}
/**
- * TODO document method
+ * Delete all entries for this transaction log.
*/
def delete() {
if (isOpen.isOn) {
@@ -244,7 +252,7 @@ class TransactionLog private (
}
/**
- * TODO document method
+ * Close this transaction log.
*/
def close() {
if (isOpen.switchOff) {
@@ -371,9 +379,8 @@ object TransactionLog {
ledger: LedgerHandle,
id: String,
isAsync: Boolean,
- replicationScheme: ReplicationScheme,
- format: Serializer) =
- new TransactionLog(ledger, id, isAsync, replicationScheme, format)
+ replicationScheme: ReplicationScheme) =
+ new TransactionLog(ledger, id, isAsync, replicationScheme)
/**
* Shuts down the transaction log.
@@ -392,13 +399,12 @@ object TransactionLog {
}
/**
- * TODO document method
+ * Creates a new transaction log for the 'id' specified.
*/
def newLogFor(
id: String,
isAsync: Boolean,
- replicationScheme: ReplicationScheme,
- format: Serializer): TransactionLog = {
+ replicationScheme: ReplicationScheme): TransactionLog = {
val txLogPath = transactionLogNode + "/" + id
@@ -443,17 +449,16 @@ object TransactionLog {
}
EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id))
- TransactionLog(ledger, id, isAsync, replicationScheme, format)
+ TransactionLog(ledger, id, isAsync, replicationScheme)
}
/**
- * TODO document method
+ * Fetches an existing transaction log for the 'id' specified.
*/
def logFor(
id: String,
isAsync: Boolean,
- replicationScheme: ReplicationScheme,
- format: Serializer): TransactionLog = {
+ replicationScheme: ReplicationScheme): TransactionLog = {
val txLogPath = transactionLogNode + "/" + id
@@ -493,7 +498,7 @@ object TransactionLog {
case e ⇒ handleError(e)
}
- TransactionLog(ledger, id, isAsync, replicationScheme, format)
+ TransactionLog(ledger, id, isAsync, replicationScheme)
}
private[akka] def await[T](future: Promise[T]): T = {
diff --git a/akka-cluster/src/main/scala/akka/cluster/untitled b/akka-cluster/src/main/scala/akka/cluster/untitled
new file mode 100644
index 0000000000..ec128ad190
--- /dev/null
+++ b/akka-cluster/src/main/scala/akka/cluster/untitled
@@ -0,0 +1,41 @@
+
+diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
+index b7183ca..c267bc6 100644
+--- a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
++++ b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
+@@ -107,7 +107,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
+
+ val txlog2 = TransactionLog.logFor(uuid, false, null)
+ val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
+- new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
++ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
+
+ val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
+ entries.size must equal(4)
+@@ -136,7 +136,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
+
+ val txlog2 = TransactionLog.logFor(uuid, false, null)
+ val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
+- new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
++ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
+
+ val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
+ entries.size must equal(2)
+@@ -251,7 +251,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
+ Thread.sleep(200)
+ val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
+ Thread.sleep(200)
+- new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
++ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
+
+ val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
+ Thread.sleep(200)
+@@ -290,7 +290,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
+ Thread.sleep(200)
+ val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
+ Thread.sleep(200)
+- new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
++ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
+ val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
+ Thread.sleep(200)
+ entries.size must equal(2)
\ No newline at end of file
diff --git a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala
index c481ac899e..cd64a83067 100644
--- a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala
+++ b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala
@@ -20,6 +20,8 @@ import java.net.InetSocketAddress
import com.google.protobuf.ByteString
+import com.eaio.uuid.UUID
+
/**
* Module for local actor serialization.
*/
@@ -27,10 +29,13 @@ object ActorSerialization {
implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default
def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef =
- fromBinaryToLocalActorRef(bytes, Some(homeAddress))
+ fromBinaryToLocalActorRef(bytes, None, Some(homeAddress))
+
+ def fromBinary[T <: Actor](bytes: Array[Byte], uuid: UUID): ActorRef =
+ fromBinaryToLocalActorRef(bytes, Some(uuid), None)
def fromBinary[T <: Actor](bytes: Array[Byte]): ActorRef =
- fromBinaryToLocalActorRef(bytes, None)
+ fromBinaryToLocalActorRef(bytes, None, None)
def toBinary[T <: Actor](
a: ActorRef,
@@ -126,13 +131,16 @@ object ActorSerialization {
private def fromBinaryToLocalActorRef[T <: Actor](
bytes: Array[Byte],
+ uuid: Option[UUID],
homeAddress: Option[InetSocketAddress]): ActorRef = {
val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes)
- fromProtobufToLocalActorRef(builder.build, None)
+ fromProtobufToLocalActorRef(builder.build, uuid, None)
}
private[akka] def fromProtobufToLocalActorRef[T <: Actor](
- protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
+ protocol: SerializedActorRefProtocol,
+ overriddenUuid: Option[UUID],
+ loader: Option[ClassLoader]): ActorRef = {
val lifeCycle =
if (protocol.hasLifeCycle) {
@@ -147,28 +155,27 @@ object ActorSerialization {
if (protocol.hasSupervisor) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
- import ReplicationStorageType._
- import ReplicationStrategyType._
-
- val replicationScheme =
- if (protocol.hasReplicationStorage) {
- protocol.getReplicationStorage match {
- case TRANSIENT ⇒ Transient
- case store ⇒
- val storage = store match {
- case TRANSACTION_LOG ⇒ TransactionLog
- case DATA_GRID ⇒ DataGrid
- }
- val strategy = if (protocol.hasReplicationStrategy) {
- protocol.getReplicationStrategy match {
- case WRITE_THROUGH ⇒ WriteThrough
- case WRITE_BEHIND ⇒ WriteBehind
- }
- } else throw new IllegalActorStateException(
- "Expected replication strategy for replication storage [" + storage + "]")
- Replication(storage, strategy)
- }
- } else Transient
+ // import ReplicationStorageType._
+ // import ReplicationStrategyType._
+ // val replicationScheme =
+ // if (protocol.hasReplicationStorage) {
+ // protocol.getReplicationStorage match {
+ // case TRANSIENT ⇒ Transient
+ // case store ⇒
+ // val storage = store match {
+ // case TRANSACTION_LOG ⇒ TransactionLog
+ // case DATA_GRID ⇒ DataGrid
+ // }
+ // val strategy = if (protocol.hasReplicationStrategy) {
+ // protocol.getReplicationStrategy match {
+ // case WRITE_THROUGH ⇒ WriteThrough
+ // case WRITE_BEHIND ⇒ WriteBehind
+ // }
+ // } else throw new IllegalActorStateException(
+ // "Expected replication strategy for replication storage [" + storage + "]")
+ // Replication(storage, strategy)
+ // }
+ // } else Transient
val hotswap =
try {
@@ -197,16 +204,20 @@ object ActorSerialization {
}
}
+ val actorUuid = overriddenUuid match {
+ case Some(uuid) ⇒ uuid
+ case None ⇒ uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow)
+ }
+
val ar = new LocalActorRef(
- uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow),
+ actorUuid,
protocol.getAddress,
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None,
lifeCycle,
supervisor,
hotswap,
- factory,
- replicationScheme)
+ factory)
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message ⇒ ar ! MessageSerializer.deserialize(message.getMessage, Some(classLoader)))
diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
index bfffdd74c6..c267bc6f98 100644
--- a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
@@ -32,31 +32,31 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"A Transaction Log" should {
"be able to record entries - synchronous" in {
val uuid = (new UUID).toString
- val txlog = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
+ val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
}
"be able to record and delete entries - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
txlog1.close
- intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null, JavaSerializer))
+ intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null))
}
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer)
+ val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
@@ -66,15 +66,15 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record entries and read entries with 'entries' - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
- txlog1.close
+ // txlog1.close // should work without txlog.close
- val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer)
+ val txlog2 = TransactionLog.logFor(uuid, false, null)
val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
@@ -86,7 +86,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record a snapshot - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.close
@@ -94,7 +94,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record and read a snapshot and following entries - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
@@ -105,9 +105,9 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
txlog1.recordEntry(entry)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer)
- val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
- new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
+ val txlog2 = TransactionLog.logFor(uuid, false, null)
+ val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
+ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
entries.size must equal(4)
@@ -120,7 +120,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, false, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
@@ -134,9 +134,9 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
txlog1.recordEntry(entry)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, false, null, JavaSerializer)
- val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
- new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
+ val txlog2 = TransactionLog.logFor(uuid, false, null)
+ val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
+ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
entries.size must equal(2)
@@ -149,7 +149,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"A Transaction Log" should {
"be able to record entries - asynchronous" in {
val uuid = (new UUID).toString
- val txlog = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
+ val txlog = TransactionLog.newLogFor(uuid, true, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
Thread.sleep(200)
@@ -158,7 +158,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record and delete entries - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null)
Thread.sleep(200)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
@@ -167,11 +167,11 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
Thread.sleep(200)
txlog1.delete
Thread.sleep(200)
- intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null, JavaSerializer))
+ intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null))
}
"be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null)
Thread.sleep(200)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
@@ -180,7 +180,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
Thread.sleep(200)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer)
+ val txlog2 = TransactionLog.logFor(uuid, true, null)
Thread.sleep(200)
val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8"))
Thread.sleep(200)
@@ -193,7 +193,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record entries and read entries with 'entries' - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null)
Thread.sleep(200)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
@@ -206,7 +206,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
Thread.sleep(200)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer)
+ val txlog2 = TransactionLog.logFor(uuid, true, null)
val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8"))
Thread.sleep(200)
entries.size must equal(4)
@@ -220,7 +220,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record a snapshot - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null)
Thread.sleep(200)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
@@ -230,7 +230,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record and read a snapshot and following entries - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null)
Thread.sleep(200)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
@@ -247,11 +247,11 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
Thread.sleep(200)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer)
+ val txlog2 = TransactionLog.logFor(uuid, true, null)
Thread.sleep(200)
- val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
+ val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
Thread.sleep(200)
- new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
+ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
Thread.sleep(200)
@@ -266,7 +266,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true, null, JavaSerializer)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null)
Thread.sleep(200)
val entry = "hello".getBytes("UTF-8")
@@ -286,11 +286,11 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
Thread.sleep(200)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, true, null, JavaSerializer)
+ val txlog2 = TransactionLog.logFor(uuid, true, null)
Thread.sleep(200)
- val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
+ val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries
Thread.sleep(200)
- new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
+ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))
Thread.sleep(200)
entries.size must equal(2)
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala
deleted file mode 100644
index e715571a21..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB
- */
-
-package akka.cluster.api.migration.explicit
-
-import org.scalatest.WordSpec
-import org.scalatest.matchers.MustMatchers
-import org.scalatest.BeforeAndAfterAll
-
-import akka.actor._
-import Actor._
-import akka.cluster._
-import ChangeListener._
-import Cluster._
-import akka.config.Config
-import akka.serialization.Serialization
-
-import java.util.concurrent._
-
-object MigrationExplicitMultiJvmSpec {
- var NrOfNodes = 2
-
- class HelloWorld extends Actor with Serializable {
- def receive = {
- case "Hello" ⇒
- self.reply("World from node [" + Config.nodename + "]")
- }
- }
-}
-
-class MigrationExplicitMultiJvmNode1 extends MasterClusterTestNode {
- import MigrationExplicitMultiJvmSpec._
-
- val testNodes = NrOfNodes
-
- "A cluster" must {
-
- "be able to migrate an actor from one node to another" in {
-
- barrier("start-node-1", NrOfNodes) {
- node.start()
- }
-
- barrier("start-node-2", NrOfNodes) {
- }
-
- barrier("store-1-in-node-1", NrOfNodes) {
- val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x ⇒ fail("No serializer found"), s ⇒ s)
- node.store("hello-world", classOf[HelloWorld], serializer)
- }
-
- barrier("use-1-in-node-2", NrOfNodes) {
- }
-
- barrier("migrate-from-node2-to-node1", NrOfNodes) {
- }
-
- barrier("check-actor-is-moved-to-node1", NrOfNodes) {
- node.isInUseOnNode("hello-world") must be(true)
-
- val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
- actorRef.address must be("hello-world")
- (actorRef ? "Hello").as[String].get must be("World from node [node1]")
- }
-
- node.shutdown()
- }
- }
-}
-
-class MigrationExplicitMultiJvmNode2 extends ClusterTestNode {
- import MigrationExplicitMultiJvmSpec._
-
- "A cluster" must {
-
- "be able to migrate an actor from one node to another" in {
-
- barrier("start-node-1", NrOfNodes) {
- }
-
- barrier("start-node-2", NrOfNodes) {
- node.start()
- }
-
- barrier("store-1-in-node-1", NrOfNodes) {
- }
-
- barrier("use-1-in-node-2", NrOfNodes) {
- val actorOrOption = node.use("hello-world")
- if (actorOrOption.isEmpty) fail("Actor could not be retrieved")
-
- val actorRef = actorOrOption.get
- actorRef.address must be("hello-world")
-
- (actorRef ? "Hello").as[String].get must be("World from node [node2]")
- }
-
- barrier("migrate-from-node2-to-node1", NrOfNodes) {
- node.migrate(NodeAddress(node.nodeAddress.clusterName, "node1"), "hello-world")
- Thread.sleep(2000)
- }
-
- barrier("check-actor-is-moved-to-node1", NrOfNodes) {
- }
-
- node.shutdown()
- }
- }
-}
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode1.conf
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.conf
rename to akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode1.conf
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode1.opts
rename to akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode1.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode2.conf
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.conf
rename to akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode2.conf
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode2.opts
rename to akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode2.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf b/akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode3.conf
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.conf
rename to akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode3.conf
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts b/akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode3.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmNode3.opts
rename to akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmNode3.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmSpec.scala
similarity index 98%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala
rename to akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmSpec.scala
index 82f240a9df..c929fdeb6f 100644
--- a/akka-cluster/src/test/scala/akka/cluster/api/migration/automatic/MigrationAutomaticMultiJvmSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/migration/automatic/MigrationAutomaticMultiJvmSpec.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Scalable Solutions AB
*/
-package akka.cluster.api.migration.automatic
+package akka.cluster.migration.automatic
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode1.conf
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmNode1.conf
rename to akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode1.conf
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmNode1.opts
rename to akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode1.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode2.conf
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmNode2.conf
rename to akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode2.conf
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmNode2.opts
rename to akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmNode2.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmSpec.scala
new file mode 100644
index 0000000000..0772b7798a
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/migration/explicit/MigrationExplicitMultiJvmSpec.scala
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ *
+ *
+ * package akka.cluster.migration.explicit
+ *
+ * import org.scalatest.WordSpec
+ * import org.scalatest.matchers.MustMatchers
+ * import org.scalatest.BeforeAndAfterAll
+ *
+ * import akka.actor._
+ * import Actor._
+ * import akka.cluster._
+ * import ChangeListener._
+ * import Cluster._
+ * import akka.config.Config
+ * import akka.serialization.Serialization
+ *
+ * import java.util.concurrent._
+ *
+ * object MigrationExplicitMultiJvmSpec {
+ * var NrOfNodes = 2
+ *
+ * class HelloWorld extends Actor with Serializable {
+ * def receive = {
+ * case "Hello" ⇒
+ * self.reply("World from node [" + Config.nodename + "]")
+ * }
+ * }
+ * }
+ *
+ * class MigrationExplicitMultiJvmNode1 extends MasterClusterTestNode {
+ * import MigrationExplicitMultiJvmSpec._
+ *
+ * val testNodes = NrOfNodes
+ *
+ * "A cluster" must {
+ *
+ * "be able to migrate an actor from one node to another" in {
+ *
+ * barrier("start-node-1", NrOfNodes) {
+ * node.start()
+ * }
+ *
+ * barrier("start-node-2", NrOfNodes) {
+ * }
+ *
+ * barrier("store-1-in-node-1", NrOfNodes) {
+ * val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x ⇒ fail("No serializer found"), s ⇒ s)
+ * node.store("hello-world", classOf[HelloWorld], serializer)
+ * }
+ *
+ * barrier("use-1-in-node-2", NrOfNodes) {
+ * }
+ *
+ * barrier("migrate-from-node2-to-node1", NrOfNodes) {
+ * }
+ *
+ * barrier("check-actor-is-moved-to-node1", NrOfNodes) {
+ * node.isInUseOnNode("hello-world") must be(true)
+ *
+ * val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
+ * actorRef.address must be("hello-world")
+ * (actorRef ? "Hello").as[String].get must be("World from node [node1]")
+ * }
+ *
+ * node.shutdown()
+ * }
+ * }
+ * }
+ *
+ * class MigrationExplicitMultiJvmNode2 extends ClusterTestNode {
+ * import MigrationExplicitMultiJvmSpec._
+ *
+ * "A cluster" must {
+ *
+ * "be able to migrate an actor from one node to another" in {
+ *
+ * barrier("start-node-1", NrOfNodes) {
+ * }
+ *
+ * barrier("start-node-2", NrOfNodes) {
+ * node.start()
+ * }
+ *
+ * barrier("store-1-in-node-1", NrOfNodes) {
+ * }
+ *
+ * barrier("use-1-in-node-2", NrOfNodes) {
+ * val actorOrOption = node.use("hello-world")
+ * if (actorOrOption.isEmpty) fail("Actor could not be retrieved")
+ *
+ * val actorRef = actorOrOption.get
+ * actorRef.address must be("hello-world")
+ *
+ * (actorRef ? "Hello").as[String].get must be("World from node [node2]")
+ * }
+ *
+ * barrier("migrate-from-node2-to-node1", NrOfNodes) {
+ * node.migrate(NodeAddress(node.nodeAddress.clusterName, "node1"), "hello-world")
+ * Thread.sleep(2000)
+ * }
+ *
+ * barrier("check-actor-is-moved-to-node1", NrOfNodes) {
+ * }
+ *
+ * node.shutdown()
+ * }
+ * }
+ * }
+ */
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
new file mode 100644
index 0000000000..d8bee0cb07
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
@@ -0,0 +1,7 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind"
+akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
new file mode 100644
index 0000000000..d8bee0cb07
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
@@ -0,0 +1,7 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind"
+akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
new file mode 100644
index 0000000000..7ed05307ae
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.cluster.replication.transactionlog.writebehind.nosnapshot
+
+import akka.actor._
+import akka.cluster._
+import Cluster._
+import akka.config.Config
+
+object ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec {
+ var NrOfNodes = 2
+
+ sealed trait TransactionLogMessage extends Serializable
+ case class Count(nr: Int) extends TransactionLogMessage
+ case class Log(full: String) extends TransactionLogMessage
+ case object GetLog extends TransactionLogMessage
+
+ class HelloWorld extends Actor with Serializable {
+ var log = ""
+ def receive = {
+ case Count(nr) ⇒
+ log += nr.toString
+ self.reply("World from node [" + Config.nodename + "]")
+ case GetLog ⇒
+ self.reply(Log(log))
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends ClusterTestNode {
+ import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ node.start()
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
+ node.isInUseOnNode("hello-world") must be(true)
+ actorRef.address must be("hello-world")
+ var counter = 0
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ }
+
+ node.shutdown()
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends MasterClusterTestNode {
+ import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ node.start()
+ }
+
+ Thread.sleep(5000) // wait for fail-over from node1 to node2
+
+ barrier("check-fail-over-to-node2", NrOfNodes - 1) {
+ // both remaining nodes should now have the replica
+ node.isInUseOnNode("hello-world") must be(true)
+ val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
+ actorRef.address must be("hello-world")
+ (actorRef ? GetLog).as[Log].get must be(Log("0123456789"))
+ }
+
+ node.shutdown()
+ }
+ }
+
+ override def onReady() {
+ LocalBookKeeperEnsemble.start()
+ }
+
+ override def onShutdown() {
+ TransactionLog.shutdown()
+ LocalBookKeeperEnsemble.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf
new file mode 100644
index 0000000000..8aeaf3135f
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf
@@ -0,0 +1,7 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind"
+akka.cluster.replication.snapshot-frequency = 7
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf
new file mode 100644
index 0000000000..8aeaf3135f
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf
@@ -0,0 +1,7 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind"
+akka.cluster.replication.snapshot-frequency = 7
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala
new file mode 100644
index 0000000000..c37a863ba0
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.cluster.replication.transactionlog.writebehind.snapshot
+
+import akka.actor._
+import akka.cluster._
+import Cluster._
+import akka.config.Config
+
+object ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec {
+ var NrOfNodes = 2
+
+ sealed trait TransactionLogMessage extends Serializable
+ case class Count(nr: Int) extends TransactionLogMessage
+ case class Log(full: String) extends TransactionLogMessage
+ case object GetLog extends TransactionLogMessage
+
+ class HelloWorld extends Actor with Serializable {
+ var log = ""
+ println("Creating HelloWorld log =======> " + log)
+ def receive = {
+ case Count(nr) ⇒
+ log += nr.toString
+ println("Message to HelloWorld log =======> " + log)
+ self.reply("World from node [" + Config.nodename + "]")
+ case GetLog ⇒
+ self.reply(Log(log))
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1 extends ClusterTestNode {
+ import ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec._
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ node.start()
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
+ node.isInUseOnNode("hello-world") must be(true)
+ actorRef.address must be("hello-world")
+ var counter = 0
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ }
+
+ node.shutdown()
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2 extends MasterClusterTestNode {
+ import ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ node.start()
+ }
+
+ Thread.sleep(5000) // wait for fail-over from node1 to node2
+
+ barrier("check-fail-over-to-node2", NrOfNodes - 1) {
+ // both remaining nodes should now have the replica
+ node.isInUseOnNode("hello-world") must be(true)
+ val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
+ actorRef.address must be("hello-world")
+ (actorRef ? GetLog).as[Log].get must be(Log("0123456789"))
+ }
+
+ node.shutdown()
+ }
+ }
+
+ override def onReady() {
+ LocalBookKeeperEnsemble.start()
+ }
+
+ override def onShutdown() {
+ TransactionLog.shutdown()
+ LocalBookKeeperEnsemble.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
new file mode 100644
index 0000000000..470c4c7a33
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
@@ -0,0 +1,8 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
+akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
new file mode 100644
index 0000000000..5fb92ab01f
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
@@ -0,0 +1,7 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
+akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
new file mode 100644
index 0000000000..1f15db7c7c
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.cluster.replication.transactionlog.writethrough.nosnapshot
+
+import akka.actor._
+import akka.cluster._
+import Cluster._
+import akka.config.Config
+
+object ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec {
+ var NrOfNodes = 2
+
+ sealed trait TransactionLogMessage extends Serializable
+ case class Count(nr: Int) extends TransactionLogMessage
+ case class Log(full: String) extends TransactionLogMessage
+ case object GetLog extends TransactionLogMessage
+
+ class HelloWorld extends Actor with Serializable {
+ var log = ""
+ def receive = {
+ case Count(nr) ⇒
+ log += nr.toString
+ self.reply("World from node [" + Config.nodename + "]")
+ case GetLog ⇒
+ self.reply(Log(log))
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends ClusterTestNode {
+ import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ node.start()
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
+ node.isInUseOnNode("hello-world") must be(true)
+ actorRef.address must be("hello-world")
+ var counter = 0
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ }
+
+ node.shutdown()
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends MasterClusterTestNode {
+ import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ node.start()
+ }
+
+ Thread.sleep(5000) // wait for fail-over from node1 to node2
+
+ barrier("check-fail-over-to-node2", NrOfNodes - 1) {
+ // both remaining nodes should now have the replica
+ node.isInUseOnNode("hello-world") must be(true)
+ val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
+ actorRef.address must be("hello-world")
+ (actorRef ? GetLog).as[Log].get must be(Log("0123456789"))
+ }
+
+ node.shutdown()
+ }
+ }
+
+ override def onReady() {
+ LocalBookKeeperEnsemble.start()
+ }
+
+ override def onShutdown() {
+ TransactionLog.shutdown()
+ LocalBookKeeperEnsemble.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf
new file mode 100644
index 0000000000..470c4c7a33
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf
@@ -0,0 +1,8 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
+akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf
new file mode 100644
index 0000000000..5fb92ab01f
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf
@@ -0,0 +1,7 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
+akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala
new file mode 100644
index 0000000000..10fc3883dc
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.cluster.replication.transactionlog.writethrough.nosnapshot
+
+import akka.actor._
+import akka.cluster._
+import Cluster._
+import akka.config.Config
+
+object ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec {
+ var NrOfNodes = 2
+
+ sealed trait TransactionLogMessage extends Serializable
+ case class Count(nr: Int) extends TransactionLogMessage
+ case class Log(full: String) extends TransactionLogMessage
+ case object GetLog extends TransactionLogMessage
+
+ class HelloWorld extends Actor with Serializable {
+ var log = ""
+ def receive = {
+ case Count(nr) ⇒
+ log += nr.toString
+ self.reply("World from node [" + Config.nodename + "]")
+ case GetLog ⇒
+ self.reply(Log(log))
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1 extends ClusterTestNode {
+ import ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec._
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ node.start()
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
+ node.isInUseOnNode("hello-world") must be(true)
+ actorRef.address must be("hello-world")
+ var counter = 0
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ }
+
+ node.shutdown()
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2 extends MasterClusterTestNode {
+ import ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ node.start()
+ }
+
+ Thread.sleep(5000) // wait for fail-over from node1 to node2
+
+ barrier("check-fail-over-to-node2", NrOfNodes - 1) {
+ // both remaining nodes should now have the replica
+ node.isInUseOnNode("hello-world") must be(true)
+ val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
+ actorRef.address must be("hello-world")
+ (actorRef ? GetLog).as[Log].get must be(Log("0123456789"))
+ }
+
+ node.shutdown()
+ }
+ }
+
+ override def onReady() {
+ LocalBookKeeperEnsemble.start()
+ }
+
+ override def onShutdown() {
+ TransactionLog.shutdown()
+ LocalBookKeeperEnsemble.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf
new file mode 100644
index 0000000000..1d332847b6
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf
@@ -0,0 +1,7 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
+akka.cluster.replication.snapshot-frequency = 7
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf
new file mode 100644
index 0000000000..1d332847b6
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf
@@ -0,0 +1,7 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.hello-world.router = "direct"
+akka.actor.deployment.hello-world.clustered.replicas = 1
+akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
+akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
+akka.cluster.replication.snapshot-frequency = 7
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala
new file mode 100644
index 0000000000..a7fbc7b4f1
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.cluster.replication.transactionlog.writethrough.snapshot
+
+import akka.actor._
+import akka.cluster._
+import Cluster._
+import akka.config.Config
+
+object ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec {
+ var NrOfNodes = 2
+
+ sealed trait TransactionLogMessage extends Serializable
+ case class Count(nr: Int) extends TransactionLogMessage
+ case class Log(full: String) extends TransactionLogMessage
+ case object GetLog extends TransactionLogMessage
+
+ class HelloWorld extends Actor with Serializable {
+ var log = ""
+ println("Creating HelloWorld log =======> " + log)
+ def receive = {
+ case Count(nr) ⇒
+ log += nr.toString
+ println("Message to HelloWorld log =======> " + log)
+ self.reply("World from node [" + Config.nodename + "]")
+ case GetLog ⇒
+ self.reply(Log(log))
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1 extends ClusterTestNode {
+ import ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec._
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ node.start()
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
+ node.isInUseOnNode("hello-world") must be(true)
+ actorRef.address must be("hello-world")
+ var counter = 0
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ counter += 1
+ (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ }
+
+ node.shutdown()
+ }
+ }
+}
+
+class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2 extends MasterClusterTestNode {
+ import ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "A cluster" must {
+
+ "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
+
+ barrier("start-node1", NrOfNodes) {
+ }
+
+ barrier("create-actor-on-node1", NrOfNodes) {
+ }
+
+ barrier("start-node2", NrOfNodes) {
+ node.start()
+ }
+
+ Thread.sleep(5000) // wait for fail-over from node1 to node2
+
+ barrier("check-fail-over-to-node2", NrOfNodes - 1) {
+ // both remaining nodes should now have the replica
+ node.isInUseOnNode("hello-world") must be(true)
+ val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
+ actorRef.address must be("hello-world")
+ (actorRef ? GetLog).as[Log].get must be(Log("0123456789"))
+ }
+
+ node.shutdown()
+ }
+ }
+
+ override def onReady() {
+ LocalBookKeeperEnsemble.start()
+ }
+
+ override def onShutdown() {
+ TransactionLog.shutdown()
+ LocalBookKeeperEnsemble.shutdown()
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf
index 3053174bef..0a5f18c2b9 100644
--- a/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf
+++ b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf
@@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.preferred-nodes = ["host:node1"]
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
\ No newline at end of file
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala
index d859f695ea..753ea97bf7 100644
--- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala
@@ -18,8 +18,7 @@ import com.eaio.uuid.UUID
* @author Roland Kuhn
* @since 1.1
*/
-class TestActorRef[T <: Actor](factory: () ⇒ T, address: String)
- extends LocalActorRef(factory, address, DeploymentConfig.Transient) {
+class TestActorRef[T <: Actor](factory: () ⇒ T, address: String) extends LocalActorRef(factory, address) {
dispatcher = CallingThreadDispatcher.global
receiveTimeout = None
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index f6ac1e3fe9..d224fbe42d 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -40,13 +40,12 @@ akka {
service-ping { # stateless actor with replication factor 3 and round-robin load-balancer
- format = "akka.serialization.Format$Default$" # serializer for messages and actor instance
-
router = "least-cpu" # routing (load-balance) scheme to use
# available: "direct", "round-robin", "random",
# "least-cpu", "least-ram", "least-messages"
# or: fully qualified class name of the router class
# default is "direct";
+ # if 'replication' is used then the only available router is "direct"
clustered { # makes the actor available in the cluster registry
# default (if omitted) is local non-clustered actor
@@ -56,16 +55,17 @@ akka {
# available: "host:", "ip:" and "node:"
# default is "host:localhost"
- replicas = 3 # number of actor replicas in the cluster
- # available: positivoe integer (0-N) or the string "auto" for auto-scaling
+ replicas = 3 # number of actor instances in the cluster
+ # available: positive integer (0-N) or the string "auto" for auto-scaling
# if "auto" is used then 'home' has no meaning
# default is '0', meaning no replicas;
- # if the "direct" router is used then this configuration element is ignored
+ # if the "direct" router is used then this element is ignored (always '1')
- replication { # use replication or not?
+ replication { # use replication or not? only makes sense for a stateful actor
# FIXME should we have this config option here? If so, implement it all through.
- serialize-mailbox = on # should the actor mailbox be part of the serialized snapshot?
+ serialize-mailbox = off # should the actor mailbox be part of the serialized snapshot?
+ # default is 'off'
storage = "transaction-log" # storage model for replication
# available: "transaction-log" and "data-grid"
@@ -189,6 +189,8 @@ akka {
secure-cookie = "" # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
# or using 'akka.util.Crypt.generateSecureCookie'
+ log-directory = "_akka_cluster" # Where ZooKeeper should store the logs and data files
+
replication {
digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)
password = "secret" # FIXME: store open in file?