diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 4763ab4f92..cf40212e35 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -291,15 +291,15 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd "firstCompletedOf" in { val futures = Vector.fill[Future[Int]](10)(new DefaultPromise[Int]()) :+ new KeptPromise[Int](Right(5)) - Futures.firstCompletedOf(futures).get must be(5) + Future.firstCompletedOf(futures).get must be(5) } "find" in { val futures = for (i ← 1 to 10) yield Future { i } - val result = Futures.find[Int](_ == 3)(futures) + val result = Future.find[Int](_ == 3)(futures) result.get must be(Some(3)) - val notFound = Futures.find[Int](_ == 11)(futures) + val notFound = Future.find[Int](_ == 11)(futures) notFound.get must be(None) } @@ -311,7 +311,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - Futures.fold(0, timeout)(futures)(_ + _).get must be(45) + Future.fold(0, timeout)(futures)(_ + _).get must be(45) } "fold by composing" in { @@ -338,7 +338,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") + Future.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected") } } @@ -346,7 +346,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd import scala.collection.mutable.ArrayBuffer def test(testNumber: Int) { val fs = (0 to 1000) map (i ⇒ Future(i, 10000)) - val result = Futures.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) { + val result = Future.fold(ArrayBuffer.empty[AnyRef], 10000)(fs) { case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef] case (l, _) ⇒ l }.get.asInstanceOf[ArrayBuffer[Int]].sum @@ -358,7 +358,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } "return zero value if folding empty list" in { - Futures.fold(0)(List[Future[Int]]())(_ + _).get must be(0) + Future.fold(0)(List[Future[Int]]())(_ + _).get must be(0) } "shouldReduceResults" in { @@ -369,7 +369,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - assert(Futures.reduce(futures, timeout)(_ + _).get === 45) + assert(Future.reduce(futures, timeout)(_ + _).get === 45) } "shouldReduceResultsWithException" in { @@ -386,13 +386,13 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") + assert(Future.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected") } } "shouldReduceThrowIAEOnEmptyInput" in { filterException[IllegalArgumentException] { - intercept[UnsupportedOperationException] { Futures.reduce(List[Future[Int]]())(_ + _).get } + intercept[UnsupportedOperationException] { Future.reduce(List[Future[Int]]())(_ + _).get } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 1a801c6123..8b0c4db5fc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -64,45 +64,13 @@ object Futures { def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher, timeout) - /** - * Returns a Future to the result of the first future in the list that is completed - */ - def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = { - val futureResult = new DefaultPromise[T](timeout) - - val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) - futures.foreach(_ onComplete completeFirst) - - futureResult - } - - /** - * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate - */ - def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = { - if (futures.isEmpty) new KeptPromise[Option[T]](Right(None)) - else { - val result = new DefaultPromise[Option[T]](timeout) - val ref = new AtomicInteger(futures.size) - val search: Future[T] ⇒ Unit = f ⇒ try { - f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r)) - } finally { - if (ref.decrementAndGet == 0) - result completeWithResult None - } - futures.foreach(_ onComplete search) - - result - } - } - /** * Java API. * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = { val pred: T ⇒ Boolean = predicate.apply(_) - find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_)) + Future.find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_)) } /** @@ -110,59 +78,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] = - firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) - - /** - * A non-blocking fold over the specified futures. - * The fold is performed on the thread where the last future is completed, - * the result will be the first failure of any of the futures, or any failure in the actual fold, - * or the result of the fold. - * Example: - *
- * val result = Futures.fold(0)(futures)(_ + _).await.result - *- */ - def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { - if (futures.isEmpty) { - new KeptPromise[R](Right(zero)) - } else { - val result = new DefaultPromise[R](timeout) - val results = new ConcurrentLinkedQueue[T]() - val done = new Switch(false) - val allDone = futures.size - - val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? - f.value.get match { - case Right(value) ⇒ - val added = results add value - if (added && results.size == allDone) { //Only one thread can get here - if (done.switchOn) { - try { - val i = results.iterator - var currentValue = zero - while (i.hasNext) { currentValue = foldFun(currentValue, i.next) } - result completeWithResult currentValue - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - result completeWithException e - } finally { - results.clear - } - } - } - case Left(exception) ⇒ - if (done.switchOn) { - result completeWithException exception - results.clear - } - } - } - - futures foreach { _ onComplete aggregate } - result - } - } + Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout) /** * Java API @@ -172,50 +88,24 @@ object Futures { * or the result of the fold. */ def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = - fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) + Future.fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _) def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun) def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun) - /** - * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first - * Example: - *
- * val result = Futures.reduce(futures)(_ + _).await.result - *- */ - def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = { - if (futures.isEmpty) - new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) - else { - val result = new DefaultPromise[R](timeout) - val seedFound = new AtomicBoolean(false) - val seedFold: Future[T] ⇒ Unit = f ⇒ { - if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold - f.value.get match { - case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op)) - case Left(exception) ⇒ result.completeWithException(exception) - } - } - } - for (f ← futures) f onComplete seedFold //Attach the listener to the Futures - result - } - } - /** * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] = - reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) + Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _) def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun) /** * Java API. - * Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. + * Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]]. * Useful for reducing many Futures into a single Future. */ def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = @@ -298,6 +188,116 @@ object Future { def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = sequence(in)(cbf, timeout) + /** + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = { + val futureResult = new DefaultPromise[T](timeout) + + val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) + futures.foreach(_ onComplete completeFirst) + + futureResult + } + + /** + * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate + */ + def find[T](predicate: T ⇒ Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = { + if (futures.isEmpty) new KeptPromise[Option[T]](Right(None)) + else { + val result = new DefaultPromise[Option[T]](timeout) + val ref = new AtomicInteger(futures.size) + val search: Future[T] ⇒ Unit = f ⇒ try { + f.result.filter(predicate).foreach(r ⇒ result completeWithResult Some(r)) + } finally { + if (ref.decrementAndGet == 0) + result completeWithResult None + } + futures.foreach(_ onComplete search) + + result + } + } + + /** + * A non-blocking fold over the specified futures. + * The fold is performed on the thread where the last future is completed, + * the result will be the first failure of any of the futures, or any failure in the actual fold, + * or the result of the fold. + * Example: + *
+ * val result = Futures.fold(0)(futures)(_ + _).await.result + *+ */ + def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { + if (futures.isEmpty) { + new KeptPromise[R](Right(zero)) + } else { + val result = new DefaultPromise[R](timeout) + val results = new ConcurrentLinkedQueue[T]() + val done = new Switch(false) + val allDone = futures.size + + val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? + f.value.get match { + case Right(value) ⇒ + val added = results add value + if (added && results.size == allDone) { //Only one thread can get here + if (done.switchOn) { + try { + val i = results.iterator + var currentValue = zero + while (i.hasNext) { currentValue = foldFun(currentValue, i.next) } + result completeWithResult currentValue + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + result completeWithException e + } finally { + results.clear + } + } + } + case Left(exception) ⇒ + if (done.switchOn) { + result completeWithException exception + results.clear + } + } + } + + futures foreach { _ onComplete aggregate } + result + } + } + + /** + * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first + * Example: + *
+ * val result = Futures.reduce(futures)(_ + _).await.result + *+ */ + def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) ⇒ T): Future[R] = { + if (futures.isEmpty) + new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) + else { + val result = new DefaultPromise[R](timeout) + val seedFound = new AtomicBoolean(false) + val seedFold: Future[T] ⇒ Unit = f ⇒ { + if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold + f.value.get match { + case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op)) + case Left(exception) ⇒ result.completeWithException(exception) + } + } + } + for (f ← futures) f onComplete seedFold //Attach the listener to the Futures + result + } + } + /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 3e862b0950..2ef59edca1 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -574,5 +574,5 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { */ class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter { - protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Futures.firstCompletedOf(results) + protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results) } diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index 4ae489bd22..77dff1856c 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -199,7 +199,7 @@ Then there's a method that's called ``fold`` that takes a start-value, a sequenc val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures - val futureSum = Futures.fold(0)(futures)(_ + _) + val futureSum = Future.fold(0)(futures)(_ + _) That's all it takes! @@ -210,7 +210,7 @@ If the sequence passed to ``fold`` is empty, it will return the start-value, in val futures = for(i <- 1 to 1000) yield Future(i * 2) // Create a sequence of Futures - val futureSum = Futures.reduce(futures)(_ + _) + val futureSum = Future.reduce(futures)(_ + _) Same as with ``fold``, the execution will be done by the Thread that completes the last of the Futures, you can also parallize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again.