diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 887cebc379..5e9e311c7a 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -411,7 +411,7 @@ object Actor extends ListenerManagement {
.getOrElse(throw new ConfigurationException(
"Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
} else {
- // FIXME later managed different 'storage' as well
+ // FIXME later manage different 'storage' (data grid) as well
storeActorAndGetClusterRef(strategy, serializer)
}
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 6cdcf2f637..2665c5e8cc 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -553,14 +553,14 @@ class LocalActorRef private[akka] (
case _ ⇒ true
}
- // FIXME how to get the matching serializerClassName? Now default is used
+ // FIXME how to get the matching serializerClassName? Now default is used. Needed for transaction log snapshot
private val serializer = Actor.serializerFor(address, Format.defaultSerializerName)
private lazy val txLog: TransactionLog = {
val log = replicationStrategy match {
case Transient ⇒ throw new IllegalStateException("Can not replicate 'transient' actor [" + toString + "]")
- case WriteThrough ⇒ transactionLog.newLogFor(_uuid.toString, false)
- case WriteBehind ⇒ transactionLog.newLogFor(_uuid.toString, true)
+ case WriteThrough ⇒ transactionLog.newLogFor(_uuid.toString, false, replicationStrategy, serializer)
+ case WriteBehind ⇒ transactionLog.newLogFor(_uuid.toString, true, replicationStrategy, serializer)
}
EventHandler.debug(this,
"Creating a transaction log for Actor [%s] with replication strategy [%s]"
@@ -773,7 +773,7 @@ class LocalActorRef private[akka] (
} finally {
guard.lock.unlock()
if (isReplicated) {
- txLog.recordEntry(messageHandle, this, serializer)
+ txLog.recordEntry(messageHandle, this)
}
}
}
@@ -1038,8 +1038,6 @@ private[akka] case class RemoteActorRef private[akka] (
timeout = _timeout
- // FIXME BAD, we should not have different ActorRefs
-
start()
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
@@ -1073,8 +1071,6 @@ private[akka] case class RemoteActorRef private[akka] (
}
// ==== NOT SUPPORTED ====
- @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
- def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher) {
unsupported
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
index 383c6d9545..c591b50d70 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
@@ -116,7 +116,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
}
/**
- * View over the local actor registry.
+ * Projection over the local actor registry.
*/
class LocalActorRegistry(
private val actorsByAddress: ConcurrentHashMap[String, ActorRef],
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index 44c2e34582..25cf92f318 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -143,7 +143,7 @@ object DeploymentConfig {
def isHomeNode(home: Home): Boolean = home match {
case Host(hostname) ⇒ hostname == Config.hostname
- case IP(address) ⇒ address == "0.0.0.0" // FIXME checking if IP address is on home node is missing
+ case IP(address) ⇒ address == "0.0.0.0" || address == "127.0.0.1" // FIXME look up IP address from the system
case Node(nodename) ⇒ nodename == Config.nodename
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
index f260cf39b4..b1c0f6e747 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
@@ -154,7 +154,7 @@ class MonitorableThreadFactory(val name: String) extends ThreadFactory {
* @author Jonas Bonér
*/
object MonitorableThread {
- val DEFAULT_NAME = "MonitorableThread"
+ val DEFAULT_NAME = "MonitorableThread".intern
// FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
val created = new AtomicInteger
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index dd2971c5d5..336593af23 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -8,7 +8,7 @@ import akka.dispatch.{ Future, Promise, MessageInvocation }
import akka.config.{ Config, ModuleNotAvailableException }
import akka.remoteinterface.RemoteSupport
import akka.actor._
-import DeploymentConfig.Deploy
+import DeploymentConfig.{ Deploy, ReplicationStrategy }
import akka.event.EventHandler
import akka.serialization.Format
import akka.cluster.ClusterNode
@@ -108,13 +108,23 @@ object ReflectiveAccess {
}
type TransactionLogObject = {
- def newLogFor(id: String, isAsync: Boolean): TransactionLog
- def logFor(id: String, isAsync: Boolean): TransactionLog
+ def newLogFor(
+ id: String,
+ isAsync: Boolean,
+ replicationStrategy: ReplicationStrategy,
+ format: Serializer): TransactionLog
+
+ def logFor(
+ id: String,
+ isAsync: Boolean,
+ replicationStrategy: ReplicationStrategy,
+ format: Serializer): TransactionLog
+
def shutdown()
}
type TransactionLog = {
- def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef, serializer: Serializer)
+ def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef)
def recordEntry(entry: Array[Byte])
def recordSnapshot(snapshot: Array[Byte])
def entries: Vector[Array[Byte]]
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 99697a2bfd..b516aa6e47 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -618,7 +618,7 @@ class DefaultClusterNode private[akka] (
val actorRegistryPath = actorRegistryPathFor(uuid)
// create UUID -> Array[Byte] for actor registry
- if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store in Data Grid not ZooKeeper
+ if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper
else {
zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {
def call: Either[String, Exception] = {
@@ -789,14 +789,15 @@ class DefaultClusterNode private[akka] (
isConnected ifOn {
EventHandler.debug(this,
"Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid))
+
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
+
membershipNodes foreach { node ⇒
replicaConnections.get(node) foreach {
- case (_, connection) ⇒
- connection ! command
+ case (_, connection) ⇒ connection ! command
}
}
}
@@ -1304,7 +1305,7 @@ class DefaultClusterNode private[akka] (
homeAddress.setAccessible(true)
homeAddress.set(actor, Some(remoteServerAddress))
- remoteService.register(uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
+ remoteService.register(actorAddress, actor)
}
}
@@ -1547,8 +1548,6 @@ object RemoteClusterDaemon {
val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build
}
-// FIXME supervise RemoteClusterDaemon
-
/**
* @author Jonas Bonér
*/
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index b7d5df6f10..40b3dbc133 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -7,11 +7,15 @@ import Cluster._
import akka.actor._
import Actor._
+import akka.dispatch._
+import akka.util._
+import ReflectiveAccess._
+import ClusterModule._
import akka.event.EventHandler
-import akka.dispatch.Promise
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
+import java.util.{ Map ⇒ JMap }
import com.eaio.uuid.UUID
@@ -20,16 +24,20 @@ import com.eaio.uuid.UUID
*/
class ClusterActorRef private[akka] (
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
- actorAddress: String,
- timeout: Long)
- extends RemoteActorRef(null, actorAddress, timeout, None) { // FIXME UGLY HACK - should not extend RemoteActorRef
- this: ClusterActorRef with Router.Router ⇒
+ val address: String,
+ _timeout: Long)
+ extends ActorRef with ScalaActorRef { this: Router.Router ⇒
+
+ timeout = _timeout
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
(Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) {
- case (map, (uuid, inetSocketAddress)) ⇒ map + (inetSocketAddress -> createRemoteActorRef(actorAddress, inetSocketAddress))
+ case (map, (uuid, inetSocketAddress)) ⇒ map + (inetSocketAddress -> createRemoteActorRef(address, inetSocketAddress))
})
+ ClusterModule.ensureEnabled()
+ start()
+
def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
@@ -55,4 +63,53 @@ class ClusterActorRef private[akka] (
private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
}
+
+ def start(): ActorRef = synchronized {
+ _status = ActorRefInternals.RUNNING
+ this
+ }
+
+ def stop() {
+ synchronized {
+ if (_status == ActorRefInternals.RUNNING) {
+ _status = ActorRefInternals.SHUTDOWN
+ postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
+ }
+ }
+ }
+
+ // ==== NOT SUPPORTED ====
+ // FIXME move these methods and the same ones in RemoteActorRef to a base class - now duplicated
+ def dispatcher_=(md: MessageDispatcher) {
+ unsupported
+ }
+ def dispatcher: MessageDispatcher = unsupported
+ def link(actorRef: ActorRef) {
+ unsupported
+ }
+ def unlink(actorRef: ActorRef) {
+ unsupported
+ }
+ def startLink(actorRef: ActorRef): ActorRef = unsupported
+ def supervisor: Option[ActorRef] = unsupported
+ def linkedActors: JMap[Uuid, ActorRef] = unsupported
+ protected[akka] def mailbox: AnyRef = unsupported
+ protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
+ protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
+ unsupported
+ }
+ protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
+ unsupported
+ }
+ protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
+ unsupported
+ }
+ protected[akka] def invoke(messageHandle: MessageInvocation) {
+ unsupported
+ }
+ protected[akka] def supervisor_=(sup: Option[ActorRef]) {
+ unsupported
+ }
+ protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
+ private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
index 136f75463b..070a52d96b 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
@@ -177,11 +177,11 @@ object ClusterDeployer {
case e: NullPointerException ⇒
handleError(new DeploymentException(
"Could not store deployment data [" + deployment +
- "] in ZooKeeper since client session is closed"))
+ "] in ZooKeeper since client session is closed"))
case e: Exception ⇒
handleError(new DeploymentException(
"Could not store deployment data [" +
- deployment + "] in ZooKeeper due to: " + e))
+ deployment + "] in ZooKeeper due to: " + e))
}
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
index 8658ba1825..7efec7df23 100644
--- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
@@ -51,7 +51,9 @@ class ReplicationException(message: String) extends AkkaException(message)
class TransactionLog private (
ledger: LedgerHandle,
val id: String,
- val isAsync: Boolean) {
+ val isAsync: Boolean,
+ replicationStrategy: ReplicationStrategy,
+ format: Serializer) {
import TransactionLog._
@@ -65,12 +67,12 @@ class TransactionLog private (
/**
* TODO document method
*/
- def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef, format: Serializer) {
+ def recordEntry(messageHandle: MessageInvocation, actorRef: ActorRef) {
if (nrOfEntries.incrementAndGet % snapshotFrequency == 0) {
val snapshot =
// FIXME ReplicationStrategy Transient is always used
- if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, Transient)(format))
- else toBinary(actorRef, false, Transient)(format)
+ if (Cluster.shouldCompressData) LZF.compress(toBinary(actorRef, false, replicationStrategy)(format))
+ else toBinary(actorRef, false, replicationStrategy)(format)
recordSnapshot(snapshot)
}
recordEntry(MessageSerializer.serialize(messageHandle.message).toByteArray)
@@ -365,8 +367,13 @@ object TransactionLog {
(bk, zk)
}
- private[akka] def apply(ledger: LedgerHandle, id: String, isAsync: Boolean = false) =
- new TransactionLog(ledger, id, isAsync)
+ private[akka] def apply(
+ ledger: LedgerHandle,
+ id: String,
+ isAsync: Boolean,
+ replicationStrategy: ReplicationStrategy,
+ format: Serializer) =
+ new TransactionLog(ledger, id, isAsync, replicationStrategy, format)
/**
* Shuts down the transaction log.
@@ -387,7 +394,12 @@ object TransactionLog {
/**
* TODO document method
*/
- def newLogFor(id: String, isAsync: Boolean = false): TransactionLog = {
+ def newLogFor(
+ id: String,
+ isAsync: Boolean,
+ replicationStrategy: ReplicationStrategy,
+ format: Serializer): TransactionLog = {
+
val txLogPath = transactionLogNode + "/" + id
val ledger = try {
@@ -431,13 +443,18 @@ object TransactionLog {
}
EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id))
- TransactionLog(ledger, id, isAsync)
+ TransactionLog(ledger, id, isAsync, replicationStrategy, format)
}
/**
* TODO document method
*/
- def logFor(id: String, isAsync: Boolean = false): TransactionLog = {
+ def logFor(
+ id: String,
+ isAsync: Boolean,
+ replicationStrategy: ReplicationStrategy,
+ format: Serializer): TransactionLog = {
+
val txLogPath = transactionLogNode + "/" + id
val logId = try {
@@ -476,7 +493,7 @@ object TransactionLog {
case e ⇒ handleError(e)
}
- TransactionLog(ledger, id, isAsync)
+ TransactionLog(ledger, id, isAsync, replicationStrategy, format)
}
private[akka] def await[T](future: Promise[T]): T = {
diff --git a/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala
index 831807a7a2..79b706c9ea 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala
@@ -32,31 +32,31 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"A Transaction Log" should {
"be able to record entries - synchronous" in {
val uuid = (new UUID).toString
- val txlog = TransactionLog.newLogFor(uuid)
+ val txlog = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
}
"be able to record and delete entries - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
txlog1.close
- intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid))
+ intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null, Format.Default))
}
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid)
+ val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
@@ -66,7 +66,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record entries and read entries with 'entries' - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
@@ -74,7 +74,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
txlog1.recordEntry(entry)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid)
+ val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
@@ -86,7 +86,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record a snapshot - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
txlog1.close
@@ -94,7 +94,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record and read a snapshot and following entries - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
@@ -105,7 +105,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
txlog1.recordEntry(entry)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid)
+ val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@@ -120,7 +120,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - synchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid)
+ val txlog1 = TransactionLog.newLogFor(uuid, false, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
@@ -134,7 +134,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
txlog1.recordEntry(entry)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid)
+ val txlog2 = TransactionLog.logFor(uuid, false, null, Format.Default)
val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@@ -149,7 +149,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"A Transaction Log" should {
"be able to record entries - asynchronous" in {
val uuid = (new UUID).toString
- val txlog = TransactionLog.newLogFor(uuid, true)
+ val txlog = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
Thread.sleep(100)
@@ -158,24 +158,24 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record and delete entries - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.delete
Thread.sleep(100)
- intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true))
+ intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null, Format.Default))
}
"be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
Thread.sleep(100)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, true)
+ val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
val entries = txlog2.entriesInRange(0, 1).map(bytes ⇒ new String(bytes, "UTF-8"))
entries.size must equal(2)
entries(0) must equal("hello")
@@ -186,7 +186,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record entries and read entries with 'entries' - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
@@ -195,7 +195,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
Thread.sleep(100)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, true)
+ val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
val entries = txlog2.entries.map(bytes ⇒ new String(bytes, "UTF-8"))
entries.size must equal(4)
entries(0) must equal("hello")
@@ -208,7 +208,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record a snapshot - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
Thread.sleep(100)
@@ -217,7 +217,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record and read a snapshot and following entries - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val snapshot = "snapshot".getBytes("UTF-8")
txlog1.recordSnapshot(snapshot)
@@ -229,7 +229,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
Thread.sleep(100)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, true)
+ val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
@@ -245,7 +245,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
"be able to record entries then a snapshot then more entries - and then read from the snapshot and the following entries - asynchronous" in {
val uuid = (new UUID).toString
- val txlog1 = TransactionLog.newLogFor(uuid, true)
+ val txlog1 = TransactionLog.newLogFor(uuid, true, null, Format.Default)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
@@ -258,7 +258,7 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
Thread.sleep(100)
txlog1.close
- val txlog2 = TransactionLog.logFor(uuid, true)
+ val txlog2 = TransactionLog.logFor(uuid, true, null, Format.Default)
val (snapshotAsBytes, entriesAsBytes) = txlog2.toByteArraysLatestSnapshot
new String(snapshotAsBytes, "UTF-8") must equal("snapshot")
val entries = entriesAsBytes.map(bytes ⇒ new String(bytes, "UTF-8"))