diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index be86c87b4d..714207458c 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -316,13 +316,13 @@ trait ClusterNode { * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String): Option[ActorRef] + def use[T <: Actor](actorAddress: String): Option[LocalActorRef] /** * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] + def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] /** * Using (checking out) actor on a specific set of nodes. diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 856f339339..b1bfe83466 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -123,13 +123,13 @@ object ReflectiveAccess { } type TransactionLog = { - def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) + def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef) def recordEntry(entry: Array[Byte]) def recordSnapshot(snapshot: Array[Byte]) def entries: Vector[Array[Byte]] def entriesFromLatestSnapshot: Tuple2[Array[Byte], Vector[Array[Byte]]] def entriesInRange(from: Long, to: Long): Vector[Array[Byte]] - def latestSnapshotAndSubsequentEntries: (Array[Byte], Vector[Array[Byte]]) + def latestSnapshotAndSubsequentEntries: (Option[Array[Byte]], Vector[Array[Byte]]) def latestEntryId: Long def latestSnapshotId: Long def delete() diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 1dc5aac97c..772d614264 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -22,9 +22,6 @@ import scala.collection.immutable.{ HashMap, HashSet } import scala.collection.mutable.ConcurrentMap import scala.collection.JavaConversions._ -import ClusterProtocol._ -import RemoteDaemonMessageType._ - import akka.util._ import Helpers._ @@ -42,12 +39,16 @@ import akka.config.{ Config, Supervision } import Supervision._ import Config._ -import akka.serialization.{ Serialization, Serializer, Compression } +import akka.serialization.{ Serialization, Serializer, Compression, ActorSerialization } +import ActorSerialization._ import Compression.LZF -import akka.AkkaException import akka.cluster.zookeeper._ -import akka.cluster.ChangeListener._ +import ChangeListener._ +import ClusterProtocol._ +import RemoteDaemonMessageType._ + +import akka.AkkaException import com.eaio.uuid.UUID @@ -742,20 +743,20 @@ class DefaultClusterNode private[akka] ( * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. 
*/ - def use[T <: Actor](actorAddress: String): Option[ActorRef] = use(actorAddress, serializerForActor(actorAddress)) + def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, serializerForActor(actorAddress)) /** * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.get) { + def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.get) { val nodeName = nodeAddress.nodeName ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName))) val actorFactoryPath = actorAddressRegistryPathFor(actorAddress) - zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ ActorRef]]() { - def call: Either[Exception, () ⇒ ActorRef] = { + zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ LocalActorRef]]() { + def call: Either[Exception, () ⇒ LocalActorRef] = { try { val actorFactoryBytes = @@ -763,9 +764,9 @@ class DefaultClusterNode private[akka] ( else zkClient.connection.readData(actorFactoryPath, new Stat, false) val actorFactory = - Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ ActorRef], None) match { + Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ LocalActorRef], None) match { case Left(error) ⇒ throw error - case Right(instance) ⇒ instance.asInstanceOf[() ⇒ ActorRef] + case Right(instance) ⇒ instance.asInstanceOf[() ⇒ LocalActorRef] } Right(actorFactory) @@ -1716,8 +1717,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { if (message.hasActorAddress) { val actorAddress = message.getActorAddress cluster.serializerForActor(actorAddress) foreach { serializer ⇒ - cluster.use(actorAddress, serializer) foreach { actor ⇒ - cluster.remoteService.register(actorAddress, actor) + cluster.use(actorAddress, serializer) foreach { newActorRef ⇒ + cluster.remoteService.register(actorAddress, newActorRef) if (message.hasReplicateActorFromUuid) { // replication is used - fetch the messages and replay them @@ -1735,10 +1736,37 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { // get the transaction log for the actor UUID val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme) - // deserialize all messages - val entriesAsBytes = txLog.entries - // val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries // FIXME should work equally good if not a snapshot has been taken yet. 
=> return all entries + // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte]) + val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries + // deserialize and restore actor snapshot + val actorRefToUseForReplay = + snapshotAsBytes match { + + // we have a new actor ref - the snapshot + case Some(bytes) ⇒ + // stop the new actor ref and use the snapshot instead + cluster.remoteService.unregister(actorAddress) + + // deserialize the snapshot actor ref and register it as remote actor + val uncompressedBytes = + if (Cluster.shouldCompressData) LZF.uncompress(bytes) + else bytes + + val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start() + cluster.remoteService.register(actorAddress, snapshotActorRef) + + // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should) + //newActorRef.stop() + + snapshotActorRef + + // we have no snapshot - use the new actor ref + case None ⇒ + newActorRef + } + + // deserialize the messages val messages: Vector[AnyRef] = entriesAsBytes map { bytes ⇒ val messageBytes = if (Cluster.shouldCompressData) LZF.uncompress(bytes) @@ -1746,13 +1774,16 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None) } - // replay all messages EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress)) + // replay all messages messages foreach { message ⇒ EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress)) - actor ! message // FIXME how to handle '?' messages??? + + // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other? + actorRefToUseForReplay ! message } + } catch { case e: Throwable ⇒ EventHandler.error(e, this, e.toString) diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index f6d17f6238..7a15673754 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -19,7 +19,7 @@ import akka.event.EventHandler import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation } import akka.remote.MessageSerializer import akka.cluster.zookeeper._ -import akka.serialization.{ Serializer, Compression } +import akka.serialization.{ Serializer, Serialization, Compression } import Compression.LZF import akka.serialization.ActorSerialization._ @@ -54,22 +54,17 @@ class TransactionLog private ( val logId = ledger.getId val txLogPath = transactionLogNode + "/" + id val snapshotPath = txLogPath + "/snapshot" - val nrOfEntries = new AtomicLong(0) private val isOpen = new Switch(true) /** * Record an Actor message invocation. 
*/ - def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) { - if (nrOfEntries.incrementAndGet % snapshotFrequency == 0) { - val snapshot = - // FIXME ReplicationStrategy Transient is always used - if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationScheme)) - else toBinary(actorRef, false, replicationScheme) - recordSnapshot(snapshot) - } - recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray) + def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef) { + val entryId = ledger.getLastAddPushed + 1 + if (entryId != 0 && (entryId % snapshotFrequency) == 0) { + recordSnapshot(toBinary(actorRef, false, replicationScheme)) + } else recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray) } /** @@ -77,8 +72,9 @@ class TransactionLog private ( */ def recordEntry(entry: Array[Byte]) { if (isOpen.isOn) { - val bytes = if (Cluster.shouldCompressData) LZF.compress(entry) - else entry + val bytes = + if (Cluster.shouldCompressData) LZF.compress(entry) + else entry try { if (isAsync) { ledger.asyncAddEntry( @@ -110,8 +106,9 @@ class TransactionLog private ( */ def recordSnapshot(snapshot: Array[Byte]) { if (isOpen.isOn) { - val bytes = if (Cluster.shouldCompressData) LZF.compress(snapshot) - else snapshot + val bytes = + if (Cluster.shouldCompressData) LZF.compress(snapshot) + else snapshot try { if (isAsync) { ledger.asyncAddEntry( @@ -120,16 +117,20 @@ class TransactionLog private ( def addComplete( returnCode: Int, ledgerHandle: LedgerHandle, - entryId: Long, + snapshotId: Long, ctx: AnyRef) { handleReturnCode(returnCode) - storeSnapshotMetaDataInZooKeeper(entryId) + EventHandler.debug(this, "Writing snapshot to log [%s]".format(snapshotId)) + storeSnapshotMetaDataInZooKeeper(snapshotId) } }, null) } else { handleReturnCode(ledger.addEntry(bytes)) - storeSnapshotMetaDataInZooKeeper(ledger.getLastAddPushed) + val snapshotId = ledger.getLastAddPushed + + EventHandler.debug(this, "Writing snapshot to log [%s]".format(snapshotId)) + storeSnapshotMetaDataInZooKeeper(snapshotId) } } catch { case e ⇒ handleError(e) @@ -145,11 +146,25 @@ class TransactionLog private ( /** * Get the latest snapshot and all subsequent entries from this snapshot. 
*/ - def latestSnapshotAndSubsequentEntries: (Array[Byte], Vector[Array[Byte]]) = { - val snapshotId = latestSnapshotId - EventHandler.debug(this, - "Reading entries from snapshot id [%s] for log [%s]".format(snapshotId, logId)) - (entriesInRange(snapshotId, snapshotId).head, entriesInRange(snapshotId + 1, ledger.getLastAddConfirmed)) + def latestSnapshotAndSubsequentEntries: (Option[Array[Byte]], Vector[Array[Byte]]) = { + latestSnapshotId match { + case Some(snapshotId) ⇒ + EventHandler.debug(this, "Reading entries from snapshot id [%s] for log [%s]".format(snapshotId, logId)) + + val cursor = snapshotId + 1 + val lastIndex = ledger.getLastAddConfirmed + + val snapshot = Some(entriesInRange(snapshotId, snapshotId).head) + + val entries = + if ((cursor - lastIndex) == 0) Vector.empty[Array[Byte]] + else entriesInRange(cursor, lastIndex) + + (snapshot, entries) + + case None ⇒ + (None, entries) + } } /** @@ -173,8 +188,10 @@ ledgerHandle: LedgerHandle, enumeration: Enumeration[LedgerEntry], ctx: AnyRef) { + val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]] val entries = toByteArrays(enumeration) + if (returnCode == BKException.Code.OK) future.completeWithResult(entries) else future.completeWithException(BKException.create(returnCode)) } @@ -197,17 +214,15 @@ /** * Get the id for the last snapshot written to this transaction log. */ - def latestSnapshotId: Long = { + def latestSnapshotId: Option[Long] = { try { val snapshotId = zkClient.readData(snapshotPath).asInstanceOf[Long] EventHandler.debug(this, "Retrieved latest snapshot id [%s] from transaction log [%s]".format(snapshotId, logId)) - snapshotId + Some(snapshotId) } catch { - case e: ZkNoNodeException ⇒ - handleError(new ReplicationException( - "Transaction log for UUID [" + id + "] does not have a snapshot recorded in ZooKeeper")) - case e ⇒ handleError(e) + case e: ZkNoNodeException ⇒ None + case e ⇒ handleError(e) } }
diff --git a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala index dc06e79038..cd64a83067 100644 --- a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -20,6 +20,8 @@ import java.net.InetSocketAddress import com.google.protobuf.ByteString +import com.eaio.uuid.UUID + /** * Module for local actor serialization. */ @@ -27,10 +29,13 @@ object ActorSerialization { implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = - fromBinaryToLocalActorRef(bytes, Some(homeAddress)) + fromBinaryToLocalActorRef(bytes, None, Some(homeAddress)) + + def fromBinary[T <: Actor](bytes: Array[Byte], uuid: UUID): ActorRef = + fromBinaryToLocalActorRef(bytes, Some(uuid), None) def fromBinary[T <: Actor](bytes: Array[Byte]): ActorRef = - fromBinaryToLocalActorRef(bytes, None) + fromBinaryToLocalActorRef(bytes, None, None) def toBinary[T <: Actor]( a: ActorRef, @@ -126,13 +131,16 @@ object ActorSerialization { private def fromBinaryToLocalActorRef[T <: Actor]( bytes: Array[Byte], + uuid: Option[UUID], homeAddress: Option[InetSocketAddress]): ActorRef = { val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes) - fromProtobufToLocalActorRef(builder.build, None) + fromProtobufToLocalActorRef(builder.build, uuid, None) } private[akka] def fromProtobufToLocalActorRef[T <: Actor]( - protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { + protocol: SerializedActorRefProtocol, + overriddenUuid: Option[UUID], + loader: Option[ClassLoader]): ActorRef = { val lifeCycle = if (protocol.hasLifeCycle) { @@ -196,8 +204,13 @@ object ActorSerialization { } } + val actorUuid = overriddenUuid match { + case Some(uuid) ⇒ uuid + case None ⇒ uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow) + } + val ar = new LocalActorRef( - uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), + actorUuid, protocol.getAddress, if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None, diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala index b7183ca805..c267bc6f98 100644 ---
a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala @@ -107,7 +107,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA val txlog2 = TransactionLog.logFor(uuid, false, null) val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries - new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) entries.size must equal(4) @@ -136,7 +136,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA val txlog2 = TransactionLog.logFor(uuid, false, null) val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries - new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) entries.size must equal(2) @@ -251,7 +251,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA Thread.sleep(200) val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries Thread.sleep(200) - new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) Thread.sleep(200) @@ -290,7 +290,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA Thread.sleep(200) val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries Thread.sleep(200) - new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) Thread.sleep(200) entries.size must equal(2) diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf new file mode 100644 index 0000000000..d8bee0cb07 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind" +akka.cluster.replication.snapshot-frequency = 1000 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ 
b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf new file mode 100644 index 0000000000..d8bee0cb07 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind" +akka.cluster.replication.snapshot-frequency = 1000 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala new file mode 100644 index 0000000000..7ed05307ae --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.replication.transactionlog.writebehind.nosnapshot + +import akka.actor._ +import akka.cluster._ +import Cluster._ +import akka.config.Config + +object ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec { + var NrOfNodes = 2 + + sealed trait TransactionLogMessage extends Serializable + case class Count(nr: Int) extends TransactionLogMessage + case class Log(full: String) extends TransactionLogMessage + case object GetLog extends TransactionLogMessage + + class HelloWorld extends Actor with Serializable { + var log = "" + def receive = { + case Count(nr) ⇒ + log += nr.toString + self.reply("World from node [" + Config.nodename + "]") + case GetLog ⇒ + self.reply(Log(log)) + } + } +} + +class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends ClusterTestNode { + import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._ + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + 
barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + var counter = 0 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends MasterClusterTestNode { + import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(5000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? 
GetLog).as[Log].get must be(Log("0123456789")) + } + + node.shutdown() + } + } + + override def onReady() { + LocalBookKeeperEnsemble.start() + } + + override def onShutdown() { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf new file mode 100644 index 0000000000..8aeaf3135f --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind" +akka.cluster.replication.snapshot-frequency = 7 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf new file mode 100644 index 0000000000..8aeaf3135f --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind" +akka.cluster.replication.snapshot-frequency = 7 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala 
b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala new file mode 100644 index 0000000000..c37a863ba0 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.replication.transactionlog.writebehind.snapshot + +import akka.actor._ +import akka.cluster._ +import Cluster._ +import akka.config.Config + +object ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec { + var NrOfNodes = 2 + + sealed trait TransactionLogMessage extends Serializable + case class Count(nr: Int) extends TransactionLogMessage + case class Log(full: String) extends TransactionLogMessage + case object GetLog extends TransactionLogMessage + + class HelloWorld extends Actor with Serializable { + var log = "" + println("Creating HelloWorld log =======> " + log) + def receive = { + case Count(nr) ⇒ + log += nr.toString + println("Message to HelloWorld log =======> " + log) + self.reply("World from node [" + Config.nodename + "]") + case GetLog ⇒ + self.reply(Log(log)) + } + } +} + +class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1 extends ClusterTestNode { + import ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec._ + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + var counter = 0 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? 
Count(counter)).as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2 extends MasterClusterTestNode { + import ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(5000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? GetLog).as[Log].get must be(Log("0123456789")) + } + + node.shutdown() + } + } + + override def onReady() { + LocalBookKeeperEnsemble.start() + } + + override def onShutdown() { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf new file mode 100644 index 0000000000..470c4c7a33 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf @@ -0,0 +1,8 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 + +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through" +akka.cluster.replication.snapshot-frequency = 1000 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf new file mode 100644 index 0000000000..5fb92ab01f --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = 
"DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through" +akka.cluster.replication.snapshot-frequency = 1000 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala new file mode 100644 index 0000000000..10fc3883dc --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.replication.transactionlog.writethrough.nosnapshot + +import akka.actor._ +import akka.cluster._ +import Cluster._ +import akka.config.Config + +object ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec { + var NrOfNodes = 2 + + sealed trait TransactionLogMessage extends Serializable + case class Count(nr: Int) extends TransactionLogMessage + case class Log(full: String) extends TransactionLogMessage + case object GetLog extends TransactionLogMessage + + class HelloWorld extends Actor with Serializable { + var log = "" + def receive = { + case Count(nr) ⇒ + log += nr.toString + self.reply("World from node [" + Config.nodename + "]") + case GetLog ⇒ + self.reply(Log(log)) + } + } +} + +class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1 extends ClusterTestNode { + import ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec._ + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + var counter = 0 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? 
Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2 extends MasterClusterTestNode { + import ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(5000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? GetLog).as[Log].get must be(Log("0123456789")) + } + + node.shutdown() + } + } + + override def onReady() { + LocalBookKeeperEnsemble.start() + } + + override def onShutdown() { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf new file mode 100644 index 0000000000..1d332847b6 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through" +akka.cluster.replication.snapshot-frequency = 7 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf new file mode 100644 index 0000000000..1d332847b6 --- 
/dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through" +akka.cluster.replication.snapshot-frequency = 7 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala new file mode 100644 index 0000000000..a7fbc7b4f1 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.replication.transactionlog.writethrough.snapshot + +import akka.actor._ +import akka.cluster._ +import Cluster._ +import akka.config.Config + +object ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec { + var NrOfNodes = 2 + + sealed trait TransactionLogMessage extends Serializable + case class Count(nr: Int) extends TransactionLogMessage + case class Log(full: String) extends TransactionLogMessage + case object GetLog extends TransactionLogMessage + + class HelloWorld extends Actor with Serializable { + var log = "" + println("Creating HelloWorld log =======> " + log) + def receive = { + case Count(nr) ⇒ + log += nr.toString + println("Message to HelloWorld log =======> " + log) + self.reply("World from node [" + Config.nodename + "]") + case GetLog ⇒ + self.reply(Log(log)) + } + } +} + +class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1 extends ClusterTestNode { + import ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec._ + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + var counter = 0 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? 
Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2 extends MasterClusterTestNode { + import ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(5000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? GetLog).as[Log].get must be(Log("0123456789")) + } + + node.shutdown() + } + } + + override def onReady() { + LocalBookKeeperEnsemble.start() + } + + override def onShutdown() { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +}
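
For readers tracing the new recovery path (TransactionLog.latestSnapshotAndSubsequentEntries now returns an Option for the snapshot, and RemoteClusterDaemon replays the subsequent entries against either the restored snapshot actor or the freshly checked-out LocalActorRef), the following plain-Scala sketch models that select-and-replay logic. It is a minimal, self-contained illustration only: ReplayTarget, recover and the string-based state are hypothetical stand-ins, not the real Akka types or API.

object SnapshotReplaySketch {

  // Stand-ins for the real payloads: a snapshot is serialized actor state,
  // an entry is a serialized message.
  type Snapshot = Array[Byte]
  type Entry = Array[Byte]

  // Hypothetical replay target standing in for the LocalActorRef that the
  // daemon feeds during fail-over.
  final class ReplayTarget(initialState: String) {
    private var state = initialState
    def receive(message: String): Unit = state += message
    def currentState: String = state
  }

  // Mirrors the shape of latestSnapshotAndSubsequentEntries after this patch:
  // (Option[snapshot], Vector[entries written after the snapshot]).
  def recover(latest: (Option[Snapshot], Vector[Entry])): ReplayTarget = {
    val (snapshotAsBytes, entriesAsBytes) = latest

    // With a snapshot: restore state from it. Without one: start from a fresh actor.
    val target = snapshotAsBytes match {
      case Some(bytes) ⇒ new ReplayTarget(new String(bytes, "UTF-8"))
      case None        ⇒ new ReplayTarget("")
    }

    // Replay every message recorded after the snapshot, in order.
    entriesAsBytes foreach { bytes ⇒ target.receive(new String(bytes, "UTF-8")) }
    target
  }

  def main(args: Array[String]): Unit = {
    // A snapshot covering "0123456" plus the entries "7", "8", "9" recovers to
    // "0123456789", the state the multi-JVM specs assert via GetLog.
    val withSnapshot =
      (Some("0123456".getBytes("UTF-8")), Vector("7", "8", "9").map(_.getBytes("UTF-8")))
    println(recover(withSnapshot).currentState) // 0123456789

    // Without a snapshot all entries are replayed against a fresh actor.
    val withoutSnapshot =
      (None: Option[Snapshot], Vector("0", "1", "2").map(_.getBytes("UTF-8")))
    println(recover(withoutSnapshot).currentState) // 012
  }
}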
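
The snapshot-frequency decision in the reworked recordEntry(messageHandle, actorRef) can be sketched the same way: the next entry id is derived from the ledger's last pushed id, and on every snapshotFrequency-th id a snapshot of the actor is written in place of the serialized message. Again a minimal sketch under stated assumptions; FakeLedger, Record, record and main are hypothetical names, not the BookKeeper or Akka API.

object SnapshotFrequencySketch {

  // Hypothetical in-memory stand-in for the BookKeeper ledger handle: it only
  // tracks the id of the last pushed entry (starts at -1, like an empty ledger).
  final class FakeLedger {
    private var lastAddPushed = -1L
    def getLastAddPushed: Long = lastAddPushed
    def addEntry(bytes: Array[Byte]): Long = { lastAddPushed += 1; lastAddPushed }
  }

  sealed trait Record
  final case class MessageEntry(payload: String) extends Record
  final case class SnapshotEntry(state: String) extends Record

  val snapshotFrequency = 7L
  val ledger = new FakeLedger

  // Mirrors the branch in the new recordEntry: compute the would-be entry id from
  // the ledger, then write either a snapshot of the actor state or the message.
  def record(message: String, actorState: String): Record = {
    val entryId = ledger.getLastAddPushed + 1
    val written =
      if (entryId != 0 && entryId % snapshotFrequency == 0) SnapshotEntry(actorState)
      else MessageEntry(message)
    val bytes = written match {
      case SnapshotEntry(s) ⇒ s.getBytes("UTF-8")
      case MessageEntry(m)  ⇒ m.getBytes("UTF-8")
    }
    ledger.addEntry(bytes) // either way exactly one ledger entry is pushed per invocation
    written
  }

  def main(args: Array[String]): Unit = {
    var state = ""
    (0 until 10) foreach { i ⇒
      state += i
      println(record(i.toString, state)) // the call with entry id 7 yields a SnapshotEntry
    }
  }
}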