diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 234392c0b2..f598e8dd32 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -147,6 +147,9 @@ object Actor extends ListenerManagement { object Timeout { def apply(timeout: Long) = new Timeout(timeout) def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) + implicit def default = defaultTimeout + val zero = new Timeout(Duration.Zero) + val never = new Timeout(Duration.Inf) implicit def durationToTimeout(duration: Duration) = new Timeout(duration) implicit def intToTimeout(timeout: Int) = new Timeout(timeout) implicit def longToTimeout(timeout: Long) = new Timeout(timeout) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d50d6e855f..b7671df87c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -7,6 +7,7 @@ package akka.dispatch import akka.AkkaException import akka.event.EventHandler import akka.actor.{ Actor, Channel, ForwardableChannel, NullChannel, UntypedChannel, ActorRef, Scheduler } +import akka.actor.Actor.Timeout import akka.util.{ Duration, BoxedType } import akka.japi.{ Procedure, Function ⇒ JFunc } @@ -32,6 +33,12 @@ object Futures { def future[T](body: Callable[T]): Future[T] = Future(body.call) + /** + * Java API, equivalent to Future.apply + */ + def future[T](body: Callable[T], timeout: Timeout): Future[T] = + Future(body.call, timeout) + /** * Java API, equivalent to Future.apply */ @@ -44,6 +51,12 @@ object Futures { def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher) + /** + * Java API, equivalent to Future.apply + */ + def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] = + Future(body.call, timeout)(dispatcher) + /** * Java API, equivalent to Future.apply */ @@ -53,7 +66,7 @@ object Futures { /** * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { + def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default): Future[T] = { val futureResult = new DefaultPromise[T](timeout) val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) @@ -66,7 +79,7 @@ object Futures { * Java API. * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Long): Future[T] = + def firstCompletedOf[T <: AnyRef](futures: java.lang.Iterable[Future[T]], timeout: Timeout): Future[T] = firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) /** @@ -79,7 +92,7 @@ object Futures { * val result = Futures.fold(0)(futures)(_ + _).await.result * */ - def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { + def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { if (futures.isEmpty) { new KeptPromise[R](Right(zero)) } else { @@ -121,9 +134,13 @@ object Futures { * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. */ - def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = + def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) + def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun) + + def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun) + /** * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first * Example: @@ -131,7 +148,7 @@ object Futures { * val result = Futures.reduce(futures)(_ + _).await.result * */ - def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) ⇒ T): Future[R] = { + def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = { if (futures.isEmpty) new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) else { @@ -156,15 +173,17 @@ object Futures { * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ - def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = + def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) + def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun) + /** * Java API. * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A](in: JIterable[Future[A]], timeout: Long): Future[JIterable[A]] = + def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒ for (r ← fr; a ← fa) yield { r add a @@ -176,7 +195,7 @@ object Futures { * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, Actor.TIMEOUT) + def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, Timeout.default) /** * Java API. @@ -184,7 +203,7 @@ object Futures { * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel. */ - def traverse[A, B](in: JIterable[A], timeout: Long, fn: JFunc[A, Future[B]]): Future[JIterable[B]] = + def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] = scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) ⇒ val fb = fn(a) for (r ← fr; b ← fb) yield { @@ -205,7 +224,7 @@ object Futures { * for (r <- fr; b <-fb) yield (r += b) * }.map(_.result) */ - def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, Actor.TIMEOUT, fn) + def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, Timeout.default, fn) } object Future { @@ -214,7 +233,7 @@ object Future { * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body * The execution is performed by the specified Dispatcher. */ - def apply[T](body: ⇒ T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = + def apply[T](body: ⇒ T, timeout: Timeout = Timeout.default)(implicit dispatcher: MessageDispatcher): Future[T] = dispatcher.dispatchFuture(() ⇒ body, timeout) import scala.collection.mutable.Builder @@ -224,7 +243,7 @@ object Future { * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout = Timeout.default)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) /** @@ -235,7 +254,7 @@ object Future { * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x))) * */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = + def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout = Timeout.default)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) @@ -257,7 +276,7 @@ object Future { * * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ - def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { + def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Timeout = Timeout.default): Future[A] = { val future = Promise[A](timeout) (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { _.exception match { @@ -338,10 +357,12 @@ sealed trait Future[+T] { */ def isExpired: Boolean + def timeout: Timeout + /** * This Future's timeout in nanoseconds. */ - def timeoutInNanos: Long + def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue /** * The contained value of this Future. Before this Future is completed @@ -443,7 +464,7 @@ sealed trait Future[+T] { case Some(_) ⇒ this.asInstanceOf[Future[A]] case None ⇒ - val future = new DefaultPromise[A](timeoutInNanos, NANOS) + val future = new DefaultPromise[A](timeout) onComplete { self ⇒ future complete { self.value.get match { @@ -489,7 +510,7 @@ sealed trait Future[+T] { case Some(_) ⇒ this.asInstanceOf[Future[A]] case None ⇒ - val future = new DefaultPromise[A](timeoutInNanos, NANOS) + val future = new DefaultPromise[A](timeout) onComplete { self ⇒ future complete { self.value.get match { @@ -533,7 +554,7 @@ sealed trait Future[+T] { case Some(_) ⇒ this.asInstanceOf[Future[A]] case None ⇒ - val future = new DefaultPromise[A](timeoutInNanos, NANOS) + val future = new DefaultPromise[A](timeout) onComplete { self ⇒ future complete { self.value.get match { @@ -566,7 +587,7 @@ sealed trait Future[+T] { case Some(_) ⇒ this.asInstanceOf[Future[A]] case None ⇒ - val fa = new DefaultPromise[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeout) onComplete { ft ⇒ fa complete (ft.value.get match { case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]] @@ -607,7 +628,7 @@ sealed trait Future[+T] { case Some(_) ⇒ this.asInstanceOf[Future[A]] case None ⇒ - val future = new DefaultPromise[A](timeoutInNanos, NANOS) + val future = new DefaultPromise[A](timeout) onComplete { _.value.get match { case Right(r) ⇒ @@ -655,7 +676,7 @@ sealed trait Future[+T] { case Some(_) ⇒ this case None ⇒ - val future = new DefaultPromise[T](timeoutInNanos, NANOS) + val future = new DefaultPromise[T](timeout) onComplete { self ⇒ future complete { self.value.get match { @@ -707,12 +728,12 @@ object Promise { /** * Creates a non-completed, new, Promise with the supplied timeout in milliseconds */ - def apply[A](timeout: Long): Promise[A] = new DefaultPromise[A](timeout) + def apply[A](timeout: Timeout): Promise[A] = new DefaultPromise[A](timeout) /** * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf) */ - def apply[A](): Promise[A] = apply(Actor.TIMEOUT) + def apply[A](): Promise[A] = apply(Timeout.default) /** * Construct a completable channel @@ -759,7 +780,7 @@ trait Promise[T] extends Future[T] { final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) } final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = new DefaultPromise[Any](Actor.TIMEOUT) + val fr = new DefaultPromise[Any]() this completeWith other onComplete { f ⇒ try { fr completeWith cont(f) @@ -773,7 +794,7 @@ trait Promise[T] extends Future[T] { } final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = Promise[Any](Actor.TIMEOUT) + val fr = Promise[Any]() stream.dequeue(this).onComplete { f ⇒ try { fr completeWith cont(f) @@ -791,14 +812,15 @@ trait Promise[T] extends Future[T] { /** * The default concrete Future implementation. */ -class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { +class DefaultPromise[T](val timeout: Timeout) extends Promise[T] { self ⇒ - def this() = this(0, MILLIS) + def this() = this(Timeout.default) - def this(timeout: Long) = this(timeout, MILLIS) + def this(timeout: Long) = this(Timeout(timeout)) + + def this(timeout: Long, timeunit: TimeUnit) = this(Timeout(timeout, timeunit)) - val timeoutInNanos = timeunit.toNanos(timeout) private val _startTimeInNanos = currentTimeInNanos private val _lock = new ReentrantLock private val _signal = _lock.newCondition @@ -832,11 +854,20 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { def await = { _lock.lock() - if (try { awaitUnsafe(timeLeft()) } finally { _lock.unlock }) this - else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") + try { + if (timeout.duration.isFinite) { + if (awaitUnsafe(timeLeft())) this + else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") + } else { + while (_value.isEmpty) { _signal.await } + this + } + } finally { + _lock.unlock + } } - def isExpired: Boolean = timeLeft() <= 0 + def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false def value: Option[Either[Throwable, T]] = { _lock.lock @@ -913,24 +944,26 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { } def onTimeout(func: Future[T] ⇒ Unit): this.type = { - _lock.lock - val runNow = try { - if (_value.isEmpty) { - if (!isExpired) { - val runnable = new Runnable { - def run() { - if (!isCompleted) func(self) + if (timeout.duration.isFinite) { + _lock.lock + val runNow = try { + if (_value.isEmpty) { + if (!isExpired) { + val runnable = new Runnable { + def run() { + if (!isCompleted) func(self) + } } - } - Scheduler.scheduleOnce(runnable, timeLeft, NANOS) - false - } else true - } else false - } finally { - _lock.unlock - } + Scheduler.scheduleOnce(runnable, timeLeft, NANOS) + false + } else true + } else false + } finally { + _lock.unlock + } - if (runNow) func(this) + if (runNow) func(this) + } this } @@ -949,9 +982,7 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) } -class ActorPromise(timeout: Long, timeunit: TimeUnit) extends DefaultPromise[Any](timeout, timeunit) with ForwardableChannel { - def this() = this(0, MILLIS) - def this(timeout: Long) = this(timeout, MILLIS) +class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with ForwardableChannel { def !(message: Any)(implicit channel: UntypedChannel = NullChannel) = completeWithResult(message) @@ -970,7 +1001,7 @@ class ActorPromise(timeout: Long, timeunit: TimeUnit) extends DefaultPromise[Any object ActorPromise { def apply(f: Promise[Any]): ActorPromise = - new ActorPromise(f.timeoutInNanos, NANOS) { + new ActorPromise(f.timeout) { completeWith(f) override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message override def sendException(ex: Throwable) = f completeWithException ex @@ -989,7 +1020,7 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise def await(atMost: Duration): this.type = this def await: this.type = this def isExpired: Boolean = true - def timeoutInNanos: Long = 0 + def timeout: Timeout = Timeout.zero final def onTimeout(func: Future[T] ⇒ Unit): this.type = this diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index b5c50ea939..29ea723767 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -90,7 +90,7 @@ trait MessageDispatcher { dispatch(invocation) } - private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Long): Future[T] = { + private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Actor.Timeout): Future[T] = { futures.getAndIncrement() try { val future = new DefaultPromise[T](timeout)