diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index f785723bec..a99fedaa74 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -84,7 +84,7 @@ class TransactionLog private ( def recordEntry(entry: Array[Byte]) { if (isOpen.isOn) { val entryBytes = - if (Cluster.shouldCompressData) LZF.compress(entry) + if (shouldCompressData) LZF.compress(entry) else entry try { @@ -118,7 +118,7 @@ class TransactionLog private ( def recordSnapshot(snapshot: Array[Byte]) { if (isOpen.isOn) { val snapshotBytes = - if (Cluster.shouldCompressData) LZF.compress(snapshot) + if (shouldCompressData) LZF.compress(snapshot) else snapshot try { @@ -311,7 +311,7 @@ class TransactionLog private ( while (enumeration.hasMoreElements) { val bytes = enumeration.nextElement.getEntry val entry = - if (Cluster.shouldCompressData) LZF.uncompress(bytes) + if (shouldCompressData) LZF.uncompress(bytes) else bytes entries = entries :+ entry } @@ -356,6 +356,10 @@ class TransactionLog private ( */ object TransactionLog { + val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181") + val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt + val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt + val digestType = config.getString("akka.cluster.replication.digest-type", "CRC32") match { case "CRC32" ⇒ BookKeeper.DigestType.CRC32 case "MAC" ⇒ BookKeeper.DigestType.MAC @@ -367,40 +371,17 @@ object TransactionLog { val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2) val snapshotFrequency = config.getInt("akka.cluster.replication.snapshot-frequency", 1000) val timeout = Duration(config.getInt("akka.cluster.replication.timeout", 30), TIME_UNIT).toMillis + val shouldCompressData = config.getBool("akka.cluster.use-compression", false) private[akka] val transactionLogNode = "/transaction-log-ids" private val isConnected = new Switch(false) - private[akka] lazy val (bookieClient, zkClient) = { - val bk = new BookKeeper(Cluster.zooKeeperServers) + @volatile + private[akka] var bookieClient: BookKeeper = _ - val zk = new AkkaZkClient( - Cluster.zooKeeperServers, - Cluster.sessionTimeout, - Cluster.connectionTimeout, - Cluster.defaultZooKeeperSerializer) - - try { - zk.create(transactionLogNode, null, CreateMode.PERSISTENT) - } catch { - case e: ZkNodeExistsException ⇒ {} // do nothing - case e: Throwable ⇒ handleError(e) - } - - EventHandler.info(this, - ("Transaction log service started with" + - "\n\tdigest type [%s]" + - "\n\tensemble size [%s]" + - "\n\tquorum size [%s]" + - "\n\tlogging time out [%s]").format( - digestType, - ensembleSize, - quorumSize, - timeout)) - isConnected.switchOn - (bk, zk) - } + @volatile + private[akka] var zkClient: AkkaZkClient = _ private[akka] def apply( ledger: LedgerHandle, @@ -409,6 +390,34 @@ object TransactionLog { replicationScheme: ReplicationScheme) = new TransactionLog(ledger, id, isAsync, replicationScheme) + /** + * Starts up the transaction log. + */ + def start(): Unit = { + isConnected switchOn { + bookieClient = new BookKeeper(zooKeeperServers) + zkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout) + + try { + zkClient.create(transactionLogNode, null, CreateMode.PERSISTENT) + } catch { + case e: ZkNodeExistsException ⇒ {} // do nothing + case e: Throwable ⇒ handleError(e) + } + + EventHandler.info(this, + ("Transaction log service started with" + + "\n\tdigest type [%s]" + + "\n\tensemble size [%s]" + + "\n\tquorum size [%s]" + + "\n\tlogging time out [%s]").format( + digestType, + ensembleSize, + quorumSize, + timeout)) + } + } + /** * Shuts down the transaction log. */ @@ -575,10 +584,12 @@ object LocalBookKeeperEnsemble { */ def start() { isRunning switchOn { + EventHandler.info(this, "Starting up LocalBookKeeperEnsemble...") localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize) localBookKeeper.runZookeeper(port) localBookKeeper.initializeZookeper() localBookKeeper.runBookies() + EventHandler.info(this, "LocalBookKeeperEnsemble started up successfully") } } diff --git a/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala index c4d0e68a94..a43f6be62a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala @@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll import akka.actor._ +import akka.event.EventHandler +import akka.testkit.{ EventFilter, TestEvent } import com.eaio.uuid.UUID @@ -49,8 +51,10 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef } "fail to be opened if non existing - asynchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString intercept[ReplicationException](TransactionLog.logFor(uuid, true, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to overweite an existing txlog if one already exists - asynchronous" in { @@ -67,6 +71,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef } "be able to record and delete entries - asynchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString val txlog1 = TransactionLog.newLogFor(uuid, true, null) Thread.sleep(200) @@ -78,6 +83,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef txlog1.delete Thread.sleep(200) intercept[ReplicationException](TransactionLog.logFor(uuid, true, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to record entries and read entries with 'entriesInRange' - asynchronous" in { @@ -214,14 +220,11 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef override def beforeAll() = { LocalBookKeeperEnsemble.start() + TransactionLog.start() } override def afterAll() = { - Cluster.node.shutdown() - LocalCluster.shutdownLocalCluster() TransactionLog.shutdown() LocalBookKeeperEnsemble.shutdown() - Actor.registry.local.shutdownAll() - Scheduler.shutdown() } } diff --git a/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala index 539a235e36..9bfb5a0257 100644 --- a/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala @@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll import akka.actor._ +import akka.event.EventHandler +import akka.testkit.{ EventFilter, TestEvent } import com.eaio.uuid.UUID @@ -33,8 +35,10 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo } "fail to be opened if non existing - synchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to be checked for existence - synchronous" in { @@ -175,16 +179,12 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo } override def beforeAll() = { - LocalCluster.startLocalCluster() LocalBookKeeperEnsemble.start() + TransactionLog.start() } override def afterAll() = { - Cluster.node.shutdown() - LocalCluster.shutdownLocalCluster() TransactionLog.shutdown() LocalBookKeeperEnsemble.shutdown() - Actor.registry.local.shutdownAll() - Scheduler.shutdown() } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 856298bd28..1f30b61968 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -232,10 +232,13 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac EventHandler.notify(TestEvent.Mute(filter)) val log = TestActorRef[Logger] EventHandler.addListener(log) + val eventHandlerLevel = EventHandler.level + EventHandler.level = EventHandler.WarningLevel boss link ref val la = log.underlyingActor la.count must be(1) la.msg must (include("supervisor") and include("CallingThreadDispatcher")) + EventHandler.level = eventHandlerLevel EventHandler.removeListener(log) EventHandler.notify(TestEvent.UnMute(filter)) } diff --git a/config/akka.test.conf b/config/akka.test.conf index 2f540f8c9b..59f9e520cf 100644 --- a/config/akka.test.conf +++ b/config/akka.test.conf @@ -6,5 +6,5 @@ include "akka-reference.conf" akka { event-handlers = ["akka.testkit.TestEventListener"] - event-handler-level = "INFO" + event-handler-level = "ERROR" }