From 34c838d0f4506b6a087955f30d67697a8cd42cfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 8 Jul 2011 19:35:27 +0200 Subject: [PATCH] 1. Completed replication over BookKeeper based transaction log with configurable actor snapshotting every X message. 2. Completed replay of of transaction log on all replicated actors on migration after node crash. 3. Added end to end tests for write behind and write through replication and replay on fail-over. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../scala/akka/cluster/ClusterInterface.scala | 4 +- .../scala/akka/util/ReflectiveAccess.scala | 4 +- .../src/main/scala/akka/cluster/Cluster.scala | 69 +++++++--- .../scala/akka/cluster/TransactionLog.scala | 73 ++++++----- .../src/main/scala/akka/cluster/untitled | 41 ++++++ .../serialization/SerializationProtocol.scala | 23 +++- .../akka/cluster/TransactionLogSpec.scala | 8 +- ...LogWriteBehindNoSnapshotMultiJvmNode1.conf | 7 + ...LogWriteBehindNoSnapshotMultiJvmNode1.opts | 1 + ...LogWriteBehindNoSnapshotMultiJvmNode2.conf | 7 + ...LogWriteBehindNoSnapshotMultiJvmNode2.opts | 1 + ...LogWriteBehindNoSnapshotMultiJvmSpec.scala | 118 +++++++++++++++++ ...onLogWriteBehindSnapshotMultiJvmNode1.conf | 7 + ...onLogWriteBehindSnapshotMultiJvmNode1.opts | 1 + ...onLogWriteBehindSnapshotMultiJvmNode2.conf | 7 + ...onLogWriteBehindSnapshotMultiJvmNode2.opts | 1 + ...onLogWriteBehindSnapshotMultiJvmSpec.scala | 120 ++++++++++++++++++ ...ogWriteThroughNoSnapshotMultiJvmNode1.conf | 8 ++ ...ogWriteThroughNoSnapshotMultiJvmNode1.opts | 1 + ...ogWriteThroughNoSnapshotMultiJvmNode2.conf | 7 + ...ogWriteThroughNoSnapshotMultiJvmNode2.opts | 1 + ...ogWriteThroughNoSnapshotMultiJvmSpec.scala | 118 +++++++++++++++++ ...nLogWriteThroughSnapshotMultiJvmNode1.conf | 7 + ...nLogWriteThroughSnapshotMultiJvmNode1.opts | 1 + ...nLogWriteThroughSnapshotMultiJvmNode2.conf | 7 + ...nLogWriteThroughSnapshotMultiJvmNode2.opts | 1 + ...nLogWriteThroughSnapshotMultiJvmSpec.scala | 120 ++++++++++++++++++ 27 files changed, 702 insertions(+), 61 deletions(-) create mode 100644 akka-cluster/src/main/scala/akka/cluster/untitled create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index be86c87b4d..714207458c 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -316,13 +316,13 @@ trait ClusterNode { * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String): Option[ActorRef] + def use[T <: Actor](actorAddress: String): Option[LocalActorRef] /** * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] + def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] /** * Using (checking out) actor on a specific set of nodes. diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 856f339339..b1bfe83466 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -123,13 +123,13 @@ object ReflectiveAccess { } type TransactionLog = { - def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) + def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef) def recordEntry(entry: Array[Byte]) def recordSnapshot(snapshot: Array[Byte]) def entries: Vector[Array[Byte]] def entriesFromLatestSnapshot: Tuple2[Array[Byte], Vector[Array[Byte]]] def entriesInRange(from: Long, to: Long): Vector[Array[Byte]] - def latestSnapshotAndSubsequentEntries: (Array[Byte], Vector[Array[Byte]]) + def latestSnapshotAndSubsequentEntries: (Option[Array[Byte]], Vector[Array[Byte]]) def latestEntryId: Long def latestSnapshotId: Long def delete() diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 1dc5aac97c..772d614264 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -22,9 +22,6 @@ import scala.collection.immutable.{ HashMap, HashSet } import scala.collection.mutable.ConcurrentMap import scala.collection.JavaConversions._ -import ClusterProtocol._ -import RemoteDaemonMessageType._ - import akka.util._ import Helpers._ @@ -42,12 +39,16 @@ import akka.config.{ Config, Supervision } import Supervision._ import Config._ -import akka.serialization.{ Serialization, Serializer, Compression } +import akka.serialization.{ Serialization, Serializer, Compression, ActorSerialization } +import ActorSerialization._ import Compression.LZF -import akka.AkkaException import akka.cluster.zookeeper._ -import akka.cluster.ChangeListener._ +import ChangeListener._ +import ClusterProtocol._ +import RemoteDaemonMessageType._ + +import akka.AkkaException import com.eaio.uuid.UUID @@ -742,20 +743,20 @@ class DefaultClusterNode private[akka] ( * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String): Option[ActorRef] = use(actorAddress, serializerForActor(actorAddress)) + def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, serializerForActor(actorAddress)) /** * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.get) { + def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.get) { val nodeName = nodeAddress.nodeName ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName))) val actorFactoryPath = actorAddressRegistryPathFor(actorAddress) - zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ ActorRef]]() { - def call: Either[Exception, () ⇒ ActorRef] = { + zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ LocalActorRef]]() { + def call: Either[Exception, () ⇒ LocalActorRef] = { try { val actorFactoryBytes = @@ -763,9 +764,9 @@ class DefaultClusterNode private[akka] ( else zkClient.connection.readData(actorFactoryPath, new Stat, false) val actorFactory = - Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ ActorRef], None) match { + Serialization.deserialize(actorFactoryBytes, classOf[() ⇒ LocalActorRef], None) match { case Left(error) ⇒ throw error - case Right(instance) ⇒ instance.asInstanceOf[() ⇒ ActorRef] + case Right(instance) ⇒ instance.asInstanceOf[() ⇒ LocalActorRef] } Right(actorFactory) @@ -1716,8 +1717,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { if (message.hasActorAddress) { val actorAddress = message.getActorAddress cluster.serializerForActor(actorAddress) foreach { serializer ⇒ - cluster.use(actorAddress, serializer) foreach { actor ⇒ - cluster.remoteService.register(actorAddress, actor) + cluster.use(actorAddress, serializer) foreach { newActorRef ⇒ + cluster.remoteService.register(actorAddress, newActorRef) if (message.hasReplicateActorFromUuid) { // replication is used - fetch the messages and replay them @@ -1735,10 +1736,37 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { // get the transaction log for the actor UUID val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme) - // deserialize all messages - val entriesAsBytes = txLog.entries - // val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries // FIXME should work equally good if not a snapshot has been taken yet. => return all entries + // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte]) + val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries + // deserialize and restore actor snapshot + val actorRefToUseForReplay = + snapshotAsBytes match { + + // we have a new actor ref - the snapshot + case Some(bytes) ⇒ + // stop the new actor ref and use the snapshot instead + cluster.remoteService.unregister(actorAddress) + + // deserialize the snapshot actor ref and register it as remote actor + val uncompressedBytes = + if (Cluster.shouldCompressData) LZF.uncompress(bytes) + else bytes + + val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start() + cluster.remoteService.register(actorAddress, snapshotActorRef) + + // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should) + //newActorRef.stop() + + snapshotActorRef + + // we have no snapshot - use the new actor ref + case None ⇒ + newActorRef + } + + // deserialize the messages val messages: Vector[AnyRef] = entriesAsBytes map { bytes ⇒ val messageBytes = if (Cluster.shouldCompressData) LZF.uncompress(bytes) @@ -1746,13 +1774,16 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None) } - // replay all messages EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress)) + // replay all messages messages foreach { message ⇒ EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress)) - actor ! message // FIXME how to handle '?' messages??? + + // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other? + actorRefToUseForReplay ! message } + } catch { case e: Throwable ⇒ EventHandler.error(e, this, e.toString) diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index f6d17f6238..7a15673754 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -19,7 +19,7 @@ import akka.event.EventHandler import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation } import akka.remote.MessageSerializer import akka.cluster.zookeeper._ -import akka.serialization.{ Serializer, Compression } +import akka.serialization.{ Serializer, Serialization, Compression } import Compression.LZF import akka.serialization.ActorSerialization._ @@ -54,22 +54,17 @@ class TransactionLog private ( val logId = ledger.getId val txLogPath = transactionLogNode + "/" + id val snapshotPath = txLogPath + "/snapshot" - val nrOfEntries = new AtomicLong(0) private val isOpen = new Switch(true) /** * Record an Actor message invocation. */ - def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) { - if (nrOfEntries.incrementAndGet % snapshotFrequency == 0) { - val snapshot = - // FIXME ReplicationStrategy Transient is always used - if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationScheme)) - else toBinary(actorRef, false, replicationScheme) - recordSnapshot(snapshot) - } - recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray) + def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef) { + val entryId = ledger.getLastAddPushed + 1 + if (entryId != 0 && (entryId % snapshotFrequency) == 0) { + recordSnapshot(toBinary(actorRef, false, replicationScheme)) + } else recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray) } /** @@ -77,8 +72,9 @@ class TransactionLog private ( */ def recordEntry(entry: Array[Byte]) { if (isOpen.isOn) { - val bytes = if (Cluster.shouldCompressData) LZF.compress(entry) - else entry + val bytes = + if (Cluster.shouldCompressData) LZF.compress(entry) + else entry try { if (isAsync) { ledger.asyncAddEntry( @@ -110,8 +106,9 @@ class TransactionLog private ( */ def recordSnapshot(snapshot: Array[Byte]) { if (isOpen.isOn) { - val bytes = if (Cluster.shouldCompressData) LZF.compress(snapshot) - else snapshot + val bytes = + if (Cluster.shouldCompressData) LZF.compress(snapshot) + else snapshot try { if (isAsync) { ledger.asyncAddEntry( @@ -120,16 +117,20 @@ class TransactionLog private ( def addComplete( returnCode: Int, ledgerHandle: LedgerHandle, - entryId: Long, + snapshotId: Long, ctx: AnyRef) { handleReturnCode(returnCode) - storeSnapshotMetaDataInZooKeeper(entryId) + EventHandler.debug(this, "Writing snapshot to log [%s]".format(snapshotId)) + storeSnapshotMetaDataInZooKeeper(snapshotId) } }, null) } else { handleReturnCode(ledger.addEntry(bytes)) - storeSnapshotMetaDataInZooKeeper(ledger.getLastAddPushed) + val snapshotId = ledger.getLastAddPushed + + EventHandler.debug(this, "Writing snapshot to log [%s]".format(snapshotId)) + storeSnapshotMetaDataInZooKeeper(snapshotId) } } catch { case e ⇒ handleError(e) @@ -145,11 +146,25 @@ class TransactionLog private ( /** * Get the latest snapshot and all subsequent entries from this snapshot. */ - def latestSnapshotAndSubsequentEntries: (Array[Byte], Vector[Array[Byte]]) = { - val snapshotId = latestSnapshotId - EventHandler.debug(this, - "Reading entries from snapshot id [%s] for log [%s]".format(snapshotId, logId)) - (entriesInRange(snapshotId, snapshotId).head, entriesInRange(snapshotId + 1, ledger.getLastAddConfirmed)) + def latestSnapshotAndSubsequentEntries: (Option[Array[Byte]], Vector[Array[Byte]]) = { + latestSnapshotId match { + case Some(snapshotId) ⇒ + EventHandler.debug(this, "Reading entries from snapshot id [%s] for log [%s]".format(snapshotId, logId)) + + val cursor = snapshotId + 1 + val lastIndex = ledger.getLastAddConfirmed + + val snapshot = Some(entriesInRange(snapshotId, snapshotId).head) + + val entries = + if ((cursor - lastIndex) == 0) Vector.empty[Array[Byte]] + else entriesInRange(cursor, lastIndex) + + (snapshot, entries) + + case None ⇒ + (None, entries) + } } /** @@ -173,8 +188,10 @@ class TransactionLog private ( ledgerHandle: LedgerHandle, enumeration: Enumeration[LedgerEntry], ctx: AnyRef) { + val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]] val entries = toByteArrays(enumeration) + if (returnCode == BKException.Code.OK) future.completeWithResult(entries) else future.completeWithException(BKException.create(returnCode)) } @@ -197,17 +214,15 @@ class TransactionLog private ( /** * Get the id for the last snapshot written to this transaction log. */ - def latestSnapshotId: Long = { + def latestSnapshotId: Option[Long] = { try { val snapshotId = zkClient.readData(snapshotPath).asInstanceOf[Long] EventHandler.debug(this, "Retrieved latest snapshot id [%s] from transaction log [%s]".format(snapshotId, logId)) - snapshotId + Some(snapshotId) } catch { - case e: ZkNoNodeException ⇒ - handleError(new ReplicationException( - "Transaction log for UUID [" + id + "] does not have a snapshot recorded in ZooKeeper")) - case e ⇒ handleError(e) + case e: ZkNoNodeException ⇒ None + case e ⇒ handleError(e) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/untitled b/akka-cluster/src/main/scala/akka/cluster/untitled new file mode 100644 index 0000000000..ec128ad190 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/untitled @@ -0,0 +1,41 @@ + +diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala +index b7183ca..c267bc6 100644 +--- a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala ++++ b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala +@@ -107,7 +107,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries +- new String(snapshotAsBytes, "UTF-8") must equal("snapshot") ++ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(4) +@@ -136,7 +136,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA + + val txlog2 = TransactionLog.logFor(uuid, false, null) + val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries +- new String(snapshotAsBytes, "UTF-8") must equal("snapshot") ++ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + entries.size must equal(2) +@@ -251,7 +251,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA + Thread.sleep(200) + val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries + Thread.sleep(200) +- new String(snapshotAsBytes, "UTF-8") must equal("snapshot") ++ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") + + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + Thread.sleep(200) +@@ -290,7 +290,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA + Thread.sleep(200) + val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries + Thread.sleep(200) +- new String(snapshotAsBytes, "UTF-8") must equal("snapshot") ++ new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") + val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) + Thread.sleep(200) + entries.size must equal(2) \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala index dc06e79038..cd64a83067 100644 --- a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -20,6 +20,8 @@ import java.net.InetSocketAddress import com.google.protobuf.ByteString +import com.eaio.uuid.UUID + /** * Module for local actor serialization. */ @@ -27,10 +29,13 @@ object ActorSerialization { implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = - fromBinaryToLocalActorRef(bytes, Some(homeAddress)) + fromBinaryToLocalActorRef(bytes, None, Some(homeAddress)) + + def fromBinary[T <: Actor](bytes: Array[Byte], uuid: UUID): ActorRef = + fromBinaryToLocalActorRef(bytes, Some(uuid), None) def fromBinary[T <: Actor](bytes: Array[Byte]): ActorRef = - fromBinaryToLocalActorRef(bytes, None) + fromBinaryToLocalActorRef(bytes, None, None) def toBinary[T <: Actor]( a: ActorRef, @@ -126,13 +131,16 @@ object ActorSerialization { private def fromBinaryToLocalActorRef[T <: Actor]( bytes: Array[Byte], + uuid: Option[UUID], homeAddress: Option[InetSocketAddress]): ActorRef = { val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes) - fromProtobufToLocalActorRef(builder.build, None) + fromProtobufToLocalActorRef(builder.build, uuid, None) } private[akka] def fromProtobufToLocalActorRef[T <: Actor]( - protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { + protocol: SerializedActorRefProtocol, + overriddenUuid: Option[UUID], + loader: Option[ClassLoader]): ActorRef = { val lifeCycle = if (protocol.hasLifeCycle) { @@ -196,8 +204,13 @@ object ActorSerialization { } } + val actorUuid = overriddenUuid match { + case Some(uuid) ⇒ uuid + case None ⇒ uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow) + } + val ar = new LocalActorRef( - uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), + actorUuid, protocol.getAddress, if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None, diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala index b7183ca805..c267bc6f98 100644 --- a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala @@ -107,7 +107,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA val txlog2 = TransactionLog.logFor(uuid, false, null) val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries - new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) entries.size must equal(4) @@ -136,7 +136,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA val txlog2 = TransactionLog.logFor(uuid, false, null) val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries - new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) entries.size must equal(2) @@ -251,7 +251,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA Thread.sleep(200) val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries Thread.sleep(200) - new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) Thread.sleep(200) @@ -290,7 +290,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA Thread.sleep(200) val (snapshotAsBytes, entriesAsBytes) = txlog2.latestSnapshotAndSubsequentEntries Thread.sleep(200) - new String(snapshotAsBytes, "UTF-8") must equal("snapshot") + new String(snapshotAsBytes.getOrElse(fail("No snapshot")), "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8")) Thread.sleep(200) entries.size must equal(2) diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf new file mode 100644 index 0000000000..d8bee0cb07 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind" +akka.cluster.replication.snapshot-frequency = 1000 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf new file mode 100644 index 0000000000..d8bee0cb07 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind" +akka.cluster.replication.snapshot-frequency = 1000 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala new file mode 100644 index 0000000000..7ed05307ae --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.replication.transactionlog.writebehind.nosnapshot + +import akka.actor._ +import akka.cluster._ +import Cluster._ +import akka.config.Config + +object ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec { + var NrOfNodes = 2 + + sealed trait TransactionLogMessage extends Serializable + case class Count(nr: Int) extends TransactionLogMessage + case class Log(full: String) extends TransactionLogMessage + case object GetLog extends TransactionLogMessage + + class HelloWorld extends Actor with Serializable { + var log = "" + def receive = { + case Count(nr) ⇒ + log += nr.toString + self.reply("World from node [" + Config.nodename + "]") + case GetLog ⇒ + self.reply(Log(log)) + } + } +} + +class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends ClusterTestNode { + import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._ + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + var counter = 0 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends MasterClusterTestNode { + import ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(5000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? GetLog).as[Log].get must be(Log("0123456789")) + } + + node.shutdown() + } + } + + override def onReady() { + LocalBookKeeperEnsemble.start() + } + + override def onShutdown() { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf new file mode 100644 index 0000000000..8aeaf3135f --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind" +akka.cluster.replication.snapshot-frequency = 7 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf new file mode 100644 index 0000000000..8aeaf3135f --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-behind" +akka.cluster.replication.snapshot-frequency = 7 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala new file mode 100644 index 0000000000..c37a863ba0 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.replication.transactionlog.writebehind.snapshot + +import akka.actor._ +import akka.cluster._ +import Cluster._ +import akka.config.Config + +object ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec { + var NrOfNodes = 2 + + sealed trait TransactionLogMessage extends Serializable + case class Count(nr: Int) extends TransactionLogMessage + case class Log(full: String) extends TransactionLogMessage + case object GetLog extends TransactionLogMessage + + class HelloWorld extends Actor with Serializable { + var log = "" + println("Creating HelloWorld log =======> " + log) + def receive = { + case Count(nr) ⇒ + log += nr.toString + println("Message to HelloWorld log =======> " + log) + self.reply("World from node [" + Config.nodename + "]") + case GetLog ⇒ + self.reply(Log(log)) + } + } +} + +class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1 extends ClusterTestNode { + import ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec._ + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + var counter = 0 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2 extends MasterClusterTestNode { + import ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(5000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? GetLog).as[Log].get must be(Log("0123456789")) + } + + node.shutdown() + } + } + + override def onReady() { + LocalBookKeeperEnsemble.start() + } + + override def onShutdown() { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf new file mode 100644 index 0000000000..470c4c7a33 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf @@ -0,0 +1,8 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 + +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through" +akka.cluster.replication.snapshot-frequency = 1000 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf new file mode 100644 index 0000000000..5fb92ab01f --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through" +akka.cluster.replication.snapshot-frequency = 1000 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala new file mode 100644 index 0000000000..10fc3883dc --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.replication.transactionlog.writethrough.nosnapshot + +import akka.actor._ +import akka.cluster._ +import Cluster._ +import akka.config.Config + +object ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec { + var NrOfNodes = 2 + + sealed trait TransactionLogMessage extends Serializable + case class Count(nr: Int) extends TransactionLogMessage + case class Log(full: String) extends TransactionLogMessage + case object GetLog extends TransactionLogMessage + + class HelloWorld extends Actor with Serializable { + var log = "" + def receive = { + case Count(nr) ⇒ + log += nr.toString + self.reply("World from node [" + Config.nodename + "]") + case GetLog ⇒ + self.reply(Log(log)) + } + } +} + +class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1 extends ClusterTestNode { + import ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec._ + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + var counter = 0 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2 extends MasterClusterTestNode { + import ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(5000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? GetLog).as[Log].get must be(Log("0123456789")) + } + + node.shutdown() + } + } + + override def onReady() { + LocalBookKeeperEnsemble.start() + } + + override def onShutdown() { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf new file mode 100644 index 0000000000..1d332847b6 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through" +akka.cluster.replication.snapshot-frequency = 7 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf new file mode 100644 index 0000000000..1d332847b6 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.conf @@ -0,0 +1,7 @@ +akka.enabled-modules = ["cluster"] +akka.event-handler-level = "DEBUG" +akka.actor.deployment.hello-world.router = "direct" +akka.actor.deployment.hello-world.clustered.replicas = 1 +akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log" +akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through" +akka.cluster.replication.snapshot-frequency = 7 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala new file mode 100644 index 0000000000..a7fbc7b4f1 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.replication.transactionlog.writethrough.snapshot + +import akka.actor._ +import akka.cluster._ +import Cluster._ +import akka.config.Config + +object ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec { + var NrOfNodes = 2 + + sealed trait TransactionLogMessage extends Serializable + case class Count(nr: Int) extends TransactionLogMessage + case class Log(full: String) extends TransactionLogMessage + case object GetLog extends TransactionLogMessage + + class HelloWorld extends Actor with Serializable { + var log = "" + println("Creating HelloWorld log =======> " + log) + def receive = { + case Count(nr) ⇒ + log += nr.toString + println("Message to HelloWorld log =======> " + log) + self.reply("World from node [" + Config.nodename + "]") + case GetLog ⇒ + self.reply(Log(log)) + } + } +} + +class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1 extends ClusterTestNode { + import ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec._ + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + node.start() + } + + barrier("create-actor-on-node1", NrOfNodes) { + val actorRef = Actor.actorOf[HelloWorld]("hello-world").start() + node.isInUseOnNode("hello-world") must be(true) + actorRef.address must be("hello-world") + var counter = 0 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + counter += 1 + (actorRef ? Count(counter)).as[String].get must be("World from node [node1]") + } + + barrier("start-node2", NrOfNodes) { + } + + node.shutdown() + } + } +} + +class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2 extends MasterClusterTestNode { + import ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec._ + + val testNodes = NrOfNodes + + "A cluster" must { + + "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in { + + barrier("start-node1", NrOfNodes) { + } + + barrier("create-actor-on-node1", NrOfNodes) { + } + + barrier("start-node2", NrOfNodes) { + node.start() + } + + Thread.sleep(5000) // wait for fail-over from node1 to node2 + + barrier("check-fail-over-to-node2", NrOfNodes - 1) { + // both remaining nodes should now have the replica + node.isInUseOnNode("hello-world") must be(true) + val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) + actorRef.address must be("hello-world") + (actorRef ? GetLog).as[Log].get must be(Log("0123456789")) + } + + node.shutdown() + } + } + + override def onReady() { + LocalBookKeeperEnsemble.start() + } + + override def onShutdown() { + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() + } +}