2011-03-01 15:23:29 -07:00
|
|
|
package akka.dispatch
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
import org.scalatest.junit.JUnitSuite
|
|
|
|
|
import org.junit.Test
|
2011-03-01 15:23:29 -07:00
|
|
|
import akka.actor.{ Actor, ActorRef }
|
2010-08-24 23:21:28 +02:00
|
|
|
import Actor._
|
2010-10-15 17:24:15 +02:00
|
|
|
import org.multiverse.api.latches.StandardLatch
|
2011-02-07 18:59:49 +01:00
|
|
|
import java.util.concurrent. {TimeUnit, CountDownLatch}
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
/** Actor fixtures shared by the future tests. */
object FutureSpec {

  /**
   * Replies "World" to "Hello", accepts "NoReply" without answering, and
   * throws a RuntimeException when it receives "Failure".
   */
  class TestActor extends Actor {
    def receive = {
      case "Hello"   => self.reply("World")
      case "NoReply" => ()
      case "Failure" => throw new RuntimeException("Expected exception; to test fault-tolerance")
    }
  }

  /**
   * Same protocol as TestActor, except that every handled message first
   * blocks on the supplied latch before it is acted upon.
   */
  class TestDelayActor(await: StandardLatch) extends Actor {
    def receive = {
      case msg @ ("Hello" | "NoReply" | "Failure") =>
        // All three messages wait for the latch before doing anything else.
        await.await
        msg match {
          case "Hello"   => self.reply("World")
          case "NoReply" => ()
          case "Failure" => throw new RuntimeException("Expected exception; to test fault-tolerance")
        }
    }
  }
}
|
|
|
|
|
|
2011-03-01 15:23:29 -07:00
|
|
|
// Mixes JUnitSuite into JavaFutureTests so that class's tests run as a ScalaTest suite
// (presumably the Java-API future tests; JavaFutureTests is defined elsewhere, confirm).
class JavaFutureSpec extends JavaFutureTests with JUnitSuite
|
|
|
|
|
|
2010-08-24 23:21:28 +02:00
|
|
|
class FutureSpec extends JUnitSuite {
|
|
|
|
|
import FutureSpec._
|
|
|
|
|
|
|
|
|
|
/**
 * Sends "Hello" to a TestActor with `!!!` and checks that the returned
 * future completes with the result "World".
 */
@Test def shouldActorReplyResultThroughExplicitFuture {
  val actor = actorOf[TestActor].start
  // try/finally: the actor must be stopped even when an assertion fails,
  // otherwise it leaks into the remaining tests.
  try {
    val future = actor !!! "Hello"
    future.await
    assert(future.result.isDefined)
    assert("World" === future.result.get)
  } finally {
    actor.stop
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Sends "Failure" to a TestActor with `!!!` and checks that the exception
 * thrown by the actor is exposed through the future.
 */
@Test def shouldActorReplyExceptionThroughExplicitFuture {
  val actor = actorOf[TestActor].start
  // try/finally: stop the actor even when an assertion fails.
  try {
    val future = actor !!! "Failure"
    future.await
    assert(future.exception.isDefined)
    assert("Expected exception; to test fault-tolerance" === future.exception.get.getMessage)
  } finally {
    actor.stop
  }
}
|
|
|
|
|
|
2011-02-22 18:44:50 -07:00
|
|
|
/**
 * Chains two actor requests with flatMap: the reply of actor1 is forwarded
 * to actor2, which upper-cases it. A continuation typed for Int instead of
 * String is expected to surface as a ClassCastException.
 */
@Test def shouldFutureCompose {
  val actor1 = actorOf[TestActor].start
  val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start
  // try/finally: both actors are stopped even when an assertion fails.
  try {
    val future1 = actor1 !!! "Hello" flatMap ((s: String) => actor2 !!! s)
    val future2 = actor1 !!! "Hello" flatMap (actor2 !!! (_: String))
    val future3 = actor1 !!! "Hello" flatMap (actor2 !!! (_: Int))

    assert(Some(Right("WORLD")) === future1.await.value)
    assert(Some(Right("WORLD")) === future2.await.value)
    intercept[ClassCastException] { future3.await.resultOrException }
  } finally {
    actor1.stop
    actor2.stop
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Same composition as shouldFutureCompose, but the intermediate value is
 * narrowed with `collect`. A partial function that does not match the reply
 * is expected to surface as a MatchError.
 */
@Test def shouldFutureComposePatternMatch {
  val actor1 = actorOf[TestActor].start
  val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start
  // try/finally: both actors are stopped even when an assertion fails.
  try {
    val future1 = actor1 !!! "Hello" collect { case (s: String) => s } flatMap (actor2 !!! _)
    val future2 = actor1 !!! "Hello" collect { case (n: Int) => n } flatMap (actor2 !!! _)

    assert(Some(Right("WORLD")) === future1.await.value)
    intercept[MatchError] { future2.await.resultOrException }
  } finally {
    actor1.stop
    actor2.stop
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Composes three requests in a for-comprehension using typed generators.
 * The actor answers a String with its length and an Int with its double
 * rendered as a String. A generator typed Int for a String reply is
 * expected to produce a ClassCastException.
 */
@Test def shouldFutureForComprehension {
  val actor = actorOf(new Actor {
    def receive = {
      case s: String => self reply s.length
      case i: Int => self reply (i * 2).toString
    }
  }).start

  // try/finally: stop the actor even when an assertion fails.
  try {
    val future1 = for {
      a: Int <- actor !!! "Hello" // returns 5
      b: String <- actor !!! a // returns "10"
      c: String <- actor !!! 7 // returns "14"
    } yield b + "-" + c

    val future2 = for {
      a: Int <- actor !!! "Hello"
      b: Int <- actor !!! a
      c: String <- actor !!! 7
    } yield b + "-" + c

    assert(Some(Right("10-14")) === future1.await.value)
    intercept[ClassCastException] { future2.await.resultOrException }
  } finally {
    actor.stop
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * For-comprehension variant where each reply is unwrapped from a Res
 * envelope via `collect`. A `collect` whose pattern does not match the
 * reply is expected to surface as a MatchError.
 */
@Test def shouldFutureForComprehensionPatternMatch {
  case class Req[T](req: T)
  case class Res[T](res: T)

  val actor = actorOf(new Actor {
    def receive = {
      case Req(s: String) => self reply Res(s.length)
      case Req(i: Int) => self reply Res((i * 2).toString)
    }
  }).start

  // try/finally: stop the actor even when an assertion fails.
  try {
    val future1 = for {
      a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
      b <- actor !!! Req(a) collect { case Res(x: String) => x }
      c <- actor !!! Req(7) collect { case Res(x: String) => x }
    } yield b + "-" + c

    val future2 = for {
      a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
      b <- actor !!! Req(a) collect { case Res(x: Int) => x }
      c <- actor !!! Req(7) collect { case Res(x: String) => x }
    } yield b + "-" + c

    assert(Some(Right("10-14")) === future1.await.value)
    intercept[MatchError] { future2.await.resultOrException }
  } finally {
    actor.stop
  }
}
|
|
|
|
|
|
2010-08-24 23:21:28 +02:00
|
|
|
// FIXME (possibly stale): this asked to implement Futures.awaitEither and uncomment the two tests below; both tests are already active in this file.
|
|
|
|
|
/**
 * Futures.awaitEither must yield "World" when only the first of the two
 * futures receives a reply (the second actor is sent "NoReply").
 */
@Test def shouldFutureAwaitEitherLeft = {
  val actor1 = actorOf[TestActor].start
  val actor2 = actorOf[TestActor].start
  // try/finally: both actors are stopped even when an assertion fails.
  try {
    val future1 = actor1 !!! "Hello"
    val future2 = actor2 !!! "NoReply"
    val result = Futures.awaitEither(future1, future2)
    assert(result.isDefined)
    assert("World" === result.get)
  } finally {
    actor1.stop
    actor2.stop
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Futures.awaitEither must yield "World" when only the second of the two
 * futures receives a reply (the first actor is sent "NoReply").
 */
@Test def shouldFutureAwaitEitherRight = {
  val actor1 = actorOf[TestActor].start
  val actor2 = actorOf[TestActor].start
  // try/finally: both actors are stopped even when an assertion fails.
  try {
    val future1 = actor1 !!! "NoReply"
    val future2 = actor2 !!! "Hello"
    val result = Futures.awaitEither(future1, future2)
    assert(result.isDefined)
    assert("World" === result.get)
  } finally {
    actor1.stop
    actor2.stop
  }
}
|
2010-11-12 14:04:06 +01:00
|
|
|
|
2010-08-24 23:21:28 +02:00
|
|
|
/**
 * Futures.awaitOne over a list whose first future never gets a reply must
 * return the completed second future, carrying "World".
 */
@Test def shouldFutureAwaitOneLeft = {
  val actor1 = actorOf[TestActor].start
  val actor2 = actorOf[TestActor].start
  // try/finally: both actors are stopped even when an assertion fails.
  try {
    val future1 = actor1 !!! "NoReply"
    val future2 = actor2 !!! "Hello"
    val result = Futures.awaitOne(List(future1, future2))
    assert(result.result.isDefined)
    assert("World" === result.result.get)
  } finally {
    actor1.stop
    actor2.stop
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Futures.awaitOne over a list whose second future never gets a reply must
 * return the completed first future, carrying "World".
 */
@Test def shouldFutureAwaitOneRight = {
  val actor1 = actorOf[TestActor].start
  val actor2 = actorOf[TestActor].start
  // try/finally: both actors are stopped even when an assertion fails.
  try {
    val future1 = actor1 !!! "Hello"
    val future2 = actor2 !!! "NoReply"
    val result = Futures.awaitOne(List(future1, future2))
    assert(result.result.isDefined)
    assert("World" === result.result.get)
  } finally {
    actor1.stop
    actor2.stop
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * After Futures.awaitAll returns, both futures must hold the result "World".
 */
@Test def shouldFutureAwaitAll = {
  val actor1 = actorOf[TestActor].start
  val actor2 = actorOf[TestActor].start
  // try/finally: both actors are stopped even when an assertion fails.
  try {
    val future1 = actor1 !!! "Hello"
    val future2 = actor2 !!! "Hello"
    Futures.awaitAll(List(future1, future2))
    assert(future1.result.isDefined)
    assert("World" === future1.result.get)
    assert(future2.result.isDefined)
    assert("World" === future2.result.get)
  } finally {
    actor1.stop
    actor2.stop
  }
}
|
2010-10-15 17:24:15 +02:00
|
|
|
|
|
|
|
|
/** Futures.awaitMap applied to an empty list of futures must return Nil. */
@Test def shouldFuturesAwaitMapHandleEmptySequence {
  val mapped = Futures.awaitMap[Nothing, Unit](Nil)(_ => ())
  assert(mapped === Nil)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Futures.awaitMap over three latch-gated actors: once the latches are
 * opened each actor replies "World", so the summed reply lengths must equal
 * the number of actors times "World".length.
 */
@Test def shouldFuturesAwaitMapHandleNonEmptySequence {
  val latches = (1 to 3) map (_ => new StandardLatch)
  val actors = latches map (latch => actorOf(new TestDelayActor(latch)).start)
  // NOTE(review): the request expression was truncated to `(actor.!!)` in this copy.
  // A String-typed "Hello" request is restored because the mapping below calls
  // `_.length` on the result and expects "World"; confirm against version control.
  val futures = actors map (actor => (actor.!!![String]("Hello")))
  latches foreach { _.open }

  assert(Futures.awaitMap(futures)(_.result.map(_.length).getOrElse(0)).sum === (latches.size * "World".length))
}
|
2011-01-24 13:31:07 +01:00
|
|
|
|
|
|
|
|
/**
 * Futures.fold over ten actor replies: each actor echoes the Int it is sent
 * (after sleeping), and the indices 0..9 must fold to 45.
 */
@Test def shouldFoldResults {
  val actors = (1 to 10).toList map { _ =>
    actorOf(new Actor {
      def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add }
    }).start
  }
  // NOTE(review): the request expression was truncated to `actor.!!)` in this copy.
  // The payload must be an (Int, Int) tuple whose first element is the index for the
  // sum to be 45; the delay `idx * 200` is a reconstruction, confirm against version control.
  def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx * 200)) }
  assert(Futures.fold(0)(futures)(_ + _).awaitBlocking.result.get === 45)
}
|
|
|
|
|
|
2011-02-22 18:57:25 -07:00
|
|
|
/**
 * Folds ten actor replies by composing the futures in a for-comprehension
 * inside foldLeft, starting from Future(0); the indices 0..9 must sum to 45.
 */
@Test def shouldFoldResultsByComposing {
  val actors = (1 to 10).toList map { _ =>
    actorOf(new Actor {
      def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add }
    }).start
  }
  // NOTE(review): the request expression was truncated to `actor.!!)` in this copy.
  // The payload must be an (Int, Int) tuple whose first element is the index for the
  // sum to be 45; the delay `idx * 200` is a reconstruction, confirm against version control.
  def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx * 200)) }
  assert(futures.foldLeft(Future(0))((fr, fa) => for (r <- fr; a <- fa) yield (r + a)).awaitBlocking.result.get === 45)
}
|
|
|
|
|
|
2011-01-24 13:31:07 +01:00
|
|
|
/**
 * Futures.fold must expose the failure when one of the folded futures fails:
 * the actor that is sent 6 throws an IllegalArgumentException whose message
 * is asserted on the folded future.
 */
@Test def shouldFoldResultsWithException {
  val actors = (1 to 10).toList map { _ =>
    actorOf(new Actor {
      def receive = {
        case (add: Int, wait: Int) =>
          Thread.sleep(wait)
          if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
          self reply_? add
      }
    }).start
  }
  // NOTE(review): the request expression was truncated to `actor.!!)` in this copy.
  // The first tuple element is the index (so index 6 triggers the failure); the delay
  // `idx * 100` is a reconstruction, confirm against version control.
  def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx * 100)) }
  assert(Futures.fold(0)(futures)(_ + _).awaitBlocking.exception.get.getMessage === "shouldFoldResultsWithException: expected")
}
|
2011-01-24 16:37:08 +01:00
|
|
|
|
2011-01-24 17:42:56 +01:00
|
|
|
/** Folding an empty list of futures must complete with the supplied zero value. */
@Test def shouldFoldReturnZeroOnEmptyInput {
  val noFutures = List[Future[Int]]()
  val folded = Futures.fold(0)(noFutures)(_ + _)
  assert(folded.awaitBlocking.result.get === 0)
}
|
|
|
|
|
|
2011-01-24 16:37:08 +01:00
|
|
|
/**
 * Futures.reduce over ten actor replies: each actor echoes the Int it is
 * sent (after sleeping), and the indices 0..9 must reduce to 45.
 */
@Test def shouldReduceResults {
  val actors = (1 to 10).toList map { _ =>
    actorOf(new Actor {
      def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add }
    }).start
  }
  // NOTE(review): the request expression was truncated to `actor.!!)` in this copy.
  // The payload must be an (Int, Int) tuple whose first element is the index for the
  // sum to be 45; the delay `idx * 200` is a reconstruction, confirm against version control.
  def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx * 200)) }
  assert(Futures.reduce(futures)(_ + _).awaitBlocking.result.get === 45)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Futures.reduce must expose the failure when one of the reduced futures
 * fails: the actor that is sent 6 throws an IllegalArgumentException whose
 * message is asserted on the reduced future.
 */
@Test def shouldReduceResultsWithException {
  val actors = (1 to 10).toList map { _ =>
    actorOf(new Actor {
      def receive = {
        case (add: Int, wait: Int) =>
          Thread.sleep(wait)
          // The message text names the fold test; it is kept as-is because the
          // assertion below compares against exactly this string.
          if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
          self reply_? add
      }
    }).start
  }
  // NOTE(review): the request expression was truncated to `actor.!!)` in this copy.
  // The first tuple element is the index (so index 6 triggers the failure); the delay
  // `idx * 100` is a reconstruction, confirm against version control.
  def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, idx * 100)) }
  assert(Futures.reduce(futures)(_ + _).awaitBlocking.exception.get.getMessage === "shouldFoldResultsWithException: expected")
}
|
2011-01-24 17:42:56 +01:00
|
|
|
|
|
|
|
|
/**
 * Reducing an empty list of futures must fail. Despite the "IAE" in the
 * method name, the exception JUnit is told to expect is
 * UnsupportedOperationException.
 */
