Merge pull request #876 from akka/wip-misc-cleanup-√

Wip misc cleanup √
This commit is contained in:
Viktor Klang (√) 2012-11-16 08:34:06 -08:00
commit 1d768fa10c
6 changed files with 48 additions and 35 deletions

View file

@ -14,7 +14,7 @@ import akka.dispatch.MessageDispatcher
import akka.pattern.ask
import java.net.{ Socket, InetSocketAddress, InetAddress, SocketAddress }
import scala.util.Failure
import annotation.tailrec
import scala.annotation.tailrec
object IOActorSpec {
@ -55,6 +55,8 @@ object IOActorSpec {
def receive = {
case _: IO.Connected //don't care
case bytes: ByteString
val source = sender
socket write bytes
@ -65,9 +67,9 @@ object IOActorSpec {
case IO.Closed(`socket`, cause)
state(cause)
throw cause match {
case IO.Error(e) e
case _ new RuntimeException("Socket closed")
cause match {
case IO.Error(e) throw e
case _ throw new RuntimeException("Socket closed")
}
}
@ -154,6 +156,8 @@ object IOActorSpec {
case IO.Read(socket, bytes)
state(socket)(IO Chunk bytes)
case _: IO.Connected //don't care
case IO.Closed(socket, cause)
state -= socket
@ -181,6 +185,8 @@ object IOActorSpec {
readResult map (source !)
}
case _: IO.Connected //don't care
case IO.Read(`socket`, bytes)
state(IO Chunk bytes)
@ -276,7 +282,7 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
}
"an IO Actor" must {
implicit val ec = system.dispatcher
import system.dispatcher
"run echo server" in {
filterException[java.net.ConnectException] {
val addressPromise = Promise[SocketAddress]()

View file

@ -194,7 +194,7 @@ object SupervisorHierarchySpec {
case x (x, x)
}
override val supervisorStrategy = OneForOneStrategy()(unwrap andThen {
case _: Failure if pongsToGo > 0
case (_: Failure, _) if pongsToGo > 0
log :+= Event("pongOfDeath resuming " + sender, identityHashCode(this))
Resume
case (f: Failure, orig)
@ -391,10 +391,10 @@ object SupervisorHierarchySpec {
// dont escalate from this one!
override val supervisorStrategy = OneForOneStrategy() {
case f: Failure f.directive
case OriginalRestartException(f: Failure) f.directive
case ActorInitializationException(f: Failure) f.directive
case _ Stop
case f: Failure f.directive
case OriginalRestartException(f: Failure) f.directive
case ActorInitializationException(_, _, f: Failure) f.directive
case _ Stop
}
var children = Vector.empty[ActorRef]

View file

@ -45,6 +45,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
getInt("akka.actor.deployment.default.virtual-nodes-factor") must be(10)
settings.DefaultVirtualNodesFactor must be(10)
getMilliseconds("akka.actor.unstarted-push-timeout") must be(10.seconds.toMillis)
settings.UnstartedPushTimeout.duration must be(10.seconds)
}
{

View file

@ -5,7 +5,6 @@ package akka.routing
import language.postfixOps
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import scala.collection.immutable
import akka.testkit._
@ -18,6 +17,8 @@ import java.util.concurrent.ConcurrentHashMap
import com.typesafe.config.Config
import akka.dispatch.Dispatchers
import akka.util.Collections.EmptyImmutableSeq
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger
object RoutingSpec {
@ -100,33 +101,34 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}
"be able to send their routees" in {
class TheActor extends Actor {
val routee1 = context.actorOf(Props[TestActor], "routee1")
val routee2 = context.actorOf(Props[TestActor], "routee2")
val routee3 = context.actorOf(Props[TestActor], "routee3")
val router = context.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(
routees = List(routee1, routee2, routee3),
within = 5 seconds)))
case class TestRun(id: String, names: immutable.Iterable[String], actors: Int)
val actor = system.actorOf(Props(new Actor {
def receive = {
case "doIt" router ! CurrentRoutees
case routees: RouterRoutees testActor forward routees
case TestRun(id, names, actors)
val routerProps = Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(
routees = names map { context.actorOf(Props(new TestActor), _) },
within = 5 seconds))
1 to actors foreach { i context.actorOf(routerProps, id + i).tell(CurrentRoutees, testActor) }
}
}
}))
val theActor = system.actorOf(Props(new TheActor), "theActor")
theActor ! "doIt"
val routees = expectMsgPF() {
case RouterRoutees(routees) routees.toSet
}
val actors = 15
val names = 1 to 20 map { "routee" + _ } toList
routees.map(_.path.name) must be(Set("routee1", "routee2", "routee3"))
actor ! TestRun("test", names, actors)
1 to actors foreach { _
val routees = expectMsgType[RouterRoutees].routees
routees.map(_.path.name) must be === names
}
expectNoMsg(500.millis)
}
"use configured nr-of-instances when FromConfig" in {
val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1")
Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3)
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size must be(3)
watch(router)
system.stop(router)
expectMsgType[Terminated]
@ -134,7 +136,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
"use configured nr-of-instances when router is specified" in {
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2")
Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3)
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size must be(3)
system.stop(router)
}
@ -149,7 +152,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
}
val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))), "router3")
Await.ready(latch, remaining)
Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3)
router ! CurrentRoutees
expectMsgType[RouterRoutees].routees.size must be(3)
system.stop(router)
}

View file

@ -259,12 +259,12 @@ private[akka] trait Cell {
*/
def isLocal: Boolean
/**
* If the actor isLocal, returns whether messages are currently queued,
* If the actor isLocal, returns whether "user messages" are currently queued,
* false otherwise.
*/
def hasMessages: Boolean
/**
* If the actor isLocal, returns the number of messages currently queued,
* If the actor isLocal, returns the number of "user messages" currently queued,
* which may be a costly operation, 0 otherwise.
*/
def numberOfMessages: Int

View file

@ -536,7 +536,7 @@ private[akka] class VirtualPathContainer(
def hasChildren: Boolean = !children.isEmpty
def foreachChild(f: ActorRef Unit) = {
def foreachChild(f: ActorRef Unit): Unit = {
val iter = children.values.iterator
while (iter.hasNext) f(iter.next)
}