Adding configurable default dispatcher timeout and re-instating awaitEither

This commit is contained in:
Viktor Klang 2010-11-12 14:04:06 +01:00
parent 49d9151c70
commit 9898655f4a
6 changed files with 33 additions and 33 deletions

View file

@ -48,6 +48,9 @@ import java.util.concurrent.TimeUnit
*/
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout").
map(time => Duration(time, TIME_UNIT)).
getOrElse(Duration(1000,TimeUnit.MILLISECONDS))
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT)

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dispatch
@ -35,6 +35,9 @@ object Futures {
f
}
/**
* (Blocking!)
*/
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
/**
@ -58,34 +61,10 @@ object Futures {
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
in map { f => fun(f.await) }
/*
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = {
import Actor.Sender.Self
import Actor.{spawn, actor}
case class Result(res: Option[T])
val handOff = new SynchronousQueue[Option[T]]
spawn {
try {
println("f1 await")
f1.await
println("f1 offer")
handOff.offer(f1.result)
} catch {case _ => {}}
}
spawn {
try {
println("f2 await")
f2.await
println("f2 offer")
println("f2 offer: " + f2.result)
handOff.offer(f2.result)
} catch {case _ => {}}
}
Thread.sleep(100)
handOff.take
}
*/
/**
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
*/
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException
}
sealed trait Future[T] {
@ -105,6 +84,19 @@ sealed trait Future[T] {
def onComplete(func: Future[T] => Unit): Future[T]
/**
* Returns the current result, throws the exception is one has been raised, else returns None
*/
def resultOrException: Option[T] = {
val r = result
if (r.isDefined) result
else {
val problem = exception
if (problem.isDefined) throw problem.get
else None
}
}
/* Java API */
def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(f => proc(f))

View file

@ -145,7 +145,11 @@ trait MessageDispatcher extends MailboxFactory with Logging {
}
}
private[akka] def timeoutMs: Long = 1000
/**
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second
*/
private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference

View file

@ -192,7 +192,7 @@ abstract class ActorModelSpec extends JUnitSuite {
a.start
assertDispatcher(dispatcher)(starts = 1, stops = 0)
a.stop
await(dispatcher.stops.get == 1)(withinMs = 10000)
await(dispatcher.stops.get == 1)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 1, stops = 1)
assertRef(a,dispatcher)(
suspensions = 0,

View file

@ -5,6 +5,7 @@ import org.junit.Test
import akka.dispatch.Futures
import Actor._
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.CountDownLatch
object FutureSpec {
class TestActor extends Actor {
@ -53,7 +54,6 @@ class FutureSpec extends JUnitSuite {
actor.stop
}
/*
// FIXME: implement Futures.awaitEither, and uncomment these two tests
@Test def shouldFutureAwaitEitherLeft = {
val actor1 = actorOf[TestActor].start
@ -78,7 +78,7 @@ class FutureSpec extends JUnitSuite {
actor1.stop
actor2.stop
}
*/
@Test def shouldFutureAwaitOneLeft = {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf[TestActor].start

View file

@ -26,6 +26,7 @@ akka {
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
throughput-deadline-time = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down
default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable