diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 441f1515be..5cd9075e38 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -14,7 +14,7 @@ import akka.dispatch.MessageDispatcher import akka.pattern.ask import java.net.{ Socket, InetSocketAddress, InetAddress, SocketAddress } import scala.util.Failure -import annotation.tailrec +import scala.annotation.tailrec object IOActorSpec { @@ -55,6 +55,8 @@ object IOActorSpec { def receive = { + case _: IO.Connected ⇒ //don't care + case bytes: ByteString ⇒ val source = sender socket write bytes @@ -65,9 +67,9 @@ object IOActorSpec { case IO.Closed(`socket`, cause) ⇒ state(cause) - throw cause match { - case IO.Error(e) ⇒ e - case _ ⇒ new RuntimeException("Socket closed") + cause match { + case IO.Error(e) ⇒ throw e + case _ ⇒ throw new RuntimeException("Socket closed") } } @@ -154,6 +156,8 @@ object IOActorSpec { case IO.Read(socket, bytes) ⇒ state(socket)(IO Chunk bytes) + case _: IO.Connected ⇒ //don't care + case IO.Closed(socket, cause) ⇒ state -= socket @@ -181,6 +185,8 @@ object IOActorSpec { readResult map (source !) } + case _: IO.Connected ⇒ //don't care + case IO.Read(`socket`, bytes) ⇒ state(IO Chunk bytes) @@ -276,7 +282,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { } "an IO Actor" must { - implicit val ec = system.dispatcher + import system.dispatcher "run echo server" in { filterException[java.net.ConnectException] { val addressPromise = Promise[SocketAddress]() diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index a34d220cc5..eb30bb182b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -194,7 +194,7 @@ object SupervisorHierarchySpec { case x ⇒ (x, x) } override val supervisorStrategy = OneForOneStrategy()(unwrap andThen { - case _: Failure if pongsToGo > 0 ⇒ + case (_: Failure, _) if pongsToGo > 0 ⇒ log :+= Event("pongOfDeath resuming " + sender, identityHashCode(this)) Resume case (f: Failure, orig) ⇒ @@ -391,10 +391,10 @@ object SupervisorHierarchySpec { // don’t escalate from this one! override val supervisorStrategy = OneForOneStrategy() { - case f: Failure ⇒ f.directive - case OriginalRestartException(f: Failure) ⇒ f.directive - case ActorInitializationException(f: Failure) ⇒ f.directive - case _ ⇒ Stop + case f: Failure ⇒ f.directive + case OriginalRestartException(f: Failure) ⇒ f.directive + case ActorInitializationException(_, _, f: Failure) ⇒ f.directive + case _ ⇒ Stop } var children = Vector.empty[ActorRef] diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 1aa168b924..9a43631894 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -45,6 +45,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getInt("akka.actor.deployment.default.virtual-nodes-factor") must be(10) settings.DefaultVirtualNodesFactor must be(10) + + getMilliseconds("akka.actor.unstarted-push-timeout") must be(10.seconds.toMillis) + settings.UnstartedPushTimeout.duration must be(10.seconds) } { diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 94a650e8c7..22a5e66971 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -5,7 +5,6 @@ package akka.routing import language.postfixOps -import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import scala.collection.immutable import akka.testkit._ @@ -18,6 +17,8 @@ import java.util.concurrent.ConcurrentHashMap import com.typesafe.config.Config import akka.dispatch.Dispatchers import akka.util.Collections.EmptyImmutableSeq +import akka.util.Timeout +import java.util.concurrent.atomic.AtomicInteger object RoutingSpec { @@ -100,33 +101,34 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "be able to send their routees" in { - class TheActor extends Actor { - val routee1 = context.actorOf(Props[TestActor], "routee1") - val routee2 = context.actorOf(Props[TestActor], "routee2") - val routee3 = context.actorOf(Props[TestActor], "routee3") - val router = context.actorOf(Props[TestActor].withRouter( - ScatterGatherFirstCompletedRouter( - routees = List(routee1, routee2, routee3), - within = 5 seconds))) - + case class TestRun(id: String, names: immutable.Iterable[String], actors: Int) + val actor = system.actorOf(Props(new Actor { def receive = { - case "doIt" ⇒ router ! CurrentRoutees - case routees: RouterRoutees ⇒ testActor forward routees + case TestRun(id, names, actors) ⇒ + val routerProps = Props[TestActor].withRouter( + ScatterGatherFirstCompletedRouter( + routees = names map { context.actorOf(Props(new TestActor), _) }, + within = 5 seconds)) + + 1 to actors foreach { i ⇒ context.actorOf(routerProps, id + i).tell(CurrentRoutees, testActor) } } - } + })) - val theActor = system.actorOf(Props(new TheActor), "theActor") - theActor ! "doIt" - val routees = expectMsgPF() { - case RouterRoutees(routees) ⇒ routees.toSet - } + val actors = 15 + val names = 1 to 20 map { "routee" + _ } toList - routees.map(_.path.name) must be(Set("routee1", "routee2", "routee3")) + actor ! TestRun("test", names, actors) + 1 to actors foreach { _ ⇒ + val routees = expectMsgType[RouterRoutees].routees + routees.map(_.path.name) must be === names + } + expectNoMsg(500.millis) } "use configured nr-of-instances when FromConfig" in { val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1") - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) watch(router) system.stop(router) expectMsgType[Terminated] @@ -134,7 +136,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with "use configured nr-of-instances when router is specified" in { val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2") - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) system.stop(router) } @@ -149,7 +152,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))), "router3") Await.ready(latch, remaining) - Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + router ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size must be(3) system.stop(router) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 75dbd3fefa..9cf2b5b3df 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -259,12 +259,12 @@ private[akka] trait Cell { */ def isLocal: Boolean /** - * If the actor isLocal, returns whether messages are currently queued, + * If the actor isLocal, returns whether "user messages" are currently queued, * “false” otherwise. */ def hasMessages: Boolean /** - * If the actor isLocal, returns the number of messages currently queued, + * If the actor isLocal, returns the number of "user messages" currently queued, * which may be a costly operation, 0 otherwise. */ def numberOfMessages: Int diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6bf3b3bf7c..a6685ae549 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -536,7 +536,7 @@ private[akka] class VirtualPathContainer( def hasChildren: Boolean = !children.isEmpty - def foreachChild(f: ActorRef ⇒ Unit) = { + def foreachChild(f: ActorRef ⇒ Unit): Unit = { val iter = children.values.iterator while (iter.hasNext) f(iter.next) }