@Test(expected = classOf[UnsupportedOperationException]) def shouldReduceThrowIAEOnEmptyInput {
  val noFutures = List[Future[Int]]()
  val reduced = Futures.reduce(noFutures)(_ + _)
  reduced.await.resultOrException
}
|
2011-02-07 18:59:49 +01:00
|
|
|
|
|
|
|
|
/**
 * resultWithin must report an unfinished future as None instead of throwing:
 * of ten requests, five are expected to complete within the two-second
 * window, five to be still pending, and none to have failed.
 */
@Test def resultWithinShouldNotThrowExceptions {
  val actors = (1 to 10).toList map { _ =>
    actorOf(new Actor {
      def receive = { case (add: Int, wait: Int) => Thread.sleep(wait); self reply_? add }
    }).start
  }

  // NOTE(review): the middle of this expression was lost in this copy (only
  // `actor.!!` and `5000 else 0 ))` survived). The condition `idx >= 5` is restored
  // so that exactly five actors sleep for 5000 ms, matching the 5/5 split asserted
  // below; confirm against version control.
  def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) => actor.!!![Int]((idx, if (idx >= 5) 5000 else 0)) }
  val result = for (f <- futures) yield f.resultWithin(2, TimeUnit.SECONDS)
  val done = result collect { case Some(Right(x)) => x }
  val undone = result collect { case None => None }
  val errors = result collect { case Some(Left(t)) => t }
  assert(done.size === 5)
  assert(undone.size === 5)
  assert(errors.size === 0)
}
|
2011-02-13 21:11:37 -07:00
|
|
|
|
|
|
|
|
/**
 * A partial function registered with `receive` on a future must run when the
 * future completes: the handler opens a latch that the test waits on for up
 * to five seconds.
 */
