2011-03-01 15:23:29 -07:00
|
|
|
package akka.dispatch
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
import org.scalatest.junit.JUnitSuite
|
|
|
|
|
import org.junit.Test
|
2011-03-01 15:23:29 -07:00
|
|
|
import akka.actor.{ Actor, ActorRef }
|
2010-08-24 23:21:28 +02:00
|
|
|
import Actor._
|
2010-10-15 17:24:15 +02:00
|
|
|
import org.multiverse.api.latches.StandardLatch
|
2011-05-18 17:25:30 +02:00
|
|
|
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
/**
 * Fixtures shared by the tests in class FutureSpec: two small actors that
 * react to the string commands "Hello", "NoReply" and "Failure".
 */
object FutureSpec {

  /** Message carried by the exception both actors throw on "Failure". */
  private val ExpectedFailureMessage = "Expected exception; to test fault-tolerance"

  /**
   * Replies "World" to "Hello", does nothing for "NoReply" and throws a
   * RuntimeException for "Failure".
   */
  class TestActor extends Actor {
    def receive = {
      case "Hello"   ⇒ self.reply("World")
      case "NoReply" ⇒ ()
      case "Failure" ⇒ throw new RuntimeException(ExpectedFailureMessage)
    }
  }

  /**
   * Handles the same three commands as TestActor, but waits on the supplied
   * latch before acting on any of them.
   *
   * @param await latch that is awaited before each message is handled
   */
  class TestDelayActor(await: StandardLatch) extends Actor {
    def receive = {
      case "Hello" ⇒
        await.await
        self.reply("World")
      case "NoReply" ⇒
        await.await
      case "Failure" ⇒
        await.await
        throw new RuntimeException(ExpectedFailureMessage)
    }
  }
}
|
|
|
|
|
|
2011-03-01 15:23:29 -07:00
|
|
|
/**
 * Mixes ScalaTest's JUnitSuite into JavaFutureTests so that suite can be run
 * alongside FutureSpec. JavaFutureTests is declared outside this file
 * (presumably the Java-API future tests — confirm against its source).
 */
