Merge branch 'master' of https://github.com/jboner/akka
This commit is contained in:
commit
dd6430efc3
2 changed files with 58 additions and 0 deletions
|
|
@ -45,6 +45,12 @@ object Futures {
|
|||
future.get
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies the supplied function to the specified collection of Futures after awaiting each future to be completed
|
||||
*/
|
||||
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
|
||||
in map { f => fun(f.await) }
|
||||
|
||||
/*
|
||||
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = {
|
||||
import Actor.Sender.Self
|
||||
|
|
@ -83,6 +89,18 @@ sealed trait Future[T] {
|
|||
def timeoutInNanos: Long
|
||||
def result: Option[T]
|
||||
def exception: Option[Throwable]
|
||||
def map[O](f: (T) => O): Future[O] = {
|
||||
val wrapped = this
|
||||
new Future[O] {
|
||||
def await = { wrapped.await; this }
|
||||
def awaitBlocking = { wrapped.awaitBlocking; this }
|
||||
def isCompleted = wrapped.isCompleted
|
||||
def isExpired = wrapped.isExpired
|
||||
def timeoutInNanos = wrapped.timeoutInNanos
|
||||
def result: Option[O] = { wrapped.result map f }
|
||||
def exception: Option[Throwable] = wrapped.exception
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait CompletableFuture[T] extends Future[T] {
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import org.scalatest.junit.JUnitSuite
|
|||
import org.junit.Test
|
||||
import se.scalablesolutions.akka.dispatch.Futures
|
||||
import Actor._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
|
||||
object FutureSpec {
|
||||
class TestActor extends Actor {
|
||||
|
|
@ -15,6 +16,18 @@ object FutureSpec {
|
|||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
|
||||
class TestDelayActor(await: StandardLatch) extends Actor {
|
||||
def receive = {
|
||||
case "Hello" =>
|
||||
await.await
|
||||
self.reply("World")
|
||||
case "NoReply" => { await.await }
|
||||
case "Failure" =>
|
||||
await.await
|
||||
throw new RuntimeException("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FutureSpec extends JUnitSuite {
|
||||
|
|
@ -103,4 +116,31 @@ class FutureSpec extends JUnitSuite {
|
|||
actor1.stop
|
||||
actor2.stop
|
||||
}
|
||||
|
||||
@Test def shouldFutureMapBeDeferred {
|
||||
val latch = new StandardLatch
|
||||
val actor1 = actorOf(new TestDelayActor(latch)).start
|
||||
|
||||
val mappedFuture = (actor1.!!).map(x => 5)
|
||||
assert(mappedFuture.isCompleted === false)
|
||||
assert(mappedFuture.isExpired === false)
|
||||
latch.open
|
||||
mappedFuture.await
|
||||
assert(mappedFuture.isCompleted === true)
|
||||
assert(mappedFuture.isExpired === false)
|
||||
assert(mappedFuture.result === Some(5))
|
||||
}
|
||||
|
||||
@Test def shouldFuturesAwaitMapHandleEmptySequence {
|
||||
assert(Futures.awaitMap[Nothing,Unit](Nil)(x => ()) === Nil)
|
||||
}
|
||||
|
||||
@Test def shouldFuturesAwaitMapHandleNonEmptySequence {
|
||||
val latches = (1 to 3) map (_ => new StandardLatch)
|
||||
val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start)
|
||||
val futures = actors map (actor => (actor.!!))
|
||||
latches foreach { _.open }
|
||||
|
||||
assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue