diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index cb7f8fab94..4d33bf03ce 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -48,6 +48,9 @@ import java.util.concurrent.TimeUnit */ object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout"). + map(time => Duration(time, TIME_UNIT)). + getOrElse(Duration(1000,TimeUnit.MILLISECONDS)) val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT) diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index 37527c2ffa..2ac412d36d 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2010 Scalable Solutions AB + * Copyright (C) 2009-2010 Scalable Solutions AB */ package akka.dispatch @@ -35,6 +35,9 @@ object Futures { f } + /** + * (Blocking!) + */ def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) /** @@ -58,34 +61,10 @@ object Futures { def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = in map { f => fun(f.await) } - /* - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = { - import Actor.Sender.Self - import Actor.{spawn, actor} - - case class Result(res: Option[T]) - val handOff = new SynchronousQueue[Option[T]] - spawn { - try { - println("f1 await") - f1.await - println("f1 offer") - handOff.offer(f1.result) - } catch {case _ => {}} - } - spawn { - try { - println("f2 await") - f2.await - println("f2 offer") - println("f2 offer: " + f2.result) - handOff.offer(f2.result) - } catch {case _ => {}} - } - Thread.sleep(100) - handOff.take - } -*/ + /** + * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) + */ + def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException } sealed trait Future[T] { @@ -105,6 +84,19 @@ sealed trait Future[T] { def onComplete(func: Future[T] => Unit): Future[T] + /** + * Returns the current result, throws the exception is one has been raised, else returns None + */ + def resultOrException: Option[T] = { + val r = result + if (r.isDefined) result + else { + val problem = exception + if (problem.isDefined) throw problem.get + else None + } + } + /* Java API */ def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(f => proc(f)) diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index f7eb0ca170..33a7a62af3 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -145,7 +145,11 @@ trait MessageDispatcher extends MailboxFactory with Logging { } } - private[akka] def timeoutMs: Long = 1000 + /** + * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms + * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second + */ + private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference diff --git a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala index 6cd57c15fe..df088ce89c 100644 --- a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala @@ -192,7 +192,7 @@ abstract class ActorModelSpec extends JUnitSuite { a.start assertDispatcher(dispatcher)(starts = 1, stops = 0) a.stop - await(dispatcher.stops.get == 1)(withinMs = 10000) + await(dispatcher.stops.get == 1)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 1, stops = 1) assertRef(a,dispatcher)( suspensions = 0, diff --git a/akka-actor/src/test/scala/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/dispatch/FutureSpec.scala index d8b03bb0da..2b456a7cd3 100644 --- a/akka-actor/src/test/scala/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/dispatch/FutureSpec.scala @@ -5,6 +5,7 @@ import org.junit.Test import akka.dispatch.Futures import Actor._ import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.CountDownLatch object FutureSpec { class TestActor extends Actor { @@ -53,7 +54,6 @@ class FutureSpec extends JUnitSuite { actor.stop } - /* // FIXME: implement Futures.awaitEither, and uncomment these two tests @Test def shouldFutureAwaitEitherLeft = { val actor1 = actorOf[TestActor].start @@ -78,7 +78,7 @@ class FutureSpec extends JUnitSuite { actor1.stop actor2.stop } - */ + @Test def shouldFutureAwaitOneLeft = { val actor1 = actorOf[TestActor].start val actor2 = actorOf[TestActor].start diff --git a/config/akka-reference.conf b/config/akka-reference.conf index c6f3647d0f..8bb6131524 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -26,6 +26,7 @@ akka { serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness throughput-deadline-time = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline + dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down default-dispatcher { type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable