From 98e48249dfa128ce06cb291604170fdd8531ace3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 15 Oct 2010 17:24:15 +0200 Subject: [PATCH] Closing #456 --- .../src/main/scala/dispatch/Future.scala | 18 +++++++++ .../src/test/scala/dispatch/FutureSpec.scala | 40 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index 3fd1c708e4..ea06ebb4ec 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -45,6 +45,12 @@ object Futures { future.get } + /** + * Applies the supplied function to the specified collection of Futures after awaiting each future to be completed + */ + def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = + in map { f => fun(f.await) } + /* def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = { import Actor.Sender.Self @@ -83,6 +89,18 @@ sealed trait Future[T] { def timeoutInNanos: Long def result: Option[T] def exception: Option[Throwable] + def map[O](f: (T) => O): Future[O] = { + val wrapped = this + new Future[O] { + def await = { wrapped.await; this } + def awaitBlocking = { wrapped.awaitBlocking; this } + def isCompleted = wrapped.isCompleted + def isExpired = wrapped.isExpired + def timeoutInNanos = wrapped.timeoutInNanos + def result: Option[O] = { wrapped.result map f } + def exception: Option[Throwable] = wrapped.exception + } + } } trait CompletableFuture[T] extends Future[T] { diff --git a/akka-actor/src/test/scala/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/dispatch/FutureSpec.scala index f740763fdf..04316f8a3d 100644 --- a/akka-actor/src/test/scala/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/dispatch/FutureSpec.scala @@ -4,6 +4,7 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test import se.scalablesolutions.akka.dispatch.Futures import Actor._ +import org.multiverse.api.latches.StandardLatch object FutureSpec { class TestActor extends Actor { @@ -15,6 +16,18 @@ object FutureSpec { throw new RuntimeException("Expected exception; to test fault-tolerance") } } + + class TestDelayActor(await: StandardLatch) extends Actor { + def receive = { + case "Hello" => + await.await + self.reply("World") + case "NoReply" => { await.await } + case "Failure" => + await.await + throw new RuntimeException("Expected exception; to test fault-tolerance") + } + } } class FutureSpec extends JUnitSuite { @@ -103,4 +116,31 @@ class FutureSpec extends JUnitSuite { actor1.stop actor2.stop } + + @Test def shouldFutureMapBeDeferred { + val latch = new StandardLatch + val actor1 = actorOf(new TestDelayActor(latch)).start + + val mappedFuture = (actor1.!!![String]("Hello")).map(x => 5) + assert(mappedFuture.isCompleted === false) + assert(mappedFuture.isExpired === false) + latch.open + mappedFuture.await + assert(mappedFuture.isCompleted === true) + assert(mappedFuture.isExpired === false) + assert(mappedFuture.result === Some(5)) + } + + @Test def shouldFuturesAwaitMapHandleEmptySequence { + assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil) + } + + @Test def shouldFuturesAwaitMapHandleNonEmptySequence { + val latches = (1 to 3) map (_ => new StandardLatch) + val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start) + val futures = actors map (actor => (actor.!!![String]("Hello"))) + latches foreach { _.open } + + assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length)) + } }