class JavaFutureSpec extends JavaFutureTests with JUnitSuite
|
|
|
|
|
|
2010-08-24 23:21:28 +02:00
|
|
|
class FutureSpec extends JUnitSuite {
|
|
|
|
|
import FutureSpec._
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/** Asking a TestActor "Hello" must complete the returned future with "World". */
@Test
def shouldActorReplyResultThroughExplicitFuture {
  val greeter = actorOf[TestActor]
  greeter.start()

  val reply = greeter ? "Hello"
  reply.await

  // The future must carry a result, and that result must be the actor's answer.
  assert(reply.result.isDefined)
  assert("World" === reply.result.get)

  greeter.stop()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Asking a TestActor "Failure" must complete the returned future with the
 * exception the actor threw, message included.
 */
@Test
def shouldActorReplyExceptionThroughExplicitFuture {
  val failing = actorOf[TestActor]
  failing.start()

  val reply = failing ? "Failure"
  reply.await

  // The future must hold an exception carrying the actor's message.
  assert(reply.exception.isDefined)
  assert("Expected exception; to test fault-tolerance" === reply.exception.get.getMessage)

  failing.stop()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Chains two asks with flatMap: the reply of the first actor is forwarded to
 * an upper-casing actor. A non-matching pattern inside the flatMap function
 * must surface as a MatchError when the composed future is read.
 */
@Test
def shouldFutureCompose {
  val greeter = actorOf[TestActor].start()
  val shouter = actorOf(new Actor {
    def receive = { case text: String ⇒ self.reply(text.toUpperCase) }
  }).start()

  // "Hello" -> "World" -> "WORLD"
  val shouted = (greeter ? "Hello") flatMap { reply ⇒
    reply match { case s: String ⇒ shouter ? s }
  }
  // The reply is a String, so the Int-only match cannot succeed.
  val mismatched = (greeter ? "Hello") flatMap { reply ⇒
    reply match { case i: Int ⇒ shouter ? i }
  }

  assert((shouted.get: Any) === "WORLD")
  intercept[MatchError] { mismatched.get }

  greeter.stop()
  shouter.stop()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Composes asks in a for-comprehension using mapTo. The actor answers a
 * String with its length and an Int with twice its value rendered as a String.
 * A mapTo to the wrong type must fail the composed future with a
 * ClassCastException.
 */
@Test
def shouldFutureForComprehension {
  val converter = actorOf(new Actor {
    def receive = {
      case text: String ⇒ self.reply(text.length)
      case number: Int  ⇒ self.reply((number * 2).toString)
    }
  }).start()

  val helloLength = converter ? "Hello"

  val joined = for {
    len: Int ← helloLength.mapTo[Int] // 5
    doubled: String ← (converter ? len).mapTo[String] // "10"
    fourteen: String ← (converter ? 7).mapTo[String] // "14"
  } yield doubled + "-" + fourteen

  // Second step claims Int although the actor answers an Int with a String.
  val mistyped = for {
    len: Int ← helloLength.mapTo[Int]
    doubled: Int ← (converter ? len).mapTo[Int]
    fourteen: String ← (converter ? 7).mapTo[String]
  } yield doubled + "-" + fourteen

  assert(joined.get === "10-14")
  intercept[ClassCastException] { mistyped.get }

  converter.stop()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Same composition as shouldFutureForComprehension, but the replies are
 * unpacked with extractor patterns in the generators. A pattern that does not
 * match the reply must fail the composed future with a MatchError.
 */
@Test
def shouldFutureForComprehensionPatternMatch {
  case class Req[T](req: T)
  case class Res[T](res: T)

  val converter = actorOf(new Actor {
    def receive = {
      case Req(text: String) ⇒ self.reply(Res(text.length))
      case Req(number: Int)  ⇒ self.reply(Res((number * 2).toString))
    }
  }).start()

  val joined = for {
    Res(len: Int) ← converter ? Req("Hello")
    Res(doubled: String) ← converter ? Req(len)
    Res(fourteen: String) ← converter ? Req(7)
  } yield doubled + "-" + fourteen

  // The actor answers Req(Int) with Res(String), so Res(_: Int) cannot match.
  val mismatched = for {
    Res(len: Int) ← converter ? Req("Hello")
    Res(doubled: Int) ← converter ? Req(len)
    Res(fourteen: Int) ← converter ? Req(7)
  } yield doubled + "-" + fourteen

  assert(joined.get === "10-14")
  intercept[MatchError] { mismatched.get }

  converter.stop()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Exercises `recover`: a failed future is turned into a result only when the
 * partial function matches its exception; successful futures pass through
 * unchanged and non-matching exceptions are still thrown by `get`.
 */
@Test
def shouldMapMatchedExceptionsToResult {
  val five = Future(5)
  val divided = five map (_ / 0) // fails with ArithmeticException
  val dividedText = divided map (_.toString)

  // Nothing to recover from: the original value flows on.
  val fiveText = five recover {
    case _: ArithmeticException ⇒ 0
  } map (_.toString)

  // Matching exception: replaced by 0.
  val recoveredText = divided recover {
    case _: ArithmeticException ⇒ 0
  } map (_.toString)

  // Handler for a different exception type: the failure stays.
  val notRecovered = divided recover {
    case _: MatchError ⇒ 0
  } map (_.toString)

  val recoveredLate = dividedText recover { case _: ArithmeticException ⇒ "You got ERROR" }

  val actor = actorOf[TestActor].start()

  val failure = actor ? "Failure"
  val failureRecovered = (actor ? "Failure") recover {
    case _: RuntimeException ⇒ "FAIL!"
  }
  val helloUntouched = (actor ? "Hello") recover {
    case _: RuntimeException ⇒ "FAIL!"
  }
  val failureCatchAll = (actor ? "Failure") recover { case _ ⇒ "Oops!" }

  assert(five.get === 5)
  intercept[ArithmeticException] { divided.get }
  intercept[ArithmeticException] { dividedText.get }
  assert(fiveText.get === "5")
  assert(recoveredText.get === "0")
  intercept[ArithmeticException] { notRecovered.get }
  assert(recoveredLate.get === "You got ERROR")
  intercept[RuntimeException] { failure.get }
  assert(failureRecovered.get === "FAIL!")
  assert(helloUntouched.get === "World")
  assert(failureCatchAll.get === "Oops!")

  actor.stop()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Folds the replies of ten actors with `Futures.fold` and expects the sum
 * 0 + 1 + ... + 9 = 45.
 *
 * Each actor is sent `(idx, idx * 200)`; it sleeps for the second component
 * (milliseconds) and then replies with the first. The actors are stopped in a
 * `finally` block so they do not outlive the test, even when the assertion
 * fails.
 */
@Test
def shouldFoldResults {
  val actors = (1 to 10).toList map { _ ⇒
    actorOf(new Actor {
      def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add }
    }).start()
  }
  try {
    val timeout = 10000
    def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] }
    assert(Futures.fold(0, timeout)(futures)(_ + _).await.result.get === 45)
  } finally {
    // The started actors were previously never stopped.
    actors foreach (_.stop())
  }
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Sums the replies of ten actors by folding the futures themselves with a
 * for-comprehension, and expects 0 + 1 + ... + 9 = 45.
 *
 * Each actor is sent `(idx, idx * 200)`; it sleeps for the second component
 * (milliseconds) and then replies with the first. The actors are stopped in a
 * `finally` block so they do not outlive the test, even when the assertion
 * fails.
 */
@Test
def shouldFoldResultsByComposing {
  val actors = (1 to 10).toList map { _ ⇒
    actorOf(new Actor {
      def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add }
    }).start()
  }
  try {
    def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), 10000).mapTo[Int] }
    assert(futures.foldLeft(Future(0))((fr, fa) ⇒ for (r ← fr; a ← fa) yield (r + a)).get === 45)
  } finally {
    // The started actors were previously never stopped.
    actors foreach (_.stop())
  }
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `Futures.fold` over futures of which one fails must complete with that
 * failure: the actor that receives `add == 6` throws instead of replying, and
 * the folded future's exception message is checked.
 *
 * Each actor is sent `(idx, idx * 100)` and sleeps for the second component
 * (milliseconds) before acting. The actors are stopped in a `finally` block so
 * they do not outlive the test, even when the assertion fails.
 */
@Test
def shouldFoldResultsWithException {
  val actors = (1 to 10).toList map { _ ⇒
    actorOf(new Actor {
      def receive = {
        case (add: Int, wait: Int) ⇒
          Thread.sleep(wait)
          if (add == 6) throw new IllegalArgumentException("shouldFoldResultsWithException: expected")
          self reply_? add
      }
    }).start()
  }
  try {
    val timeout = 10000
    def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] }
    assert(Futures.fold(0, timeout)(futures)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
  } finally {
    // The started actors were previously never stopped.
    actors foreach (_.stop())
  }
}
|
2011-01-24 16:37:08 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/** Folding an empty list of futures must yield the supplied zero value. */
@Test
def shouldFoldReturnZeroOnEmptyInput {
  val noFutures = List[Future[Int]]()
  val folded = Futures.fold(0)(noFutures)(_ + _)
  assert(folded.get === 0)
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Reduces the replies of ten actors with `Futures.reduce` and expects the sum
 * 0 + 1 + ... + 9 = 45.
 *
 * Each actor is sent `(idx, idx * 200)`; it sleeps for the second component
 * (milliseconds) and then replies with the first. The actors are stopped in a
 * `finally` block so they do not outlive the test, even when the assertion
 * fails.
 */
@Test
def shouldReduceResults {
  val actors = (1 to 10).toList map { _ ⇒
    actorOf(new Actor {
      def receive = { case (add: Int, wait: Int) ⇒ Thread.sleep(wait); self reply_? add }
    }).start()
  }
  try {
    val timeout = 10000
    def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] }
    assert(Futures.reduce(futures, timeout)(_ + _).get === 45)
  } finally {
    // The started actors were previously never stopped.
    actors foreach (_.stop())
  }
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `Futures.reduce` over futures of which one fails must complete with that
 * failure: the actor that receives `add == 6` throws instead of replying, and
 * the reduced future's exception message is checked.
 *
 * Each actor is sent `(idx, idx * 100)` and sleeps for the second component
 * (milliseconds) before acting. The exception message now names this test
 * (it used to be a copy of the fold test's message), and the actors are
 * stopped in a `finally` block so they do not outlive the test.
 */
@Test
def shouldReduceResultsWithException {
  val expectedMessage = "shouldReduceResultsWithException: expected"
  val actors = (1 to 10).toList map { _ ⇒
    actorOf(new Actor {
      def receive = {
        case (add: Int, wait: Int) ⇒
          Thread.sleep(wait)
          if (add == 6) throw new IllegalArgumentException(expectedMessage)
          self reply_? add
      }
    }).start()
  }
  try {
    val timeout = 10000
    def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] }
    assert(Futures.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === expectedMessage)
  } finally {
    // The started actors were previously never stopped.
    actors foreach (_.stop())
  }
}
|
2011-01-24 17:42:56 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Reducing an empty list of futures must fail when the result is read.
 *
 * NOTE: despite the "IAE" in the method name, the exception this test expects
 * (see the annotation) is UnsupportedOperationException.
 */
