From 85e742373a14319befa8c0c664b4d4e7559dfc10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 7 Jun 2011 20:10:08 -0700 Subject: [PATCH] - Made ClusterActorRef not extends RemoteActorRef anymore - Refactored and cleaned up Transaction Log initialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/actor/Actor.scala | 2 +- .../src/main/scala/akka/actor/ActorRef.scala | 12 ++-- .../main/scala/akka/actor/ActorRegistry.scala | 2 +- .../src/main/scala/akka/actor/Deployer.scala | 2 +- .../akka/dispatch/ThreadPoolBuilder.scala | 2 +- .../scala/akka/util/ReflectiveAccess.scala | 18 +++-- .../src/main/scala/akka/cluster/Cluster.scala | 11 ++- .../scala/akka/cluster/ClusterActorRef.scala | 69 +++++++++++++++++-- .../scala/akka/cluster/ClusterDeployer.scala | 4 +- .../scala/akka/cluster/TransactionLog.scala | 37 +++++++--- .../scala/akka/cluster/ReplicationSpec.scala | 48 ++++++------- 11 files changed, 143 insertions(+), 64 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 887cebc379..5e9e311c7a 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -411,7 +411,7 @@ object Actor extends ListenerManagement { .getOrElse(throw new ConfigurationException( "Could not check out actor [" + address + "] from cluster registry as a \"local\" actor")) } else { - // FIXME later managed different 'storage' as well + // FIXME later manage different 'storage' (data grid) as well storeActorAndGetClusterRef(strategy, serializer) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6cdcf2f637..2665c5e8cc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -553,14 +553,14 @@ class LocalActorRef private[akka] ( case _ ⇒ true } - // FIXME how to get the matching serializerClassName? Now default is used + // FIXME how to get the matching serializerClassName? Now default is used. Needed for transaction log snapshot private val serializer = Actor.serializerFor(address, Format.defaultSerializerName) private lazy val txLog: TransactionLog = { val log = replicationStrategy match { case Transient ⇒ throw new IllegalStateException("Can not replicate 'transient' actor [" + toString + "]") - case WriteThrough ⇒ transactionLog.newLogFor(_uuid.toString, false) - case WriteBehind ⇒ transactionLog.newLogFor(_uuid.toString, true) + case WriteThrough ⇒ transactionLog.newLogFor(_uuid.toString, false, replicationStrategy, serializer) + case WriteBehind ⇒ transactionLog.newLogFor(_uuid.toString, true, replicationStrategy, serializer) } EventHandler.debug(this, "Creating a transaction log for Actor [%s] with replication strategy [%s]" @@ -773,7 +773,7 @@ class LocalActorRef private[akka] ( } finally { guard.lock.unlock() if (isReplicated) { - txLog.recordEntry(messageHandle, this, serializer) + txLog.recordEntry(messageHandle, this) } } } @@ -1038,8 +1038,6 @@ private[akka] case class RemoteActorRef private[akka] ( timeout = _timeout - // FIXME BAD, we should not have different ActorRefs - start() def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) { @@ -1073,8 +1071,6 @@ private[akka] case class RemoteActorRef private[akka] ( } // ==== NOT SUPPORTED ==== - @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") - def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher) { unsupported } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 383c6d9545..c591b50d70 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -116,7 +116,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag } /** - * View over the local actor registry. + * Projection over the local actor registry. */ class LocalActorRegistry( private val actorsByAddress: ConcurrentHashMap[String, ActorRef], diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 44c2e34582..25cf92f318 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -143,7 +143,7 @@ object DeploymentConfig { def isHomeNode(home: Home): Boolean = home match { case Host(hostname) ⇒ hostname == Config.hostname - case IP(address) ⇒ address == "0.0.0.0" // FIXME checking if IP address is on home node is missing + case IP(address) ⇒ address == "0.0.0.0" || address == "127.0.0.1" // FIXME look up IP address from the system case Node(nodename) ⇒ nodename == Config.nodename } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index f260cf39b4..b1c0f6e747 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -154,7 +154,7 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory { * @author Jonas Bonér */ object MonitorableThread { - val DEFAULT_NAME = "MonitorableThread" + val DEFAULT_NAME = "MonitorableThread".intern // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring val created = new AtomicInteger diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index dd2971c5d5..336593af23 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -8,7 +8,7 @@ import akka.dispatch.{ Future, Promise, MessageInvocation } import akka.config.{ Config, ModuleNotAvailableException } import akka.remoteinterface.RemoteSupport import akka.actor._ -import DeploymentConfig.Deploy +import DeploymentConfig.{ Deploy, ReplicationStrategy } import akka.event.EventHandler import akka.serialization.Format import akka.cluster.ClusterNode @@ -108,13 +108,23 @@ object ReflectiveAccess { } type TransactionLogObject = { - def newLogFor(id: String, isAsync: Boolean): TransactionLog - def logFor(id: String, isAsync: Boolean): TransactionLog + def newLogFor( + id: String, + isAsync: Boolean, + replicationStrategy: ReplicationStrategy, + format: Serializer): TransactionLog + + def logFor( + id: String, + isAsync: Boolean, + replicationStrategy: ReplicationStrategy, + format: Serializer): TransactionLog + def shutdown() } type TransactionLog = { - def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef, serializer: Serializer) + def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) def recordEntry(entry: Array[Byte]) def recordSnapshot(snapshot: Array[Byte]) def entries: Vector[Array[Byte]] diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 99697a2bfd..b516aa6e47 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -618,7 +618,7 @@ class DefaultClusterNode private[akka] ( val actorRegistryPath = actorRegistryPathFor(uuid) // create UUID -> Array[Byte] for actor registry - if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store in Data Grid not ZooKeeper + if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper else { zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() { def call: Either[String, Exception] = { @@ -789,14 +789,15 @@ class DefaultClusterNode private[akka] ( isConnected ifOn { EventHandler.debug(this, "Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid)) + val command = RemoteDaemonMessageProtocol.newBuilder .setMessageType(USE) .setActorUuid(uuidToUuidProtocol(uuid)) .build + membershipNodes foreach { node ⇒ replicaConnections.get(node) foreach { - case (_, connection) ⇒ - connection ! command + case (_, connection) ⇒ connection ! command } } } @@ -1304,7 +1305,7 @@ class DefaultClusterNode private[akka] ( homeAddress.setAccessible(true) homeAddress.set(actor, Some(remoteServerAddress)) - remoteService.register(uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here? + remoteService.register(actorAddress, actor) } } @@ -1547,8 +1548,6 @@ object RemoteClusterDaemon { val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build } -// FIXME supervise RemoteClusterDaemon - /** * @author Jonas Bonér */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index b7d5df6f10..40b3dbc133 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -7,11 +7,15 @@ import Cluster._ import akka.actor._ import Actor._ +import akka.dispatch._ +import akka.util._ +import ReflectiveAccess._ +import ClusterModule._ import akka.event.EventHandler -import akka.dispatch.Promise import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference +import java.util.{ Map ⇒ JMap } import com.eaio.uuid.UUID @@ -20,16 +24,20 @@ import com.eaio.uuid.UUID */ class ClusterActorRef private[akka] ( inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]], - actorAddress: String, - timeout: Long) - extends RemoteActorRef(null, actorAddress, timeout, None) { // FIXME UGLY HACK - should not extend RemoteActorRef - this: ClusterActorRef with Router.Router ⇒ + val address: String, + _timeout: Long) + extends ActorRef with ScalaActorRef { this: Router.Router ⇒ + + timeout = _timeout private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]]( (Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) { - case (map, (uuid, inetSocketAddress)) ⇒ map + (inetSocketAddress -> createRemoteActorRef(actorAddress, inetSocketAddress)) + case (map, (uuid, inetSocketAddress)) ⇒ map + (inetSocketAddress -> createRemoteActorRef(address, inetSocketAddress)) }) + ClusterModule.ensureEnabled() + start() + def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = @@ -55,4 +63,53 @@ class ClusterActorRef private[akka] ( private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = { RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None) } + + def start(): ActorRef = synchronized { + _status = ActorRefInternals.RUNNING + this + } + + def stop() { + synchronized { + if (_status == ActorRefInternals.RUNNING) { + _status = ActorRefInternals.SHUTDOWN + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) + } + } + } + + // ==== NOT SUPPORTED ==== + // FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated + def dispatcher_=(md: MessageDispatcher) { + unsupported + } + def dispatcher: MessageDispatcher = unsupported + def link(actorRef: ActorRef) { + unsupported + } + def unlink(actorRef: ActorRef) { + unsupported + } + def startLink(actorRef: ActorRef): ActorRef = unsupported + def supervisor: Option[ActorRef] = unsupported + def linkedActors: JMap[Uuid, ActorRef] = unsupported + protected[akka] def mailbox: AnyRef = unsupported + protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { + unsupported + } + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + unsupported + } + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + unsupported + } + protected[akka] def invoke(messageHandle: MessageInvocation) { + unsupported + } + protected[akka] def supervisor_=(sup: Option[ActorRef]) { + unsupported + } + protected[akka] def actorInstance: AtomicReference[Actor] = unsupported + private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala index 136f75463b..070a52d96b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala @@ -177,11 +177,11 @@ object ClusterDeployer { case e: NullPointerException ⇒ handleError(new DeploymentException( "Could not store deployment data [" + deployment + - "] in ZooKeeper since client session is closed")) + "] in ZooKeeper since client session is closed")) case e: Exception ⇒ handleError(new DeploymentException( "Could not store deployment data [" + - deployment + "] in ZooKeeper due to: " + e)) + deployment + "] in ZooKeeper due to: " + e)) } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index 8658ba1825..7efec7df23 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -51,7 +51,9 @@ class ReplicationException(message: String) extends AkkaException(message) class TransactionLog private ( ledger: LedgerHandle, val id: String, - val isAsync: Boolean) { + val isAsync: Boolean, + replicationStrategy: ReplicationStrategy, + format: Serializer) { import TransactionLog._ @@ -65,12 +67,12 @@ class TransactionLog private ( /** * TODO document method */ - def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef, format: Serializer) { + def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) { if (nrOfEntries.incrementAndGet % snapshotFrequency == 0) { val snapshot = // FIXME ReplicationStrategy Transient is always used - if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, Transient)(format)) - else toBinary(actorRef, false, Transient)(format) + if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationStrategy)(format)) + else toBinary(actorRef, false, replicationStrategy)(format) recordSnapshot(snapshot) } recordEntry(MessageSerializer.serialize(messageHandle.message).toByteArray) @@ -365,8 +367,13 @@ object TransactionLog { (bk, zk) } - private[akka] def apply(ledger: LedgerHandle, id: String, isAsync: Boolean = false) = - new TransactionLog(ledger, id, isAsync) + private[akka] def apply( + ledger: LedgerHandle, + id: String, + isAsync: Boolean, + replicationStrategy: ReplicationStrategy, + format: Serializer) = + new TransactionLog(ledger, id, isAsync, replicationStrategy, format) /** * Shuts down the transaction log. @@ -387,7 +394,12 @@ object TransactionLog { /** * TODO document method */ - def newLogFor(id: String, isAsync: Boolean = false): TransactionLog = { + def newLogFor( + id: String, + isAsync: Boolean, + replicationStrategy: ReplicationStrategy, + format: Serializer): TransactionLog = { + val txLogPath = transactionLogNode + "/" + id val ledger = try { @@ -431,13 +443,18 @@ object TransactionLog { } EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id)) - TransactionLog(ledger, id, isAsync) + TransactionLog(ledger, id, isAsync, replicationStrategy, format) } /** * TODO document method */ - def logFor(id: String, isAsync: Boolean = false): TransactionLog = { + def logFor( + id: String, + isAsync: Boolean, + replicationStrategy: ReplicationStrategy, + format: Serializer): TransactionLog = { + val txLogPath = transactionLogNode + "/" + id val logId = try { @@ -476,7 +493,7 @@ object TransactionLog { case e ⇒ handleError(e) } - TransactionLog(ledger, id, isAsync) + TransactionLog(ledger, id, isAsync, replicationStrategy, format) } private[akka] def await[T](future: Promise[T]): T = { diff --git a/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala index 831807a7a2..79b706c9ea 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala @@ -32,31 +32,31 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "A Transaction Log" should { "be able to record entries - synchronous" in { val uuid = (new UUID).toString - val txlog = TransactionLog.newLogFor(uuid) + val txlog = TransactionLog.newLogFor(uuid, false, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog.recordEntry(entry) } "be able to record and delete entries - synchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid) + val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog1.recordEntry(entry) txlog1.recordEntry(entry) txlog1.delete txlog1.close - intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid)) + intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null, Format.Default)) } "be able to record entries and read entries with 'entriesInRange' - synchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid) + val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog1.recordEntry(entry) txlog1.recordEntry(entry) txlog1.close - val txlog2 = TransactionLog.logFor(uuid) + val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default) val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8")) entries.size must equal(2) entries(0) must equal("hello") @@ -66,7 +66,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record entries and read entries with 'entries' - synchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid) + val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog1.recordEntry(entry) txlog1.recordEntry(entry) @@ -74,7 +74,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll txlog1.recordEntry(entry) txlog1.close - val txlog2 = TransactionLog.logFor(uuid) + val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default) val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8")) entries.size must equal(4) entries(0) must equal("hello") @@ -86,7 +86,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record a snapshot - synchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid) + val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default) val snapshot = "snapshot".getBytes("UTF-8") txlog1.recordSnapshot(snapshot) txlog1.close @@ -94,7 +94,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record and read a snapshot and following entries - synchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid) + val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default) val snapshot = "snapshot".getBytes("UTF-8") txlog1.recordSnapshot(snapshot) @@ -105,7 +105,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll txlog1.recordEntry(entry) txlog1.close - val txlog2 = TransactionLog.logFor(uuid) + val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default) val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot new String(snapshotAsBytes, "UTF-8") must equal("snapshot") @@ -120,7 +120,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid) + val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog1.recordEntry(entry) @@ -134,7 +134,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll txlog1.recordEntry(entry) txlog1.close - val txlog2 = TransactionLog.logFor(uuid) + val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default) val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot new String(snapshotAsBytes, "UTF-8") must equal("snapshot") @@ -149,7 +149,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "A Transaction Log" should { "be able to record entries - asynchronous" in { val uuid = (new UUID).toString - val txlog = TransactionLog.newLogFor(uuid, true) + val txlog = TransactionLog.newLogFor(uuid, true, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog.recordEntry(entry) Thread.sleep(100) @@ -158,24 +158,24 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record and delete entries - asynchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, true) + val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog1.recordEntry(entry) txlog1.recordEntry(entry) txlog1.delete Thread.sleep(100) - intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true)) + intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null, Format.Default)) } "be able to record entries and read entries with 'entriesInRange' - asynchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, true) + val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog1.recordEntry(entry) txlog1.recordEntry(entry) Thread.sleep(100) txlog1.close - val txlog2 = TransactionLog.logFor(uuid, true) + val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default) val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8")) entries.size must equal(2) entries(0) must equal("hello") @@ -186,7 +186,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record entries and read entries with 'entries' - asynchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, true) + val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog1.recordEntry(entry) txlog1.recordEntry(entry) @@ -195,7 +195,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll Thread.sleep(100) txlog1.close - val txlog2 = TransactionLog.logFor(uuid, true) + val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default) val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8")) entries.size must equal(4) entries(0) must equal("hello") @@ -208,7 +208,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record a snapshot - asynchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, true) + val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default) val snapshot = "snapshot".getBytes("UTF-8") txlog1.recordSnapshot(snapshot) Thread.sleep(100) @@ -217,7 +217,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record and read a snapshot and following entries - asynchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, true) + val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default) val snapshot = "snapshot".getBytes("UTF-8") txlog1.recordSnapshot(snapshot) @@ -229,7 +229,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll Thread.sleep(100) txlog1.close - val txlog2 = TransactionLog.logFor(uuid, true) + val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default) val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot new String(snapshotAsBytes, "UTF-8") must equal("snapshot") @@ -245,7 +245,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll "be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - asynchronous" in { val uuid = (new UUID).toString - val txlog1 = TransactionLog.newLogFor(uuid, true) + val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default) val entry = "hello".getBytes("UTF-8") txlog1.recordEntry(entry) @@ -258,7 +258,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll Thread.sleep(100) txlog1.close - val txlog2 = TransactionLog.logFor(uuid, true) + val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default) val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot new String(snapshotAsBytes, "UTF-8") must equal("snapshot") val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))