Merge branch 'async-system-messages' of github.com:jboner/akka into async-system-messages

This commit is contained in:
Viktor Klang 2011-09-26 15:45:43 +02:00
commit d46d768cda
2 changed files with 17 additions and 12 deletions

View file

@ -8,7 +8,7 @@ import akka.actor.{ ActorRef, Actor }
import java.util.concurrent.atomic.AtomicInteger
import collection.mutable.LinkedList
import akka.routing.Routing.Broadcast
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit._
class Ticket1111Spec extends WordSpec with MustMatchers {
@ -16,7 +16,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
"return response, even if one of the connections has stopped" in {
val shutdownLatch = new CountDownLatch(1)
val shutdownLatch = new TestLatch(1)
val props = RoutedProps()
.withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))
@ -26,14 +26,14 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
actor ! Broadcast(Stop(Some(0)))
shutdownLatch.await(5, TimeUnit.SECONDS) must be(true)
shutdownLatch.await
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
}
"throw an exception, if all the connections have stopped" in {
val shutdownLatch = new CountDownLatch(2)
val shutdownLatch = new TestLatch(2)
val props = RoutedProps()
.withConnections(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))
@ -43,7 +43,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
actor ! Broadcast(Stop())
shutdownLatch.await(5, TimeUnit.SECONDS) must be(true)
shutdownLatch.await
(intercept[RoutingException] {
actor ? Broadcast(0)
@ -100,7 +100,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
"deliver one-way messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new CountDownLatch(connectionCount)
val doneLatch = new TestLatch(connectionCount)
var connections = new LinkedList[ActorRef]
var counters = new LinkedList[AtomicInteger]
@ -130,7 +130,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
actor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
doneLatch.await
for (i 0 until connectionCount) {
val counter = counters.get(i).get
@ -139,7 +139,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
}
"deliver a broadcast message using the !" in {
val doneLatch = new CountDownLatch(2)
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val connection1 = actorOf(new Actor {
@ -166,7 +166,7 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
actor ! Broadcast(1)
actor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
doneLatch.await
counter1.get must be(1)
counter2.get must be(1)
@ -174,13 +174,17 @@ class Ticket1111Spec extends WordSpec with MustMatchers {
case class Stop(id: Option[Int] = None)
def newActor(id: Int, shudownLatch: Option[CountDownLatch] = None) = actorOf(new Actor {
def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = actorOf(new Actor {
def receive = {
case Stop(None) self.stop(); shudownLatch.map(_.countDown())
case Stop(Some(_id)) if (_id == id) self.stop(); shudownLatch.map(_.countDown())
case Stop(None) self.stop()
case Stop(Some(_id)) if (_id == id) self.stop()
case _id: Int if (_id == id)
case _ Thread sleep 100 * id; tryReply(id)
}
override def postStop = {
shudownLatch foreach (_.countDown())
}
})
}

View file

@ -467,6 +467,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
val responses = connections.versionedIterable.iterable.flatMap { actor
try {
if (actor.isShutdown) throw new ActorInitializationException("For compatability - check death first")
Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]])
} catch {
case e: Exception