@Test def receiveShouldExecuteOnComplete {
  val latch = new StandardLatch
  val actor = actorOf[TestActor].start
  // try/finally: stop the actor even when the latch assertion fails.
  try {
    actor !!! "Hello" receive { case "World" => latch.open }
    assert(latch.tryAwait(5, TimeUnit.SECONDS))
  } finally {
    actor.stop
  }
}
|
2011-03-20 17:15:23 -06:00
|
|
|
|
|
|
|
|
/**
 * Exercises futures with a plain Throwable (not an Exception):
 *  - a future whose body throws it must rethrow it from resultOrException;
 *  - callbacks that throw it, registered both before and after completion,
 *    must not stop the future or a future mapped from it from completing;
 *  - afterwards the default global dispatcher must report no queued futures.
 */
@Test def shouldHandleThrowables {
  class ThrowableTest(m: String) extends Throwable(m)

  val failing = Future { throw new ThrowableTest("test") }
  failing.await
  intercept[ThrowableTest] { failing.resultOrException }

  val gate = new StandardLatch
  val gated = Future { gate.tryAwait(5, TimeUnit.SECONDS); "success" }
  // Callbacks registered while the future is still waiting on the latch.
  gated foreach (_ => throw new ThrowableTest("dispatcher foreach"))
  gated receive { case _ => throw new ThrowableTest("dispatcher receive") }
  val upperCased = gated map (_.toUpperCase)
  gate.open
  gated.await
  assert(gated.resultOrException === Some("success"))
  // Callbacks registered after the future has already completed.
  gated foreach (_ => throw new ThrowableTest("current thread foreach"))
  gated receive { case _ => throw new ThrowableTest("current thread receive") }
  upperCased.await
  assert(upperCased.resultOrException === Some("SUCCESS"))

  // make sure all futures are completed in dispatcher
  assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0)
}
|
2010-08-24 23:21:28 +02:00
|
|
|
}
|