@Test(expected = classOf[UnsupportedOperationException])
def shouldReduceThrowIAEOnEmptyInput {
  val noFutures = List[Future[Int]]()
  Futures.reduce(noFutures)(_ + _).await.resultOrException
}
|
2011-02-07 18:59:49 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * An `onResult` handler registered on an ask must run once the reply arrives;
 * the handler opens a latch that the test waits on for up to five seconds.
 */
@Test
def receiveShouldExecuteOnComplete {
  val replySeen = new StandardLatch
  val greeter = actorOf[TestActor].start()

  (greeter ? "Hello") onResult { case "World" ⇒ replySeen.open }

  assert(replySeen.tryAwait(5, TimeUnit.SECONDS))
  greeter.stop()
}
|
2011-03-20 17:15:23 -06:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Checks `Future.sequence` and `Future.traverse`: both are used to collect
 * the first 100 odd numbers, whose sum is 10000.
 */
@Test
def shouldTraverseFutures {
  // Hands out 1, 3, 5, ... on successive 'GetNext requests.
  val oddSource = actorOf(new Actor {
    var next = 1
    def receive = {
      case 'GetNext ⇒
        self.reply(next)
        next += 2
    }
  }).start()

  val oddReplies = List.fill(100)(oddSource ? 'GetNext mapTo manifest[Int])
  assert(Future.sequence(oddReplies).get.sum === 10000)
  oddSource.stop()

  val oneToHundred = (1 to 100).toList
  assert(Future.traverse(oneToHundred)(n ⇒ Future(n * 2 - 1)).get.sum === 10000)
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * A future body that throws a plain Throwable (not an Exception) must
 * complete the future with it, and callbacks that throw must not disturb the
 * future they are attached to or futures derived from it.
 */
@Test
def shouldHandleThrowables {
  class ThrowableTest(m: String) extends Throwable(m)

  val failing = Future { throw new ThrowableTest("test") }
  failing.await
  intercept[ThrowableTest] { failing.resultOrException }

  val gate = new StandardLatch
  val gated = Future { gate.tryAwait(5, TimeUnit.SECONDS); "success" }

  // Registered while the future body is still held back by the latch.
  gated foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach"))
  gated onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") }
  val upper = gated map (_.toUpperCase)

  gate.open
  gated.await
  assert(gated.resultOrException === Some("success"))

  // Registered after the future has already completed.
  gated foreach (_ ⇒ throw new ThrowableTest("current thread foreach"))
  gated onResult { case _ ⇒ throw new ThrowableTest("current thread receive") }

  upper.await
  assert(upper.resultOrException === Some("SUCCESS"))

  // make sure all futures are completed in dispatcher
  assert(Dispatchers.defaultGlobalDispatcher.pendingFutures === 0)
}
|
2011-03-22 22:12:16 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `get` must block until a result is available, and must throw
 * FutureTimeoutException when the future's timeout elapses first.
 */
@Test
def shouldBlockUntilResult {
  val gate = new StandardLatch

  val gated = Future { gate.await; 5 }
  val dependent = Future { gated.get + 5 }

  // Nothing can be available while the latch is still closed.
  assert(dependent.resultOrException === None)
  gate.open
  assert(dependent.get === 10)

  // Body sleeps 100 ms but the future is created with a timeout argument of 10.
  val tooSlow = Future({ Thread.sleep(100); 5 }, 10)
  intercept[FutureTimeoutException] { tooSlow.get }
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Reads two futures inside a `flow` block and concatenates their values:
 * "Hello" and the actor's reply to it ("World").
 */
@Test
def futureComposingWithContinuations {
  import Future.flow

  val greeter = actorOf[TestActor].start()

  val hello = Future("Hello")
  val world = hello.flatMap(greeting ⇒ greeter ? greeting).mapTo[String]

  val sentence = flow(hello() + " " + world() + "!")

  assert(sentence.get === "Hello World!")

  greeter.stop()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * A division by zero in a future that is read inside a `flow` block must make
 * the flow's own future fail with ArithmeticException.
 */
@Test
def futureComposingWithContinuationsFailureDivideZero {
  import Future.flow

  val hello = Future("Hello")
  val helloLength = hello map (_.length)

  val broken = flow(hello() + " " + helloLength.map(_ / 0).map(_.toString)(), 200)

  intercept[java.lang.ArithmeticException] { broken.get }
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * The actor answers "Hello" with a String; mapping that reply to Int and
 * reading it inside a `flow` block must make the flow's future fail with
 * ClassCastException.
 *
 * The actor is stopped in a `finally` block so it does not outlive the test.
 */
@Test
def futureComposingWithContinuationsFailureCastInt {
  import Future.flow

  val actor = actorOf[TestActor].start

  try {
    val x = Future(3)
    val y = (actor ? "Hello").mapTo[Int]

    val r = flow(x() + y(), 200)

    intercept[ClassCastException](r.get)
  } finally {
    // The started actor was previously never stopped.
    actor.stop()
  }
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Mapping the actor's String reply to Nothing and reading it inside a `flow`
 * block must make the flow's future fail with ClassCastException.
 *
 * The actor is stopped in a `finally` block so it does not outlive the test.
 */
@Test
def futureComposingWithContinuationsFailureCastNothing {
  import Future.flow

  val actor = actorOf[TestActor].start

  try {
    val x = Future("Hello")
    val y = actor ? "Hello" mapTo manifest[Nothing]

    val r = flow(x() + y())

    intercept[ClassCastException](r.get)
  } finally {
    // The started actor was previously never stopped.
    actor.stop()
  }
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Completes promises from inside `flow` blocks, using both `completeWith` and
 * `<<`, and uses latches to observe how far each block has progressed before
 * and after the source promise receives its value.
 */
@Test
def futureCompletingWithContinuations {
  import Future.flow

  val source = Promise[Int]()
  val mirror = Promise[Int]()
  val piped = Promise[Int]()
  val mirrorLatch = new StandardLatch
  val pipedLatch = new StandardLatch

  val sum = flow {
    mirror completeWith source
    mirrorLatch.open // not within continuation

    piped << source
    pipedLatch.open // within continuation, will wait for 'piped' to complete
    piped() + mirror()
  }

  // Before `source` has a value only the first latch may be open.
  assert(mirrorLatch.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS))
  assert(!pipedLatch.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS))

  flow { source << 5 }

  assert(mirror.get === 5)
  assert(piped.get === 5)
  assert(pipedLatch.isOpen)
  assert(sum.get === 10)

  val first = Promise[Int]()
  val second = Promise[Int]()
  val seed = Promise[Int]()

  val combined = flow {
    val offset = (first << seed).result.get + 10
    second << (seed() - 2)
    first() + offset * second()
  }

  seed completeWith Future(5)

  // first = 5, second = 5 - 2 = 3, combined = 5 + (5 + 10) * 3 = 50
  assert(first.get === 5)
  assert(second.get === 3)
  assert(combined.get === 50)
  Thread.sleep(100)

  // make sure all futures are completed in dispatcher
  assert(Dispatchers.defaultGlobalDispatcher.pendingFutures === 0)
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * A promise created with a timeout argument of 0 is expected to be expired
 * after a short sleep: registering a callback must not throw, completing it
 * must have no effect, and the callback must never run.
 */
@Test
def shouldNotAddOrRunCallbacksAfterFailureToBeCompletedBeforeExpiry {
  val callbackRan = new StandardLatch
  val expired = Promise[Int](0)
  Thread.sleep(25)

  // Registering on an expired promise must not throw.
  expired onComplete { _ ⇒ callbackRan.open }
  assert(expired.isExpired)

  // Completion is attempted, but an expired promise must stay empty ...
  expired.complete(Right(1))
  assert(expired.value.isEmpty)

  // ... and the listener must not have been invoked.
  assert(!callbackRan.isOpen)
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * A `flow` block that reads two promises must stay incomplete until both of
 * them have been given a value, then yield their sum.
 */
@Test
def futureDataFlowShouldEmulateBlocking1 {
  import Future.flow

  val one = Promise[Int](1000 * 60)
  val two = Promise[Int](1000 * 60)
  val simpleResult = flow {
    one() + two()
  }

  // Nothing has a value yet.
  assert(!List(one, two, simpleResult).exists(_.isCompleted))

  flow { one << 1 }

  assert(one.isCompleted)
  assert(!List(two, simpleResult).exists(_.isCompleted))

  flow { two << 9 }

  assert(List(one, two).forall(_.isCompleted))
  assert(simpleResult.get === 10)
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * A `flow` block that pipes two promises into two others must advance only as
 * far as its inputs allow; three latches opened inside the block record the
 * progress that the assertions then check step by step.
 */
@Test
def futureDataFlowShouldEmulateBlocking2 {
  import Future.flow

  val x1 = Promise[Int](1000 * 60)
  val x2 = Promise[Int](1000 * 60)
  val y1 = Promise[Int](1000 * 60)
  val y2 = Promise[Int](1000 * 60)
  val started = new StandardLatch
  val firstPiped = new StandardLatch
  val secondPiped = new StandardLatch

  val result = flow {
    started.open()
    x1 << y1
    firstPiped.open()
    x2 << y2
    secondPiped.open()
    x1() + x2()
  }

  // Only the statement before the first pipe has run.
  assert(started.isOpen)
  assert(!firstPiped.isOpen)
  assert(!secondPiped.isOpen)
  assert(!List(x1, x2, y1, y2).exists(_.isCompleted))

  flow { y1 << 1 } // When this is set, it should cascade down the line

  assert(firstPiped.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
  assert(x1.get === 1)
  assert(!secondPiped.isOpen)

  flow { y2 << 9 } // When this is set, it should cascade down the line

  assert(secondPiped.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
  assert(x2.get === 9)

  assert(List(x1, x2, y1, y2).forall(_.isCompleted))

  assert(result.get === 10)
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Two latch-controlled futures are summed inside a `flow` block. Both future
 * bodies must have started while the flow result is still pending, and the
 * result must become 1 + 9 = 10 once both are released.
 */
@Test
def dataFlowAPIshouldbeSlick {
  import Future.flow

  val started1 = new StandardLatch
  val started2 = new StandardLatch
  val release1 = new StandardLatch
  val release2 = new StandardLatch

  val callService1 = Future { started1.open; release1.awaitUninterruptible; 1 }
  val callService2 = Future { started2.open; release2.awaitUninterruptible; 9 }

  val total = flow { callService1() + callService2() }

  // Neither service has been released, so the flow cannot have finished.
  assert(!release1.isOpen)
  assert(!release2.isOpen)
  assert(!total.isCompleted)
  assert(started1.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
  assert(started2.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))

  release1.open
  release2.open
  assert(total.get === 10)
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * An exception thrown part-way through a `flow` block must fail the block's
 * future and prevent the statements after it from running: the second
 * promise is never completed and the second latch is never opened.
 */
@Test
def futureCompletingWithContinuationsFailure {
  import Future.flow

  val source = Promise[Int]()
  val beforeFailure = Promise[Int]()
  val afterFailure = Promise[Int]()
  val beforeLatch = new StandardLatch
  val afterLatch = new StandardLatch

  val result = flow {
    beforeFailure << source
    beforeLatch.open
    val oops = 1 / 0
    afterFailure << source
    afterLatch.open
    afterFailure() + beforeFailure() + oops
  }

  // `source` has no value yet, so neither latch may be open.
  assert(!beforeLatch.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS))
  assert(!afterLatch.tryAwaitUninterruptible(100, TimeUnit.MILLISECONDS))

  flow { source << 5 }

  assert(beforeFailure.get === 5)
  intercept[java.lang.ArithmeticException] { result.get }
  assert(afterFailure.value === None)
  assert(!afterLatch.isOpen)
}
|
2011-04-23 07:42:30 -06:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Creating a `flow` block over a future that is still waiting on a latch must
 * return an incomplete future; once the latch opens the flow yields its value.
 */
@Test
def futureContinuationsShouldNotBlock {
  import Future.flow

  val gate = new StandardLatch
  val greeting = Future {
    gate.await
    "Hello"
  }

  val filtered = flow {
    Some(greeting()).filter(_ == "Hello")
  }

  // The greeting is still held back by the latch.
  assert(!filtered.isCompleted)

  gate.open

  assert(filtered.get === Some("Hello"))
}
|
2011-04-27 19:39:15 -06:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * The static result type of a `flow` block must follow its last expression.
 * The inferred element types are compared with reference manifests.
 */
@Test
def futureFlowShouldBeTypeSafe {
  import Future.flow

  // True when the statically inferred element type A equals the reference manifest.
  def checkType[A: Manifest, B](future: Future[A], expected: Manifest[B]): Boolean =
    manifest[A] == expected

  val rString = flow {
    val five = Future(5)
    five().toString
  }

  val rInt = flow {
    val text = rString.apply
    val five = Future(5)
    text.length + five()
  }

  assert(checkType(rString, manifest[String]))
  assert(checkType(rInt, manifest[Int]))
  assert(!checkType(rInt, manifest[String]))
  assert(!checkType(rInt, manifest[Nothing]))
  assert(!checkType(rInt, manifest[Any]))

  rString.await
  rInt.await
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * A promise assigned inside a `flow` block from the sum of two other promises
 * must receive 40 + 2 = 42 once both inputs have been supplied.
 */
@Test
def futureFlowSimpleAssign {
  import Future.flow

  val x = Promise[Int]()
  val y = Promise[Int]()
  val z = Promise[Int]()

  // The consumer is set up first; the inputs are supplied afterwards.
  flow {
    z << x() + y()
  }
  flow { x << 40 }
  flow { y << 2 }

  assert(z.get === 42)
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Ticket 812: a future that times out for the caller must still be counted
 * as pending by the dispatcher while its body runs, and must no longer be
 * counted once the body has had time to finish.
 */
@Test
def ticket812FutureDispatchCleanup {
  val dispatcher = implicitly[MessageDispatcher]
  assert(dispatcher.pendingFutures === 0)

  // Body sleeps 100 ms but the future is created with a timeout argument of 10.
  val slow = Future({ Thread.sleep(100); "Done" }, 10)
  intercept[FutureTimeoutException] { slow.await }
  assert(dispatcher.pendingFutures === 1)

  // Give the body time to finish before checking the count again.
  Thread.sleep(100)
  assert(dispatcher.pendingFutures === 0)
}
|
2010-08-24 23:21:28 +02:00
|
|
|
}
|