diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index b1147affc8..ef951a90d0 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -404,7 +404,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd latch.open assert(f2.get === 10) - val f3 = Future({ Thread.sleep(100); 5 }, 10) + val f3 = Future({ Thread.sleep(10); 5 }, 10) intercept[FutureTimeoutException] { f3.get } @@ -431,7 +431,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd val x = Future("Hello") val y = x map (_.length) - val r = flow(x() + " " + y.map(_ / 0).map(_.toString)(), 100) + val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) intercept[java.lang.ArithmeticException](r.get) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 7bd9867edb..68fa9daf3a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -55,13 +55,13 @@ object Futures { * Java API, equivalent to Future.apply */ def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] = - Future(body.call, timeout)(dispatcher) + Future(body.call)(dispatcher, timeout) /** * Java API, equivalent to Future.apply */ def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = - Future(body.call, timeout)(dispatcher) + Future(body.call)(dispatcher, timeout) /** * Returns a Future to the result of the first future in the list that is completed @@ -111,7 +111,8 @@ object Futures { case e: Exception ⇒ EventHandler.error(e, this, e.getMessage) result completeWithException e - } finally { + } + finally { results.clear } } @@ -232,9 +233,15 @@ object Future { * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body * The execution is performed by the specified Dispatcher. */ - def apply[T](body: ⇒ T, timeout: Timeout = Timeout.default)(implicit dispatcher: MessageDispatcher): Future[T] = + def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher, timeout: Timeout = implicitly): Future[T] = dispatcher.dispatchFuture(() ⇒ body, timeout) + def apply[T](body: ⇒ T, timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] = + apply(body)(dispatcher, timeout) + + def apply[T](body: ⇒ T, timeout: Long)(implicit dispatcher: MessageDispatcher): Future[T] = + apply(body)(dispatcher, timeout) + import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -242,8 +249,11 @@ object Future { * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout = Timeout.default)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = - in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout): Future[M[A]] = + in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + + def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = + sequence(in)(cbf, timeout) /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. @@ -253,12 +263,15 @@ object Future { * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x))) * */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout = Timeout.default)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ + def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout): Future[M[B]] = + in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) }.map(_.result) + def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = + traverse(in)(fn)(cbf, timeout) + /** * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited * Continuations plugin. @@ -275,7 +288,7 @@ object Future { * * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ - def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Timeout = Timeout.default): Future[A] = { + def flow[A](body: ⇒ A @cps[Future[Any]])(implicit timeout: Timeout): Future[A] = { val future = Promise[A](timeout) (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { _.exception match { @@ -450,7 +463,7 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - final def collect[A](pf: PartialFunction[Any, A]): Future[A] = value match { + final def collect[A](pf: PartialFunction[Any, A])(implicit timeout: Timeout): Future[A] = value match { case Some(Right(r)) ⇒ new KeptPromise[A](try { if (pf isDefinedAt r) @@ -496,7 +509,7 @@ sealed trait Future[+T] { * Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3 * */ - final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = value match { + final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = value match { case Some(Left(e)) ⇒ try { if (pf isDefinedAt e) @@ -543,7 +556,7 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - final def map[A](f: T ⇒ A): Future[A] = value match { + final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = value match { case Some(Right(r)) ⇒ new KeptPromise[A](try { Right(f(r)) @@ -578,7 +591,7 @@ sealed trait Future[+T] { * Creates a new Future[A] which is completed with this Future's result if * that conforms to A's erased type or a ClassCastException otherwise. */ - final def mapTo[A](implicit m: Manifest[A]): Future[A] = value match { + final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = value match { case Some(Right(t)) ⇒ new KeptPromise(try { Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) @@ -617,7 +630,7 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - final def flatMap[A](f: T ⇒ Future[A]): Future[A] = value match { + final def flatMap[A](f: T ⇒ Future[A])(implicit timeout: Timeout): Future[A] = value match { case Some(Right(r)) ⇒ try { f(r) @@ -655,14 +668,14 @@ sealed trait Future[+T] { final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) - final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) { + final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean)(implicit timeout: Timeout) { def foreach(f: A ⇒ Unit): Unit = self filter p foreach f def map[B](f: A ⇒ B): Future[B] = self filter p map f def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) } - final def filter(p: T ⇒ Boolean): Future[T] = value match { + final def filter(p: T ⇒ Boolean)(implicit timeout: Timeout): Future[T] = value match { case Some(Right(r)) ⇒ try { if (p(r))