Start migrating Future to use Actor.Timeout, including support for never timing out. More testing and optimization still needed

This commit is contained in:
Derek Williams 2011-06-18 23:23:47 -06:00
parent daab5bdbcd
commit e639b2c74d
3 changed files with 88 additions and 54 deletions

View file

@ -147,6 +147,9 @@ object Actor extends ListenerManagement {
object Timeout {
def apply(timeout: Long) = new Timeout(timeout)
def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
implicit def default = defaultTimeout
val zero = new Timeout(Duration.Zero)
val never = new Timeout(Duration.Inf)
implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)

View file

@ -7,6 +7,7 @@ package akka.dispatch
import akka.AkkaException
import akka.event.EventHandler
import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef, Scheduler }
import akka.actor.Actor.Timeout
import akka.util.{ Duration, BoxedType }
import akka.japi.{ Procedure, Function JFunc }
@ -32,6 +33,12 @@ object Futures {
def future[T](body: Callable[T]): Future[T] =
Future(body.call)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Timeout): Future[T] =
Future(body.call, timeout)
/**
* Java API, equivalent to Future.apply
*/
@ -44,6 +51,12 @@ object Futures {
def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
Future(body.call)(dispatcher)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] =
Future(body.call, timeout)(dispatcher)
/**
* Java API, equivalent to Future.apply
*/
@ -53,7 +66,7 @@ object Futures {
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = {
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default): Future[T] = {
val futureResult = new DefaultPromise[T](timeout)
val completeFirst: Future[T] Unit = _.value.foreach(futureResult complete _)
@ -66,7 +79,7 @@ object Futures {
* Java API.
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] =
def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Timeout): Future[T] =
firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
/**
@ -79,7 +92,7 @@ object Futures {
* val result = Futures.fold(0)(futures)(_ + _).await.result
* </pre>
*/
def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) R): Future[R] = {
def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) R): Future[R] = {
if (futures.isEmpty) {
new KeptPromise[R](Right(zero))
} else {
@ -121,9 +134,13 @@ object Futures {
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun)
/**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
* Example:
@ -131,7 +148,7 @@ object Futures {
* val result = Futures.reduce(futures)(_ + _).await.result
* </pre>
*/
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) T): Future[R] = {
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) T): Future[R] = {
if (futures.isEmpty)
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
else {
@ -156,15 +173,17 @@ object Futures {
* Java API.
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] =
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] =
reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
/**
* Java API.
* Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]], timeout: Long): Future[JIterable[A]] =
def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] =
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa)
for (r fr; a fa) yield {
r add a
@ -176,7 +195,7 @@ object Futures {
* Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, Actor.TIMEOUT)
def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, Timeout.default)
/**
* Java API.
@ -184,7 +203,7 @@ object Futures {
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel.
*/
def traverse[A, B](in: JIterable[A], timeout: Long, fn: JFunc[A, Future[B]]): Future[JIterable[B]] =
def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] =
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a)
val fb = fn(a)
for (r fr; b fb) yield {
@ -205,7 +224,7 @@ object Futures {
* for (r <- fr; b <-fb) yield (r += b)
* }.map(_.result)
*/
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, Actor.TIMEOUT, fn)
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, Timeout.default, fn)
}
object Future {
@ -214,7 +233,7 @@ object Future {
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] =
def apply[T](body: T, timeout: Timeout = Timeout.default)(implicit dispatcher: MessageDispatcher): Future[T] =
dispatcher.dispatchFuture(() body, timeout)
import scala.collection.mutable.Builder
@ -224,7 +243,7 @@ object Future {
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout = Timeout.default)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
/**
@ -235,7 +254,7 @@ object Future {
* val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout = Timeout.default)(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr; b fb) yield (r += b)
@ -257,7 +276,7 @@ object Future {
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
def flow[A](body: A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = {
def flow[A](body: A @cps[Future[Any]], timeout: Timeout = Timeout.default): Future[A] = {
val future = Promise[A](timeout)
(reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete {
_.exception match {
@ -338,10 +357,12 @@ sealed trait Future[+T] {
*/
def isExpired: Boolean
def timeout: Timeout
/**
* This Future's timeout in nanoseconds.
*/
def timeoutInNanos: Long
def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue
/**
* The contained value of this Future. Before this Future is completed
@ -443,7 +464,7 @@ sealed trait Future[+T] {
case Some(_)
this.asInstanceOf[Future[A]]
case None
val future = new DefaultPromise[A](timeoutInNanos, NANOS)
val future = new DefaultPromise[A](timeout)
onComplete { self
future complete {
self.value.get match {
@ -489,7 +510,7 @@ sealed trait Future[+T] {
case Some(_)
this.asInstanceOf[Future[A]]
case None
val future = new DefaultPromise[A](timeoutInNanos, NANOS)
val future = new DefaultPromise[A](timeout)
onComplete { self
future complete {
self.value.get match {
@ -533,7 +554,7 @@ sealed trait Future[+T] {
case Some(_)
this.asInstanceOf[Future[A]]
case None
val future = new DefaultPromise[A](timeoutInNanos, NANOS)
val future = new DefaultPromise[A](timeout)
onComplete { self
future complete {
self.value.get match {
@ -566,7 +587,7 @@ sealed trait Future[+T] {
case Some(_)
this.asInstanceOf[Future[A]]
case None
val fa = new DefaultPromise[A](timeoutInNanos, NANOS)
val fa = new DefaultPromise[A](timeout)
onComplete { ft
fa complete (ft.value.get match {
case l: Left[_, _] l.asInstanceOf[Either[Throwable, A]]
@ -607,7 +628,7 @@ sealed trait Future[+T] {
case Some(_)
this.asInstanceOf[Future[A]]
case None
val future = new DefaultPromise[A](timeoutInNanos, NANOS)
val future = new DefaultPromise[A](timeout)
onComplete {
_.value.get match {
case Right(r)
@ -655,7 +676,7 @@ sealed trait Future[+T] {
case Some(_)
this
case None
val future = new DefaultPromise[T](timeoutInNanos, NANOS)
val future = new DefaultPromise[T](timeout)
onComplete { self
future complete {
self.value.get match {
@ -707,12 +728,12 @@ object Promise {
/**
* Creates a non-completed, new, Promise with the supplied timeout in milliseconds
*/
def apply[A](timeout: Long): Promise[A] = new DefaultPromise[A](timeout)
def apply[A](timeout: Timeout): Promise[A] = new DefaultPromise[A](timeout)
/**
* Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf)
*/
def apply[A](): Promise[A] = apply(Actor.TIMEOUT)
def apply[A](): Promise[A] = apply(Timeout.default)
/**
* Construct a completable channel
@ -759,7 +780,7 @@ trait Promise[T] extends Future[T] {
final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any]) cont(complete(Right(value))) }
final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any])
val fr = new DefaultPromise[Any](Actor.TIMEOUT)
val fr = new DefaultPromise[Any]()
this completeWith other onComplete { f
try {
fr completeWith cont(f)
@ -773,7 +794,7 @@ trait Promise[T] extends Future[T] {
}
final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any])
val fr = Promise[Any](Actor.TIMEOUT)
val fr = Promise[Any]()
stream.dequeue(this).onComplete { f
try {
fr completeWith cont(f)
@ -791,14 +812,15 @@ trait Promise[T] extends Future[T] {
/**
* The default concrete Future implementation.
*/
class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
self
def this() = this(0, MILLIS)
def this() = this(Timeout.default)
def this(timeout: Long) = this(timeout, MILLIS)
def this(timeout: Long) = this(Timeout(timeout))
def this(timeout: Long, timeunit: TimeUnit) = this(Timeout(timeout, timeunit))
val timeoutInNanos = timeunit.toNanos(timeout)
private val _startTimeInNanos = currentTimeInNanos
private val _lock = new ReentrantLock
private val _signal = _lock.newCondition
@ -832,11 +854,20 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
def await = {
_lock.lock()
if (try { awaitUnsafe(timeLeft()) } finally { _lock.unlock }) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
try {
if (timeout.duration.isFinite) {
if (awaitUnsafe(timeLeft())) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
} else {
while (_value.isEmpty) { _signal.await }
this
}
} finally {
_lock.unlock
}
}
def isExpired: Boolean = timeLeft() <= 0
def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
def value: Option[Either[Throwable, T]] = {
_lock.lock
@ -913,24 +944,26 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
}
def onTimeout(func: Future[T] Unit): this.type = {
_lock.lock
val runNow = try {
if (_value.isEmpty) {
if (!isExpired) {
val runnable = new Runnable {
def run() {
if (!isCompleted) func(self)
if (timeout.duration.isFinite) {
_lock.lock
val runNow = try {
if (_value.isEmpty) {
if (!isExpired) {
val runnable = new Runnable {
def run() {
if (!isCompleted) func(self)
}
}
}
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
false
} else true
} else false
} finally {
_lock.unlock
}
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
false
} else true
} else false
} finally {
_lock.unlock
}
if (runNow) func(this)
if (runNow) func(this)
}
this
}
@ -949,9 +982,7 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] {
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
}
class ActorPromise(timeout: Long, timeunit: TimeUnit) extends DefaultPromise[Any](timeout, timeunit) with ForwardableChannel {
def this() = this(0, MILLIS)
def this(timeout: Long) = this(timeout, MILLIS)
class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with ForwardableChannel {
def !(message: Any)(implicit channel: UntypedChannel = NullChannel) = completeWithResult(message)
@ -970,7 +1001,7 @@ class ActorPromise(timeout: Long, timeunit: TimeUnit) extends DefaultPromise[Any
object ActorPromise {
def apply(f: Promise[Any]): ActorPromise =
new ActorPromise(f.timeoutInNanos, NANOS) {
new ActorPromise(f.timeout) {
completeWith(f)
override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message
override def sendException(ex: Throwable) = f completeWithException ex
@ -989,7 +1020,7 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise
def await(atMost: Duration): this.type = this
def await: this.type = this
def isExpired: Boolean = true
def timeoutInNanos: Long = 0
def timeout: Timeout = Timeout.zero
final def onTimeout(func: Future[T] Unit): this.type = this

View file

@ -90,7 +90,7 @@ trait MessageDispatcher {
dispatch(invocation)
}
private[akka] final def dispatchFuture[T](block: () T, timeout: Long): Future[T] = {
private[akka] final def dispatchFuture[T](block: () T, timeout: Actor.Timeout): Future[T] = {
futures.getAndIncrement()
try {
val future = new DefaultPromise[T](timeout)