From d8390a61f6ba34ee9f547cd004cc082370d3bdea Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 8 Sep 2011 15:54:06 +0200 Subject: [PATCH 1/2] #1180 - moving the Java API to Futures and Scala API to Future --- .../test/scala/akka/dispatch/FutureSpec.scala | 20 +- .../src/main/scala/akka/dispatch/Future.scala | 230 +++++++++--------- .../src/main/scala/akka/routing/Routing.scala | 2 +- akka-docs/scala/futures.rst | 4 +- 4 files changed, 128 insertions(+), 128 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 4763ab4f92..cf40212e35 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -291,15 +291,15 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd "firstCompletedOf" in { val futures = Vector.fill[Future[Int]](10)(new DefaultPromise[Int]()) :+ new KeptPromise[Int](Right(5)) - Futures.firstCompletedOf(futures).get must be(5) + Future.firstCompletedOf(futures).get must be(5) } "find" in { val futures = for (i ← 1 to 10) yield Future { i } - val result = Futures.find[Int](_ == 3)(futures) + val result = Future.find[Int](_ == 3)(futures) result.get must be(Some(3)) - val notFound = Futures.find[Int](_ == 11)(futures) + val notFound = Future.find[Int](_ == 11)(futures) notFound.get must be(None) } @@ -311,7 +311,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - Futures.fold(0, timeout)(futures)(_ + _).get must be(45) + Future.fold(0, timeout)(futures)(_ + _).get must be(45) } "fold by composing" in { @@ -338,7 +338,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") + Future.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") } } @@ -346,7 +346,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd import scala.collection.mutable.ArrayBuffer def test(testNumber: Int) { val fs = (0 to 1000) map (i ⇒ Future(i, 10000)) - val result = Futures.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) { + val result = Future.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) { case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef] case (l, _) ⇒ l }.get.asInstanceOf[ArrayBuffer[Int]].sum @@ -358,7 +358,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "return zero value if folding empty list" in { - Futures.fold(0)(List[Future[Int]]())(_ + _).get must be(0) + Future.fold(0)(List[Future[Int]]())(_ + _).get must be(0) } "shouldReduceResults" in { @@ -369,7 +369,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - assert(Futures.reduce(futures, timeout)(_ + _).get === 45) + assert(Future.reduce(futures, timeout)(_ + _).get === 45) } "shouldReduceResultsWithException" in { @@ -386,13 +386,13 @@ class 
FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") + assert(Future.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") } } "shouldReduceThrowIAEOnEmptyInput" in { filterException[IllegalArgumentException] { - intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get } + intercept[UnsupportedOperationException] { Future.reduce(List[Future[Int]]())(_ + _).get } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 1a801c6123..8b0c4db5fc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -64,45 +64,13 @@ object Futures { def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher, timeout) - /** - * Returns a Future to the result of the first future in the list that is completed - */ - def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = { - val futureResult = new DefaultPromise[T](timeout) - - val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) - futures.foreach(_ onComplete completeFirst) - - futureResult - } - - /** - * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate - */ - def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = { - if (futures.isEmpty) new KeptPromise[Option[T]](Right(None)) - else { - val result = new DefaultPromise[Option[T]](timeout) - val ref = new AtomicInteger(futures.size) - val search: Future[T] ⇒ Unit = f ⇒ try { - f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r)) - } finally { - if (ref.decrementAndGet == 0) - result completeWithResult None - } - futures.foreach(_ onComplete search) - - result - } - } - /** * Java API. * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = { val pred: T ⇒ Boolean = predicate.apply(_) - find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_)) + Future.find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_)) } /** @@ -110,59 +78,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] = - firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) - - /** - * A non-blocking fold over the specified futures. - * The fold is performed on the thread where the last future is completed, - * the result will be the first failure of any of the futures, or any failure in the actual fold, - * or the result of the fold. - * Example: - *
-   *   val result = Futures.fold(0)(futures)(_ + _).await.result
-   * </pre>
- */ - def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { - if (futures.isEmpty) { - new KeptPromise[R](Right(zero)) - } else { - val result = new DefaultPromise[R](timeout) - val results = new ConcurrentLinkedQueue[T]() - val done = new Switch(false) - val allDone = futures.size - - val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? - f.value.get match { - case Right(value) ⇒ - val added = results add value - if (added && results.size == allDone) { //Only one thread can get here - if (done.switchOn) { - try { - val i = results.iterator - var currentValue = zero - while (i.hasNext) { currentValue = foldFun(currentValue, i.next) } - result completeWithResult currentValue - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - result completeWithException e - } finally { - results.clear - } - } - } - case Left(exception) ⇒ - if (done.switchOn) { - result completeWithException exception - results.clear - } - } - } - - futures foreach { _ onComplete aggregate } - result - } - } + Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) /** * Java API @@ -172,50 +88,24 @@ object Futures { * or the result of the fold. */ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) + Future.fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun) def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun) - /** - * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first - * Example: - *
-   *   val result = Futures.reduce(futures)(_ + _).await.result
-   * </pre>
- */ - def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = { - if (futures.isEmpty) - new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) - else { - val result = new DefaultPromise[R](timeout) - val seedFound = new AtomicBoolean(false) - val seedFold: Future[T] ⇒ Unit = f ⇒ { - if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold - f.value.get match { - case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op)) - case Left(exception) ⇒ result.completeWithException(exception) - } - } - } - for (f ← futures) f onComplete seedFold //Attach the listener to the Futures - result - } - } - /** * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] = - reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) + Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun) /** * Java API. - * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. + * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. * Useful for reducing many Futures into a single Future. */ def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = @@ -298,6 +188,116 @@ object Future { def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = sequence(in)(cbf, timeout) + /** + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = { + val futureResult = new DefaultPromise[T](timeout) + + val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) + futures.foreach(_ onComplete completeFirst) + + futureResult + } + + /** + * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate + */ + def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = { + if (futures.isEmpty) new KeptPromise[Option[T]](Right(None)) + else { + val result = new DefaultPromise[Option[T]](timeout) + val ref = new AtomicInteger(futures.size) + val search: Future[T] ⇒ Unit = f ⇒ try { + f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r)) + } finally { + if (ref.decrementAndGet == 0) + result completeWithResult None + } + futures.foreach(_ onComplete search) + + result + } + } + + /** + * A non-blocking fold over the specified futures. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * Example: + *
+   *   val result = Future.fold(0)(futures)(_ + _).await.result
+   * </pre>
+ */ + def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { + if (futures.isEmpty) { + new KeptPromise[R](Right(zero)) + } else { + val result = new DefaultPromise[R](timeout) + val results = new ConcurrentLinkedQueue[T]() + val done = new Switch(false) + val allDone = futures.size + + val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? + f.value.get match { + case Right(value) ⇒ + val added = results add value + if (added && results.size == allDone) { //Only one thread can get here + if (done.switchOn) { + try { + val i = results.iterator + var currentValue = zero + while (i.hasNext) { currentValue = foldFun(currentValue, i.next) } + result completeWithResult currentValue + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + result completeWithException e + } finally { + results.clear + } + } + } + case Left(exception) ⇒ + if (done.switchOn) { + result completeWithException exception + results.clear + } + } + } + + futures foreach { _ onComplete aggregate } + result + } + } + + /** + * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + * Example: + *
+   *   val result = Future.reduce(futures)(_ + _).await.result
+   * </pre>
+ */ + def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = { + if (futures.isEmpty) + new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) + else { + val result = new DefaultPromise[R](timeout) + val seedFound = new AtomicBoolean(false) + val seedFold: Future[T] ⇒ Unit = f ⇒ { + if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold + f.value.get match { + case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op)) + case Left(exception) ⇒ result.completeWithException(exception) + } + } + } + for (f ← futures) f onComplete seedFold //Attach the listener to the Futures + result + } + } + /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 3e862b0950..2ef59edca1 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -574,5 +574,5 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { */ class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter { - protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Futures.firstCompletedOf(results) + protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results) } diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index 4ae489bd22..77dff1856c 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -199,7 +199,7 @@ Then there's a method that's called ``fold`` that takes a start-value, a sequenc val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures - val futureSum = Futures.fold(0)(futures)(_ + _) + val futureSum = Future.fold(0)(futures)(_ + _) That's all it takes! @@ -210,7 +210,7 @@ If the sequence passed to ``fold`` is empty, it will return the start-value, in val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures - val futureSum = Futures.reduce(futures)(_ + _) + val futureSum = Future.reduce(futures)(_ + _) Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again. 
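In summary, after this first patch the Scala combinators (``firstCompletedOf``, ``find``, ``fold``, ``reduce``) live on the ``Future`` companion object, while ``Futures`` keeps only the Java API overloads taking ``java.lang.Iterable`` and ``akka.japi.Function2`` and delegates to ``Future``. A minimal usage sketch of the relocated Scala API, assuming the Akka 1.x ``akka.dispatch`` package as shown in the diff above and the default dispatcher/timeout implicits that ``FutureSpec`` and the futures.rst samples rely on:

  import akka.dispatch.Future

  val futures = for (i <- 1 to 10) yield Future(i * 2)  // IndexedSeq[Future[Int]]

  val sum   = Future.fold(0)(futures)(_ + _).get        // 110; fold runs on the thread that completes the last future
  val red   = Future.reduce(futures)(_ + _).get         // 110; the first completed result is used as the fold zero
  val first = Future.firstCompletedOf(futures).get      // result of whichever future completes first
  val found = Future.find[Int](_ == 6)(futures).get     // Some(6); None if no result matches the predicate

Java callers are unaffected: the ``java.lang.Iterable``/``akka.japi.Function2`` overloads of ``find``, ``firstCompletedOf``, ``fold``, ``reduce`` and ``sequence`` stay on ``Futures`` and simply forward to ``Future``.
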
From 0430e284a67e5df6bb59b27f76d1169135ad5869 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 8 Sep 2011 17:19:27 +0200 Subject: [PATCH 2/2] Get the (non-multi-jvm) cluster tests working again - avoid initialising the cluster in TransactionLog - allow TransactionLog to be started and shutdown multiple times - correct the startup and shutdown in the transaction log tests --- .../scala/akka/cluster/TransactionLog.scala | 73 +++++++++++-------- .../AsynchronousTransactionLogSpec.scala | 11 ++- .../SynchronousTransactionLogSpec.scala | 10 +-- .../scala/akka/testkit/TestActorRefSpec.scala | 3 + config/akka.test.conf | 2 +- 5 files changed, 58 insertions(+), 41 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index f785723bec..a99fedaa74 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -84,7 +84,7 @@ class TransactionLog private ( def recordEntry(entry: Array[Byte]) { if (isOpen.isOn) { val entryBytes = - if (Cluster.shouldCompressData) LZF.compress(entry) + if (shouldCompressData) LZF.compress(entry) else entry try { @@ -118,7 +118,7 @@ class TransactionLog private ( def recordSnapshot(snapshot: Array[Byte]) { if (isOpen.isOn) { val snapshotBytes = - if (Cluster.shouldCompressData) LZF.compress(snapshot) + if (shouldCompressData) LZF.compress(snapshot) else snapshot try { @@ -311,7 +311,7 @@ class TransactionLog private ( while (enumeration.hasMoreElements) { val bytes = enumeration.nextElement.getEntry val entry = - if (Cluster.shouldCompressData) LZF.uncompress(bytes) + if (shouldCompressData) LZF.uncompress(bytes) else bytes entries = entries :+ entry } @@ -356,6 +356,10 @@ class TransactionLog private ( */ object TransactionLog { + val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181") + val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt + val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt + val digestType = config.getString("akka.cluster.replication.digest-type", "CRC32") match { case "CRC32" ⇒ BookKeeper.DigestType.CRC32 case "MAC" ⇒ BookKeeper.DigestType.MAC @@ -367,40 +371,17 @@ object TransactionLog { val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2) val snapshotFrequency = config.getInt("akka.cluster.replication.snapshot-frequency", 1000) val timeout = Duration(config.getInt("akka.cluster.replication.timeout", 30), TIME_UNIT).toMillis + val shouldCompressData = config.getBool("akka.cluster.use-compression", false) private[akka] val transactionLogNode = "/transaction-log-ids" private val isConnected = new Switch(false) - private[akka] lazy val (bookieClient, zkClient) = { - val bk = new BookKeeper(Cluster.zooKeeperServers) + @volatile + private[akka] var bookieClient: BookKeeper = _ - val zk = new AkkaZkClient( - Cluster.zooKeeperServers, - Cluster.sessionTimeout, - Cluster.connectionTimeout, - Cluster.defaultZooKeeperSerializer) - - try { - zk.create(transactionLogNode, null, CreateMode.PERSISTENT) - } catch { - case e: ZkNodeExistsException ⇒ {} // do nothing - case e: Throwable ⇒ handleError(e) - } - - EventHandler.info(this, - ("Transaction log service started with" + - "\n\tdigest type [%s]" + - "\n\tensemble size [%s]" + - "\n\tquorum size [%s]" + - "\n\tlogging time out [%s]").format( - 
digestType, - ensembleSize, - quorumSize, - timeout)) - isConnected.switchOn - (bk, zk) - } + @volatile + private[akka] var zkClient: AkkaZkClient = _ private[akka] def apply( ledger: LedgerHandle, @@ -409,6 +390,34 @@ object TransactionLog { replicationScheme: ReplicationScheme) = new TransactionLog(ledger, id, isAsync, replicationScheme) + /** + * Starts up the transaction log. + */ + def start(): Unit = { + isConnected switchOn { + bookieClient = new BookKeeper(zooKeeperServers) + zkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout) + + try { + zkClient.create(transactionLogNode, null, CreateMode.PERSISTENT) + } catch { + case e: ZkNodeExistsException ⇒ {} // do nothing + case e: Throwable ⇒ handleError(e) + } + + EventHandler.info(this, + ("Transaction log service started with" + + "\n\tdigest type [%s]" + + "\n\tensemble size [%s]" + + "\n\tquorum size [%s]" + + "\n\tlogging time out [%s]").format( + digestType, + ensembleSize, + quorumSize, + timeout)) + } + } + /** * Shuts down the transaction log. */ @@ -575,10 +584,12 @@ object LocalBookKeeperEnsemble { */ def start() { isRunning switchOn { + EventHandler.info(this, "Starting up LocalBookKeeperEnsemble...") localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize) localBookKeeper.runZookeeper(port) localBookKeeper.initializeZookeper() localBookKeeper.runBookies() + EventHandler.info(this, "LocalBookKeeperEnsemble started up successfully") } } diff --git a/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala index c4d0e68a94..a43f6be62a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala @@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll import akka.actor._ +import akka.event.EventHandler +import akka.testkit.{ EventFilter, TestEvent } import com.eaio.uuid.UUID @@ -49,8 +51,10 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef } "fail to be opened if non existing - asynchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString intercept[ReplicationException](TransactionLog.logFor(uuid, true, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to overweite an existing txlog if one already exists - asynchronous" in { @@ -67,6 +71,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef } "be able to record and delete entries - asynchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString val txlog1 = TransactionLog.newLogFor(uuid, true, null) Thread.sleep(200) @@ -78,6 +83,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef txlog1.delete Thread.sleep(200) intercept[ReplicationException](TransactionLog.logFor(uuid, true, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to record entries and read entries with 'entriesInRange' - asynchronous" in { @@ -214,14 +220,11 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef override def beforeAll() = { LocalBookKeeperEnsemble.start() + TransactionLog.start() } override def afterAll() = { - Cluster.node.shutdown() - LocalCluster.shutdownLocalCluster() TransactionLog.shutdown() LocalBookKeeperEnsemble.shutdown() - 
Actor.registry.local.shutdownAll() - Scheduler.shutdown() } } diff --git a/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala index 539a235e36..9bfb5a0257 100644 --- a/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala @@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll import akka.actor._ +import akka.event.EventHandler +import akka.testkit.{ EventFilter, TestEvent } import com.eaio.uuid.UUID @@ -33,8 +35,10 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo } "fail to be opened if non existing - synchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to be checked for existence - synchronous" in { @@ -175,16 +179,12 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo } override def beforeAll() = { - LocalCluster.startLocalCluster() LocalBookKeeperEnsemble.start() + TransactionLog.start() } override def afterAll() = { - Cluster.node.shutdown() - LocalCluster.shutdownLocalCluster() TransactionLog.shutdown() LocalBookKeeperEnsemble.shutdown() - Actor.registry.local.shutdownAll() - Scheduler.shutdown() } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 856298bd28..1f30b61968 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -232,10 +232,13 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac EventHandler.notify(TestEvent.Mute(filter)) val log = TestActorRef[Logger] EventHandler.addListener(log) + val eventHandlerLevel = EventHandler.level + EventHandler.level = EventHandler.WarningLevel boss link ref val la = log.underlyingActor la.count must be(1) la.msg must (include("supervisor") and include("CallingThreadDispatcher")) + EventHandler.level = eventHandlerLevel EventHandler.removeListener(log) EventHandler.notify(TestEvent.UnMute(filter)) } diff --git a/config/akka.test.conf b/config/akka.test.conf index 2f540f8c9b..59f9e520cf 100644 --- a/config/akka.test.conf +++ b/config/akka.test.conf @@ -6,5 +6,5 @@ include "akka-reference.conf" akka { event-handlers = ["akka.testkit.TestEventListener"] - event-handler-level = "INFO" + event-handler-level = "ERROR" }
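
The second patch replaces the implicit, ``Cluster``-driven bootstrap of the transaction log with an explicit lifecycle: ``TransactionLog.start()`` creates the BookKeeper and ZooKeeper clients (guarded by the ``isConnected`` switch), and the specs start and stop the log around the embedded BookKeeper ensemble. A minimal sketch of that lifecycle, with an invented spec class name for illustration, assuming the ``akka.cluster.TransactionLog`` and ``LocalBookKeeperEnsemble`` objects exactly as changed above:

  import org.scalatest.WordSpec
  import org.scalatest.matchers.MustMatchers
  import org.scalatest.BeforeAndAfterAll
  import akka.cluster.{ LocalBookKeeperEnsemble, TransactionLog }

  class ExampleTransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {

    override def beforeAll() = {
      LocalBookKeeperEnsemble.start() // embedded ZooKeeper plus bookies for the test
      TransactionLog.start()          // explicit start: connects BookKeeper and the AkkaZkClient
    }

    override def afterAll() = {
      TransactionLog.shutdown()       // per the commit message, start/shutdown can now run more than once
      LocalBookKeeperEnsemble.shutdown()
    }

    // ... transaction log assertions go here, as in the specs above ...
  }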