diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/ActorFireForgetRequestReplySpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala similarity index 97% rename from akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala index 040e06e62a..b1cfeba83f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRestartSpec.scala @@ -100,7 +100,6 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! Kill within(1 second) { expectMsg(("preRestart", Some(Kill), 1)) - expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg } @@ -121,7 +120,6 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo tRef.underlyingActor must be(tActor) expectMsg((tActor, tRef)) tRef.stop() - expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg } @@ -139,7 +137,6 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! Kill within(1 second) { expectMsg(("preRestart", Some(Kill), 1)) - expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg } @@ -159,7 +156,6 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! 
Kill within(1 second) { expectMsg(("preRestart", Some(Kill), 1)) - expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/ActorTimeoutSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/Bench.scala rename to akka-actor-tests/src/test/scala/akka/actor/Bench.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ChannelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ChannelSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/ChannelSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ChannelSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ClusterSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala similarity index 99% rename from akka-actor-tests/src/test/scala/akka/actor/actor/ClusterSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala index 363a9df662..c014bc0ec5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ClusterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala @@ -1,4 +1,4 @@ -package akka.actor.actor +package akka.actor import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/FSMActorSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/FSMTimingSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ForwardActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/ForwardActorSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ForwardActorSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/HotSwapSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala similarity index 100% rename from 
akka-actor-tests/src/test/scala/akka/actor/actor/HotSwapSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala rename to akka-actor-tests/src/test/scala/akka/actor/IOActor.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/LoggingReceiveSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/ReceiveTimeoutSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/supervisor/RestartStrategySpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/misc/SchedulerSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorHierarchySpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala similarity index 99% rename from akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 242802589a..1f11eea777 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -371,7 +371,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach def receive = { case Ping ⇒ self.tryReply(PongMessage) - case Die ⇒ throw new Exception("expected") + case Die ⇒ throw new RuntimeException("Expected") } })) @@ -381,7 +381,7 @@ class SupervisorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach OneForOneStrategy(classOf[Exception] :: Nil, 3, 10000), Supervise(dyingActor, Permanent) :: Nil)) - intercept[Exception] { + intercept[RuntimeException] { 
(dyingActor.?(Die, TimeoutMillis)).get } diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala similarity index 98% rename from akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala rename to akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index d10e6f1052..73943c40e6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2009-2011 Typesafe Inc. */ -package akka.actor.supervisor +package akka.actor import java.util.concurrent.{ CountDownLatch, TimeUnit } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/dispatch/BalancingDispatcherSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorsSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/dispatch/DispatcherActorsSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorsSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatchersSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala similarity index 94% rename from akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index 
aa477ce9a4..90948aba07 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -64,6 +64,7 @@ class PinnedActorSpec extends JUnitSuite { @Test def shouldSendReceiveException = { val actor = actorOf(Props[TestActor].withDispatcher(new PinnedDispatcher())) + EventHandler.notify(Mute(EventFilter[RuntimeException]("Expected exception; to test fault-tolerance"))) try { (actor ? "Failure").get fail("Should have thrown an exception") diff --git a/akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala similarity index 100% rename from akka-actor-tests/src/test/scala/akka/routing/ListenerSpec.scala rename to akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 4763ab4f92..cf40212e35 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -291,15 +291,15 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd "firstCompletedOf" in { val futures = Vector.fill[Future[Int]](10)(new DefaultPromise[Int]()) :+ new KeptPromise[Int](Right(5)) - Futures.firstCompletedOf(futures).get must be(5) + Future.firstCompletedOf(futures).get must be(5) } "find" in { val futures = for (i ← 1 to 10) yield Future { i } - val result = Futures.find[Int](_ == 3)(futures) + val result = Future.find[Int](_ == 3)(futures) result.get must be(Some(3)) - val notFound = Futures.find[Int](_ == 11)(futures) + val notFound = Future.find[Int](_ == 11)(futures) notFound.get must be(None) } @@ -311,7 +311,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - Futures.fold(0, timeout)(futures)(_ + _).get must be(45) + Future.fold(0, timeout)(futures)(_ + _).get must be(45) } "fold by composing" in { @@ -338,7 +338,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") + Future.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") } } @@ -346,7 +346,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd import scala.collection.mutable.ArrayBuffer def test(testNumber: Int) { val fs = (0 to 1000) map (i ⇒ Future(i, 10000)) - val result = Futures.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) { + val result = Future.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) { case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef] case (l, _) ⇒ l }.get.asInstanceOf[ArrayBuffer[Int]].sum @@ -358,7 +358,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "return zero value if folding empty list" in { - Futures.fold(0)(List[Future[Int]]())(_ + _).get must be(0) + Future.fold(0)(List[Future[Int]]())(_ + _).get must be(0) } "shouldReduceResults" in { @@ -369,7 +369,7 @@ class FutureSpec extends WordSpec 
with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - assert(Futures.reduce(futures, timeout)(_ + _).get === 45) + assert(Future.reduce(futures, timeout)(_ + _).get === 45) } "shouldReduceResultsWithException" in { @@ -386,13 +386,13 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") + assert(Future.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") } } "shouldReduceThrowIAEOnEmptyInput" in { filterException[IllegalArgumentException] { - intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get } + intercept[UnsupportedOperationException] { Future.reduce(List[Future[Int]]())(_ + _).get } } } diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket001Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket001Spec.scala index c22445e19d..662332316a 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket001Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket001Spec.scala @@ -1,4 +1,4 @@ -package akka.actor.ticket +package akka.ticket import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 98272849d3..28ee27bae3 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -386,30 +386,6 @@ object Actor { new LocalActorRef(props, address).start() } - def localActorOf[T <: Actor: Manifest]: ActorRef = { - newLocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString) - } - - def localActorOf[T <: Actor: Manifest](address: String): ActorRef = { - newLocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address) - } - - def localActorOf[T <: Actor](clazz: Class[T]): ActorRef = { - newLocalActorRef(clazz, newUuid().toString) - } - - def localActorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = { - newLocalActorRef(clazz, address) - } - - def localActorOf[T <: Actor](factory: ⇒ T): ActorRef = { - new LocalActorRef(Props(creator = () ⇒ factory), newUuid().toString) - } - - def localActorOf[T <: Actor](factory: ⇒ T, address: String): ActorRef = { - new LocalActorRef(Props(creator = () ⇒ factory), address) - } - /** * Use to spawn out a block of code in an event-driven actor. Will shut actor down when * the block has been executed. 
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 8a7d271a1b..42d0223265 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -848,7 +848,6 @@ class LocalActorRef private[akka] (private[this] val props: Props, val address: val freshActor = newActor setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call - freshActor.preStart() freshActor.postRestart(reason) if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") } diff --git a/akka-actor/src/main/scala/akka/cluster/RemoteInterface.scala b/akka-actor/src/main/scala/akka/cluster/RemoteInterface.scala index 6abdd83b50..3568c50972 100644 --- a/akka-actor/src/main/scala/akka/cluster/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/RemoteInterface.scala @@ -183,7 +183,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule val eventHandler: ActorRef = { implicit object format extends StatelessActorFormat[RemoteEventHandler] val clazz = classOf[RemoteEventHandler] - val handler = Actor.localActorOf(clazz, clazz.getName).start() + val handler = Actor.actorOf(Props(clazz).withLocalOnly(true), clazz.getName) // add the remote client and server listener that pipes the events to the event handler system addListener(handler) handler diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d2165d0290..59ff643a18 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -64,45 +64,13 @@ object Futures { def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher, timeout) - /** - * Returns a Future to the result of the first future in the list that is completed - */ - def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = { - val futureResult = new DefaultPromise[T](timeout) - - val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) - futures.foreach(_ onComplete completeFirst) - - futureResult - } - - /** - * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate - */ - def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = { - if (futures.isEmpty) new KeptPromise[Option[T]](Right(None)) - else { - val result = new DefaultPromise[Option[T]](timeout) - val ref = new AtomicInteger(futures.size) - val search: Future[T] ⇒ Unit = f ⇒ try { - f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r)) - } finally { - if (ref.decrementAndGet == 0) - result completeWithResult None - } - futures.foreach(_ onComplete search) - - result - } - } - /** * Java API. 
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = { val pred: T ⇒ Boolean = predicate.apply(_) - find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_)) + Future.find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_)) } /** @@ -110,59 +78,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] = - firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) - - /** - * A non-blocking fold over the specified futures. - * The fold is performed on the thread where the last future is completed, - * the result will be the first failure of any of the futures, or any failure in the actual fold, - * or the result of the fold. - * Example: - *
-   *   val result = Futures.fold(0)(futures)(_ + _).await.result
-   * 
- */ - def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { - if (futures.isEmpty) { - new KeptPromise[R](Right(zero)) - } else { - val result = new DefaultPromise[R](timeout) - val results = new ConcurrentLinkedQueue[T]() - val done = new Switch(false) - val allDone = futures.size - - val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? - f.value.get match { - case Right(value) ⇒ - val added = results add value - if (added && results.size == allDone) { //Only one thread can get here - if (done.switchOn) { - try { - val i = results.iterator - var currentValue = zero - while (i.hasNext) { currentValue = foldFun(currentValue, i.next) } - result completeWithResult currentValue - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - result completeWithException e - } finally { - results.clear - } - } - } - case Left(exception) ⇒ - if (done.switchOn) { - result completeWithException exception - results.clear - } - } - } - - futures foreach { _ onComplete aggregate } - result - } - } + Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) /** * Java API @@ -172,50 +88,24 @@ object Futures { * or the result of the fold. */ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) + Future.fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun) def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun) - /** - * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first - * Example: - *
-   *   val result = Futures.reduce(futures)(_ + _).await.result
-   * 
- */ - def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = { - if (futures.isEmpty) - new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) - else { - val result = new DefaultPromise[R](timeout) - val seedFound = new AtomicBoolean(false) - val seedFold: Future[T] ⇒ Unit = f ⇒ { - if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold - f.value.get match { - case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op)) - case Left(exception) ⇒ result.completeWithException(exception) - } - } - } - for (f ← futures) f onComplete seedFold //Attach the listener to the Futures - result - } - } - /** * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] = - reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) + Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun) /** * Java API. - * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. + * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. * Useful for reducing many Futures into a single Future. */ def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = @@ -298,6 +188,116 @@ object Future { def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = sequence(in)(cbf, timeout) + /** + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = { + val futureResult = new DefaultPromise[T](timeout) + + val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) + futures.foreach(_ onComplete completeFirst) + + futureResult + } + + /** + * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate + */ + def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = { + if (futures.isEmpty) new KeptPromise[Option[T]](Right(None)) + else { + val result = new DefaultPromise[Option[T]](timeout) + val ref = new AtomicInteger(futures.size) + val search: Future[T] ⇒ Unit = f ⇒ try { + f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r)) + } finally { + if (ref.decrementAndGet == 0) + result completeWithResult None + } + futures.foreach(_ onComplete search) + + result + } + } + + /** + * A non-blocking fold over the specified futures. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * Example: + *
+   *   val result = Future.fold(0)(futures)(_ + _).await.result
+   * 
+ */ + def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { + if (futures.isEmpty) { + new KeptPromise[R](Right(zero)) + } else { + val result = new DefaultPromise[R](timeout) + val results = new ConcurrentLinkedQueue[T]() + val done = new Switch(false) + val allDone = futures.size + + val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? + f.value.get match { + case Right(value) ⇒ + val added = results add value + if (added && results.size == allDone) { //Only one thread can get here + if (done.switchOn) { + try { + val i = results.iterator + var currentValue = zero + while (i.hasNext) { currentValue = foldFun(currentValue, i.next) } + result completeWithResult currentValue + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + result completeWithException e + } finally { + results.clear + } + } + } + case Left(exception) ⇒ + if (done.switchOn) { + result completeWithException exception + results.clear + } + } + } + + futures foreach { _ onComplete aggregate } + result + } + } + + /** + * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + * Example: + *
+   *   val result = Future.reduce(futures)(_ + _).await.result
+   * 
+ */ + def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = { + if (futures.isEmpty) + new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) + else { + val result = new DefaultPromise[R](timeout) + val seedFound = new AtomicBoolean(false) + val seedFold: Future[T] ⇒ Unit = f ⇒ { + if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold + f.value.get match { + case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op)) + case Left(exception) ⇒ result.completeWithException(exception) + } + } + } + for (f ← futures) f onComplete seedFold //Attach the listener to the Futures + result + } + } + /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list @@ -397,9 +397,12 @@ sealed trait Future[+T] extends japi.Future[T] { /** * Blocks the current thread until the Future has been completed or the - * timeout has expired. The timeout will be the least value of 'atMost' and the timeout - * supplied at the constructuion of this Future. - * In the case of the timeout expiring a FutureTimeoutException will be thrown. + * timeout has expired, additionally bounding the waiting period according to + * the atMost parameter. The timeout will be the lesser value of + * 'atMost' and the timeout supplied at the constructuion of this Future. In + * the case of the timeout expiring a FutureTimeoutException will be thrown. + * Other callers of this method are not affected by the additional bound + * imposed by atMost. */ def await(atMost: Duration): Future[T] @@ -878,12 +881,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) Scheduler.scheduleOnce(this, timeLeft, NANOS) + if (!isExpired) Scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) else func(DefaultPromise.this) } } } - Scheduler.scheduleOnce(runnable, timeLeft, NANOS) + Scheduler.scheduleOnce(runnable, timeLeft(), NANOS) false } else true } else false @@ -904,12 +907,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) Scheduler.scheduleOnce(this, timeLeft, NANOS) + if (!isExpired) Scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) else promise complete (try { Right(fallback) } catch { case e: Exception ⇒ Left(e) }) } } } - Scheduler.scheduleOnce(runnable, timeLeft, NANOS) + Scheduler.scheduleOnce(runnable, timeLeft(), NANOS) promise } } else this @@ -923,6 +926,8 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi //TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs. 
@inline private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) + + private def timeLeftNoinline(): Long = timeLeft() } class ActorPromise(timeout: Timeout)(implicit dispatcher: MessageDispatcher) extends DefaultPromise[Any](timeout)(dispatcher) with ForwardableChannel with ExceptionChannel[Any] { diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 6f17b20530..eb66a8abca 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -67,23 +67,23 @@ object EventHandler extends ListenerManagement { sealed trait Event { @transient val thread: Thread = Thread.currentThread - val level: Int + def level: Int } case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends Event { - override val level = ErrorLevel + def level = ErrorLevel } case class Warning(instance: AnyRef, message: Any = "") extends Event { - override val level = WarningLevel + def level = WarningLevel } case class Info(instance: AnyRef, message: Any = "") extends Event { - override val level = InfoLevel + def level = InfoLevel } case class Debug(instance: AnyRef, message: Any = "") extends Event { - override val level = DebugLevel + def level = DebugLevel } val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 95344fe157..a270146d59 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -499,5 +499,5 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { */ class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter { - protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Futures.firstCompletedOf(results) + protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results) } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 53a3e6c1a5..7f08280698 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -26,11 +26,6 @@ object Helpers { (0 until 4).foldLeft(0)((value, index) ⇒ value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8))) } - def flatten[T: ClassManifest](array: Array[Any]) = array.flatMap { - case arr: Array[T] ⇒ arr - case elem: T ⇒ Array(elem) - } - def ignore[E: Manifest](body: ⇒ Unit) { try { body diff --git a/akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala b/akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala index d9b7e40b63..b1014743e6 100644 --- a/akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala +++ b/akka-camel-typed/src/main/scala/akka/camel/TypedCamel.scala @@ -7,6 +7,7 @@ package akka.camel import org.apache.camel.CamelContext import akka.actor.Actor._ +import akka.actor.Props import akka.actor.ActorRef import akka.camel.component.TypedActorComponent @@ -32,13 +33,13 @@ private[camel] object TypedCamel { * and re-uses the activationTracker of service. 
*/ def onCamelServiceStart(service: CamelService) { - consumerPublisher = localActorOf(new TypedConsumerPublisher(service.activationTracker)) - publishRequestor = localActorOf(new TypedConsumerPublishRequestor) + consumerPublisher = actorOf(Props(new TypedConsumerPublisher(service.activationTracker)).withLocalOnly(true)) + publishRequestor = actorOf(Props(new TypedConsumerPublishRequestor).withLocalOnly(true)) registerPublishRequestor for (event ← PublishRequestor.pastActorRegisteredEvents) publishRequestor ! event - publishRequestor ! InitPublishRequestor(consumerPublisher.start) + publishRequestor ! InitPublishRequestor(consumerPublisher) } /** diff --git a/akka-camel/src/main/scala/akka/camel/CamelService.scala b/akka-camel/src/main/scala/akka/camel/CamelService.scala index 0f9cf017c8..abae389630 100644 --- a/akka-camel/src/main/scala/akka/camel/CamelService.scala +++ b/akka-camel/src/main/scala/akka/camel/CamelService.scala @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit import org.apache.camel.CamelContext +import akka.actor.Props import akka.actor.Actor._ import akka.config.Config._ import akka.japi.{ SideEffect, Option ⇒ JOption } @@ -26,9 +27,9 @@ import TypedCamelAccess._ * @author Martin Krasser */ trait CamelService extends Bootable { - private[camel] val activationTracker = localActorOf(new ActivationTracker) - private[camel] val consumerPublisher = localActorOf(new ConsumerPublisher(activationTracker)) - private[camel] val publishRequestor = localActorOf(new ConsumerPublishRequestor) + private[camel] val activationTracker = actorOf(Props(new ActivationTracker).withLocalOnly(true)) + private[camel] val consumerPublisher = actorOf(Props(new ConsumerPublisher(activationTracker)).withLocalOnly(true)) + private[camel] val publishRequestor = actorOf(Props(new ConsumerPublishRequestor).withLocalOnly(true)) private val serviceEnabled = config.getList("akka.enabled-modules").exists(_ == "camel") diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index f785723bec..a99fedaa74 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -84,7 +84,7 @@ class TransactionLog private ( def recordEntry(entry: Array[Byte]) { if (isOpen.isOn) { val entryBytes = - if (Cluster.shouldCompressData) LZF.compress(entry) + if (shouldCompressData) LZF.compress(entry) else entry try { @@ -118,7 +118,7 @@ class TransactionLog private ( def recordSnapshot(snapshot: Array[Byte]) { if (isOpen.isOn) { val snapshotBytes = - if (Cluster.shouldCompressData) LZF.compress(snapshot) + if (shouldCompressData) LZF.compress(snapshot) else snapshot try { @@ -311,7 +311,7 @@ class TransactionLog private ( while (enumeration.hasMoreElements) { val bytes = enumeration.nextElement.getEntry val entry = - if (Cluster.shouldCompressData) LZF.uncompress(bytes) + if (shouldCompressData) LZF.uncompress(bytes) else bytes entries = entries :+ entry } @@ -356,6 +356,10 @@ class TransactionLog private ( */ object TransactionLog { + val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181") + val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt + val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt + val digestType = config.getString("akka.cluster.replication.digest-type", "CRC32") match { case "CRC32" ⇒ 
BookKeeper.DigestType.CRC32 case "MAC" ⇒ BookKeeper.DigestType.MAC @@ -367,40 +371,17 @@ object TransactionLog { val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2) val snapshotFrequency = config.getInt("akka.cluster.replication.snapshot-frequency", 1000) val timeout = Duration(config.getInt("akka.cluster.replication.timeout", 30), TIME_UNIT).toMillis + val shouldCompressData = config.getBool("akka.cluster.use-compression", false) private[akka] val transactionLogNode = "/transaction-log-ids" private val isConnected = new Switch(false) - private[akka] lazy val (bookieClient, zkClient) = { - val bk = new BookKeeper(Cluster.zooKeeperServers) + @volatile + private[akka] var bookieClient: BookKeeper = _ - val zk = new AkkaZkClient( - Cluster.zooKeeperServers, - Cluster.sessionTimeout, - Cluster.connectionTimeout, - Cluster.defaultZooKeeperSerializer) - - try { - zk.create(transactionLogNode, null, CreateMode.PERSISTENT) - } catch { - case e: ZkNodeExistsException ⇒ {} // do nothing - case e: Throwable ⇒ handleError(e) - } - - EventHandler.info(this, - ("Transaction log service started with" + - "\n\tdigest type [%s]" + - "\n\tensemble size [%s]" + - "\n\tquorum size [%s]" + - "\n\tlogging time out [%s]").format( - digestType, - ensembleSize, - quorumSize, - timeout)) - isConnected.switchOn - (bk, zk) - } + @volatile + private[akka] var zkClient: AkkaZkClient = _ private[akka] def apply( ledger: LedgerHandle, @@ -409,6 +390,34 @@ object TransactionLog { replicationScheme: ReplicationScheme) = new TransactionLog(ledger, id, isAsync, replicationScheme) + /** + * Starts up the transaction log. + */ + def start(): Unit = { + isConnected switchOn { + bookieClient = new BookKeeper(zooKeeperServers) + zkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout) + + try { + zkClient.create(transactionLogNode, null, CreateMode.PERSISTENT) + } catch { + case e: ZkNodeExistsException ⇒ {} // do nothing + case e: Throwable ⇒ handleError(e) + } + + EventHandler.info(this, + ("Transaction log service started with" + + "\n\tdigest type [%s]" + + "\n\tensemble size [%s]" + + "\n\tquorum size [%s]" + + "\n\tlogging time out [%s]").format( + digestType, + ensembleSize, + quorumSize, + timeout)) + } + } + /** * Shuts down the transaction log. 
*/ @@ -575,10 +584,12 @@ object LocalBookKeeperEnsemble { */ def start() { isRunning switchOn { + EventHandler.info(this, "Starting up LocalBookKeeperEnsemble...") localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize) localBookKeeper.runZookeeper(port) localBookKeeper.initializeZookeper() localBookKeeper.runBookies() + EventHandler.info(this, "LocalBookKeeperEnsemble started up successfully") } } diff --git a/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala index c4d0e68a94..a43f6be62a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AsynchronousTransactionLogSpec.scala @@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll import akka.actor._ +import akka.event.EventHandler +import akka.testkit.{ EventFilter, TestEvent } import com.eaio.uuid.UUID @@ -49,8 +51,10 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef } "fail to be opened if non existing - asynchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString intercept[ReplicationException](TransactionLog.logFor(uuid, true, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to overweite an existing txlog if one already exists - asynchronous" in { @@ -67,6 +71,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef } "be able to record and delete entries - asynchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString val txlog1 = TransactionLog.newLogFor(uuid, true, null) Thread.sleep(200) @@ -78,6 +83,7 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef txlog1.delete Thread.sleep(200) intercept[ReplicationException](TransactionLog.logFor(uuid, true, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to record entries and read entries with 'entriesInRange' - asynchronous" in { @@ -214,14 +220,11 @@ class AsynchronousTransactionLogSpec extends WordSpec with MustMatchers with Bef override def beforeAll() = { LocalBookKeeperEnsemble.start() + TransactionLog.start() } override def afterAll() = { - Cluster.node.shutdown() - LocalCluster.shutdownLocalCluster() TransactionLog.shutdown() LocalBookKeeperEnsemble.shutdown() - Actor.registry.local.shutdownAll() - Scheduler.shutdown() } } diff --git a/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala index 539a235e36..9bfb5a0257 100644 --- a/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/SynchronousTransactionLogSpec.scala @@ -9,6 +9,8 @@ import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll import akka.actor._ +import akka.event.EventHandler +import akka.testkit.{ EventFilter, TestEvent } import com.eaio.uuid.UUID @@ -33,8 +35,10 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo } "fail to be opened if non existing - synchronous" in { + EventHandler.notify(TestEvent.Mute(EventFilter[ReplicationException])) val uuid = (new UUID).toString intercept[ReplicationException](TransactionLog.logFor(uuid, false, null)) + EventHandler.notify(TestEvent.UnMuteAll) } "be able to be checked 
for existence - synchronous" in { @@ -175,16 +179,12 @@ class SynchronousTransactionLogSpec extends WordSpec with MustMatchers with Befo } override def beforeAll() = { - LocalCluster.startLocalCluster() LocalBookKeeperEnsemble.start() + TransactionLog.start() } override def afterAll() = { - Cluster.node.shutdown() - LocalCluster.shutdownLocalCluster() TransactionLog.shutdown() LocalBookKeeperEnsemble.shutdown() - Actor.registry.local.shutdownAll() - Scheduler.shutdown() } } diff --git a/akka-cluster/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-cluster/src/test/scala/akka/serialization/ActorSerializeSpec.scala index 67a29396f0..9d0ff3766d 100644 --- a/akka-cluster/src/test/scala/akka/serialization/ActorSerializeSpec.scala +++ b/akka-cluster/src/test/scala/akka/serialization/ActorSerializeSpec.scala @@ -21,7 +21,7 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll describe("Serializable actor") { it("should be able to serialize and de-serialize a stateful actor with a given serializer") { - val actor1 = localActorOf[MyJavaSerializableActor].start().asInstanceOf[LocalActorRef] + val actor1 = actorOf(Props[MyJavaSerializableActor].withLocalOnly(true)).asInstanceOf[LocalActorRef] (actor1 ? "hello").get should equal("world 1") (actor1 ? "hello").get should equal("world 2") @@ -36,17 +36,9 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") { - val actor1 = localActorOf[MyStatelessActorWithMessagesInMailbox].start().asInstanceOf[LocalActorRef] - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") - (actor1 ! "hello") + val actor1 = actorOf(Props[MyStatelessActorWithMessagesInMailbox].withLocalOnly(true)).asInstanceOf[LocalActorRef] + for (i ← 1 to 10) actor1 ! "hello" + actor1.getDispatcher.mailboxSize(actor1) should be > (0) val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] Thread.sleep(1000) @@ -62,7 +54,7 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll it("should be able to serialize and deserialize a PersonActorWithMessagesInMailbox") { val p1 = Person("debasish ghosh", 25, SerializeSpec.Address("120", "Monroe Street", "Santa Clara", "95050")) - val actor1 = localActorOf[PersonActorWithMessagesInMailbox].start().asInstanceOf[LocalActorRef] + val actor1 = actorOf(Props[PersonActorWithMessagesInMailbox].withLocalOnly(true)).asInstanceOf[LocalActorRef] (actor1 ! p1) (actor1 ! p1) (actor1 ! p1) @@ -106,19 +98,10 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll describe("serialize actor that accepts protobuf message") { it("should serialize") { - val actor1 = localActorOf[MyActorWithProtobufMessagesInMailbox].start().asInstanceOf[LocalActorRef] + val actor1 = actorOf(Props[MyActorWithProtobufMessagesInMailbox].withLocalOnly(true)).asInstanceOf[LocalActorRef] val msg = MyMessage(123, "debasish ghosh", true) val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build - (actor1 ! b) - (actor1 ! b) - (actor1 ! b) - (actor1 ! b) - (actor1 ! b) - (actor1 ! b) - (actor1 ! b) - (actor1 ! b) - (actor1 ! b) - (actor1 ! b) + for (i ← 1 to 10) actor1 ! 
b actor1.getDispatcher.mailboxSize(actor1) should be > (0) val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] Thread.sleep(1000) diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index abcfd6d9bf..4d60b5a1ba 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -80,6 +80,7 @@ The 'Dispatcher' binds a set of Actors to a thread pool backed up by a 'Blocking The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g. the Actors you define in the declarative supervisor config, to get their own dispatcher but reuse the dispatcher for each new Actor that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to design and implement your system in the most efficient way in regards to performance, throughput and latency. It comes with many different predefined BlockingQueue configurations: + * Bounded LinkedBlockingQueue * Unbounded LinkedBlockingQueue * Bounded ArrayBlockingQueue diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index 4ae489bd22..77dff1856c 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -199,7 +199,7 @@ Then there's a method that's called ``fold`` that takes a start-value, a sequenc val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures - val futureSum = Futures.fold(0)(futures)(_ + _) + val futureSum = Future.fold(0)(futures)(_ + _) That's all it takes! @@ -210,7 +210,7 @@ If the sequence passed to ``fold`` is empty, it will return the start-value, in val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures - val futureSum = Futures.reduce(futures)(_ + _) + val futureSum = Future.reduce(futures)(_ + _) Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again. diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index c79246966c..2709c220c4 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -32,7 +32,7 @@ abstract class DurableMailboxSpec(val backendName: String, val storage: DurableM "should handle reply to ! for 1 message" in { val latch = new CountDownLatch(1) val queueActor = createMailboxTestActor(backendName + " should handle reply to !") - val sender = localActorOf(new Actor { def receive = { case "sum" ⇒ latch.countDown } }).start + val sender = actorOf(Props(self ⇒ { case "sum" ⇒ latch.countDown }).withLocalOnly(true)) queueActor.!("sum")(Some(sender)) latch.await(10, TimeUnit.SECONDS) must be(true) @@ -41,13 +41,10 @@ abstract class DurableMailboxSpec(val backendName: String, val storage: DurableM "should handle reply to ! 
for multiple messages" in { val latch = new CountDownLatch(5) val queueActor = createMailboxTestActor(backendName + " should handle reply to !") - val sender = localActorOf(new Actor { def receive = { case "sum" ⇒ latch.countDown } }).start + val sender = actorOf(Props(self ⇒ { case "sum" ⇒ latch.countDown }).withLocalOnly(true)) + + for (i ← 1 to 5) queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) latch.await(10, TimeUnit.SECONDS) must be(true) } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 7b02bc6ce7..f6c894ccba 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -1,8 +1,8 @@ package sample.fsm.dining.fsm -import akka.actor.{ ActorRef, Actor, FSM } +import akka.actor.{ ActorRef, Actor, FSM, UntypedChannel, NullChannel } import akka.actor.FSM._ -import Actor._ +import akka.actor.Actor._ import akka.util.Duration import akka.util.duration._ @@ -25,7 +25,7 @@ case object Taken extends ChopstickState /** * Some state container for the chopstick */ -case class TakenBy(hakker: Option[ActorRef]) +case class TakenBy(hakker: UntypedChannel) /* * A chopstick is an actor, it can be taken, and put back @@ -33,12 +33,12 @@ case class TakenBy(hakker: Option[ActorRef]) class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { // A chopstick begins its existence as available and taken by no one - startWith(Available, TakenBy(None)) + startWith(Available, TakenBy(NullChannel)) // When a chopstick is available, it can be taken by a some hakker when(Available) { case Event(Take, _) ⇒ - goto(Taken) using TakenBy(self.sender) replying Taken(self) + goto(Taken) using TakenBy(self.channel) replying Taken(self) } // When a chopstick is taken by a hakker @@ -47,8 +47,8 @@ class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { when(Taken) { case Event(Take, currentState) ⇒ stay replying Busy(self) - case Event(Put, TakenBy(hakker)) if self.sender == hakker ⇒ - goto(Available) using TakenBy(None) + case Event(Put, TakenBy(hakker)) if self.channel == hakker ⇒ + goto(Available) using TakenBy(NullChannel) } // Initialze the chopstick diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 28d8c5e0cd..1f30b61968 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -58,7 +58,7 @@ object TestActorRefSpec { self.reply("workDone") self.stop() } - case replyTo: Channel[Any] ⇒ { + case replyTo: UntypedChannel ⇒ { replyTo ! 
"complexReply" } } @@ -232,10 +232,13 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac EventHandler.notify(TestEvent.Mute(filter)) val log = TestActorRef[Logger] EventHandler.addListener(log) + val eventHandlerLevel = EventHandler.level + EventHandler.level = EventHandler.WarningLevel boss link ref val la = log.underlyingActor la.count must be(1) la.msg must (include("supervisor") and include("CallingThreadDispatcher")) + EventHandler.level = eventHandlerLevel EventHandler.removeListener(log) EventHandler.notify(TestEvent.UnMute(filter)) } diff --git a/config/akka.test.conf b/config/akka.test.conf index 2f540f8c9b..59f9e520cf 100644 --- a/config/akka.test.conf +++ b/config/akka.test.conf @@ -6,5 +6,5 @@ include "akka-reference.conf" akka { event-handlers = ["akka.testkit.TestEventListener"] - event-handler-level = "INFO" + event-handler-level = "ERROR" }