Tests are green with new Futures, consider this a half-way-there marker

This commit is contained in:
Viktor Klang 2011-12-11 00:40:52 +01:00
parent b4f486667f
commit 3b1330c6d7
33 changed files with 287 additions and 601 deletions

View file

@ -22,14 +22,14 @@ import akka.testkit.AkkaSpec;
public class JavaFutureTests {
private static ActorSystem system;
private static FutureFactory ff;
private volatile static FutureFactory ff;
private static Timeout t;
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf());
t = system.settings().ActorTimeout();
ff = new FutureFactory(system.dispatcher(), t);
ff = new FutureFactory(system.dispatcher());
}
@AfterClass
@ -51,7 +51,7 @@ public class JavaFutureTests {
public String apply(String s) {
return s + " World";
}
}, t);
});
assertEquals("Hello World", f2.get());
}
@ -59,8 +59,7 @@ public class JavaFutureTests {
@Test
public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system
.dispatcherFactory().defaultGlobalDispatcher());
Promise<String> cf = ff.<String>promise();
Future<String> f = cf;
f.onResult(new Procedure<String>() {
public void apply(String result) {
@ -77,8 +76,7 @@ public class JavaFutureTests {
@Test
public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system
.dispatcherFactory().defaultGlobalDispatcher());
Promise<String> cf = ff.<String>promise();
Future<String> f = cf;
f.onException(new Procedure<Throwable>() {
public void apply(Throwable t) {
@ -93,27 +91,10 @@ public class JavaFutureTests {
assertEquals(f.exception().get(), exception);
}
@Test
public void mustBeAbleToExecuteAnOnTimeoutCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system
.dispatcherFactory().defaultGlobalDispatcher());
Future<String> f = cf;
f.onTimeout(new Procedure<Future<String>>() {
public void apply(Future<String> future) {
latch.countDown();
}
});
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertTrue(f.value().isEmpty());
}
@Test
public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system
.dispatcherFactory().defaultGlobalDispatcher());
Promise<String> cf = ff.<String>promise();
Future<String> f = cf;
f.onComplete(new Procedure<Future<String>>() {
public void apply(akka.dispatch.Future<String> future) {
@ -129,8 +110,7 @@ public class JavaFutureTests {
@Test
public void mustBeAbleToForeachAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system
.dispatcherFactory().defaultGlobalDispatcher());
Promise<String> cf = ff.<String>promise();
Future<String> f = cf;
f.foreach(new Procedure<String>() {
public void apply(String future) {
@ -146,19 +126,17 @@ public class JavaFutureTests {
@Test
public void mustBeAbleToFlatMapAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system
.dispatcherFactory().defaultGlobalDispatcher());
Promise<String> cf = ff.<String>promise();
cf.completeWithResult("1000");
Future<String> f = cf;
Future<Integer> r = f.flatMap(new Function<String, Future<Integer>>() {
public Future<Integer> apply(String r) {
latch.countDown();
Promise<Integer> cf = new akka.dispatch.DefaultPromise<Integer>(1000, TimeUnit.MILLISECONDS, system
.dispatcherFactory().defaultGlobalDispatcher());
Promise<Integer> cf = ff.<Integer>promise();
cf.completeWithResult(Integer.parseInt(r));
return cf;
}
}, t);
});
assertEquals(f.get(), "1000");
assertEquals(r.get().intValue(), 1000);
@ -168,15 +146,14 @@ public class JavaFutureTests {
@Test
public void mustBeAbleToFilterAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system
.dispatcherFactory().defaultGlobalDispatcher());
Promise<String> cf = ff.<String>promise();
Future<String> f = cf;
Future<String> r = f.filter(new Function<String, Boolean>() {
public Boolean apply(String r) {
latch.countDown();
return r.equals("foo");
}
}, t);
});
cf.completeWithResult("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
@ -199,7 +176,7 @@ public class JavaFutureTests {
}));
}
Future<Iterable<String>> futureList = ff.sequence(listFutures, t);
Future<Iterable<String>> futureList = ff.sequence(listFutures);
assertEquals(futureList.get(), listExpected);
}
@ -219,7 +196,7 @@ public class JavaFutureTests {
}));
}
Future<String> result = ff.fold("", 15000, listFutures, new Function2<String, String, String>() {
Future<String> result = ff.fold("", listFutures, new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t;
}
@ -242,7 +219,7 @@ public class JavaFutureTests {
}));
}
Future<String> result = ff.reduce(listFutures, 15000, new Function2<String, String, String>() {
Future<String> result = ff.reduce(listFutures, new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t;
}
@ -261,7 +238,7 @@ public class JavaFutureTests {
listStrings.add("test");
}
Future<Iterable<String>> result = ff.traverse(listStrings, t, new Function<String, Future<String>>() {
Future<Iterable<String>> result = ff.traverse(listStrings, new Function<String, Future<String>>() {
public Future<String> apply(final String r) {
return ff.future(new Callable<String>() {
public String call() {
@ -290,7 +267,7 @@ public class JavaFutureTests {
public Boolean apply(Integer i) {
return i == 5;
}
}, t);
});
final Integer got = f.get().get();
assertEquals(expect, got);

View file

@ -11,9 +11,9 @@ import akka.testkit._
import akka.util.duration._
import java.lang.IllegalStateException
import akka.util.ReflectiveAccess
import akka.dispatch.{ DefaultPromise, Promise, Future }
import akka.serialization.Serialization
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.dispatch.{ Block, DefaultPromise, Promise, Future }
object ActorRefSpec {
@ -126,9 +126,9 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
}
def wrap[T](f: Promise[Actor] T): T = {
val result = new DefaultPromise[Actor](10 * 60 * 1000)
val result = Promise[Actor]()
val r = f(result)
result.get
Block.on(result, 1 minute).resultOrException
r
}

View file

@ -4,10 +4,11 @@
package akka.actor
import org.scalatest.BeforeAndAfterAll
import akka.dispatch.FutureTimeoutException
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import java.util.concurrent.TimeoutException
import akka.dispatch.Block
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {
@ -28,7 +29,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
val echo = actorWithTimeout(Timeout(12))
try {
val f = echo ? "hallo"
intercept[FutureTimeoutException] { f.await }
intercept[TimeoutException] { Block.on(f, system.settings.ActorTimeout.duration) }
} finally { echo.stop }
}
}
@ -39,7 +40,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
val echo = actorWithTimeout(Props.defaultTimeout)
try {
val f = (echo ? "hallo").mapTo[String]
intercept[FutureTimeoutException] { f.await }
intercept[TimeoutException] { Block.on(f, timeout.duration) }
f.value must be(None)
} finally { echo.stop }
}
@ -48,7 +49,11 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
"use explicitly supplied timeout" in {
within(testTimeout - 100.millis, testTimeout + 300.millis) {
val echo = actorWithTimeout(Props.defaultTimeout)
try { (echo.?("hallo", testTimeout)).as[String] must be(None) } finally { echo.stop }
val f = echo.?("hallo", testTimeout)
try {
intercept[TimeoutException] { Block.on(f, testTimeout) }
f.value must be === None
} finally { echo.stop }
}
}
}

View file

@ -8,9 +8,9 @@ import org.scalatest.BeforeAndAfterEach
import akka.util.ByteString
import akka.util.cps._
import akka.dispatch.Future
import scala.util.continuations._
import akka.testkit._
import akka.dispatch.{ Block, Future }
object IOActorSpec {
import IO._
@ -239,9 +239,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val f1 = client1 ? (('set, "hello", ByteString("World")))
val f2 = client1 ? (('set, "test", ByteString("No one will read me")))
val f3 = client1 ? (('get, "hello"))
f2.await
Block.on(f2, timeout.duration)
val f4 = client2 ? (('set, "test", ByteString("I'm a test!")))
f4.await
Block.on(f4, timeout.duration)
val f5 = client1 ? (('get, "test"))
val f6 = client2 ? 'getall
f1.get must equal("OK")

View file

@ -6,7 +6,7 @@ package akka.actor
import akka.testkit._
import akka.util.duration._
import akka.dispatch.Future
import akka.dispatch.{ Block, Future }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class LocalActorRefProviderSpec extends AkkaSpec {
@ -32,7 +32,7 @@ class LocalActorRefProviderSpec extends AkkaSpec {
val address = "new-actor" + i
implicit val timeout = Timeout(5 seconds)
val actors = for (j 1 to 4) yield Future(system.actorOf(Props(c { case _ }), address))
val set = Set() ++ actors.map(_.await.value match {
val set = Set() ++ actors.map(a Block.on(a, timeout.duration).value match {
case Some(Right(a: ActorRef)) 1
case Some(Left(ex: InvalidActorNameException)) 2
case x x

View file

@ -7,7 +7,6 @@ package akka.actor
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.Duration
import akka.util.duration._
import akka.dispatch.{ Dispatchers, Future, KeptPromise }
import akka.serialization.Serialization
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
@ -17,6 +16,7 @@ import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.japi.{ Creator, Option JOption }
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Block, Dispatchers, Future, KeptPromise }
object TypedActorSpec {
@ -296,7 +296,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
t.failingPigdog()
t.read() must be(1) //Make sure state is not reset after failure
t.failingFuturePigdog.await.exception.get.getMessage must be("expected")
Block.on(t.failingFuturePigdog, 2 seconds).exception.get.getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
(intercept[IllegalStateException] { t.failingJOptionPigdog }).getMessage must be("expected")

View file

@ -10,11 +10,11 @@ import akka.actor.{ Actor, ActorRef, Status }
import akka.testkit.{ EventFilter, filterEvents, filterException }
import akka.util.duration._
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import org.scalatest.junit.JUnitSuite
import java.lang.ArithmeticException
import akka.testkit.DefaultTimeout
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
object FutureSpec {
class TestActor extends Actor {
@ -47,7 +47,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"never completed" must {
behave like emptyFuture(_(Promise()))
"return supplied value on timeout" in {
val promise = Promise[String](100) orElse "Timedout"
val timedOut = new KeptPromise[String](Right("Timedout"))
val promise = Promise[String]() orElse timedOut
promise.get must be("Timedout")
}
}
@ -61,9 +62,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = Promise[String]().complete(Left(new RuntimeException(message)))
behave like futureWithException[RuntimeException](_(future, message))
}
"expired" must {
behave like expiredFuture(_(Promise(0)))
}
}
"A Future" when {
@ -78,7 +76,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
test(future)
latch.open
future.await
Block.on(future, timeout.duration)
}
}
"is completed" must {
@ -90,7 +88,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
result
}
latch.open
future.await
Block.on(future, timeout.duration)
test(future, result)
}
}
@ -99,8 +97,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
filterException[ArithmeticException] {
check({ (future: Future[Int], actions: List[FutureAction])
val result = (future /: actions)(_ /: _)
val expected = (future.await.value.get /: actions)(_ /: _)
((result.await.value.get, expected) match {
val expected = (Block.on(future, timeout.duration).value.get /: actions)(_ /: _)
((Block.on(result, timeout.duration).value.get, expected) match {
case (Right(a), Right(b)) a == b
case (Left(a), Left(b)) if a.toString == b.toString true
case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty
@ -118,7 +116,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
behave like futureWithResult { test
val actor = system.actorOf[TestActor]
val future = actor ? "Hello"
future.await
Block.on(future, timeout.duration)
test(future, "World")
actor.stop()
}
@ -128,7 +126,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
filterException[RuntimeException] {
val actor = system.actorOf[TestActor]
val future = actor ? "Failure"
future.await
Block.on(future, timeout.duration)
test(future, "Expected exception; to test fault-tolerance")
actor.stop()
}
@ -142,7 +140,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val actor1 = system.actorOf[TestActor]
val actor2 = system.actorOf(new Actor { def receive = { case s: String sender ! s.toUpperCase } })
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
future.await
Block.on(future, timeout.duration)
test(future, "WORLD")
actor1.stop()
actor2.stop()
@ -154,7 +152,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val actor1 = system.actorOf[TestActor]
val actor2 = system.actorOf(new Actor { def receive = { case s: String sender ! Status.Failure(new ArithmeticException("/ by zero")) } })
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
future.await
Block.on(future, timeout.duration)
test(future, "/ by zero")
actor1.stop()
actor2.stop()
@ -167,7 +165,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val actor1 = system.actorOf[TestActor]
val actor2 = system.actorOf(new Actor { def receive = { case s: String sender ! s.toUpperCase } })
val future = actor1 ? "Hello" flatMap { case i: Int actor2 ? i }
future.await
Block.on(future, timeout.duration)
test(future, "World (of class java.lang.String)")
actor1.stop()
actor2.stop()
@ -285,7 +283,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"firstCompletedOf" in {
val futures = Vector.fill[Future[Int]](10)(new DefaultPromise[Int]()) :+ new KeptPromise[Int](Right(5))
val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ new KeptPromise[Int](Right(5))
Future.firstCompletedOf(futures).get must be(5)
}
@ -306,7 +304,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] }
Future.fold(futures, timeout)(0)(_ + _).get must be(45)
Block.on(Future.fold(futures)(0)(_ + _), timeout millis).resultOrException.get must be(45)
}
"fold by composing" in {
@ -333,18 +331,19 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
Future.fold(futures, timeout)(0)(_ + _).await.exception.get.getMessage must be("shouldFoldResultsWithException: expected")
Block.on(Future.fold(futures)(0)(_ + _), timeout millis).exception.get.getMessage must be("shouldFoldResultsWithException: expected")
}
}
"fold mutable zeroes safely" in {
import scala.collection.mutable.ArrayBuffer
def test(testNumber: Int) {
val fs = (0 to 1000) map (i Future(i, 10000))
val result = Future.fold(fs, 10000)(ArrayBuffer.empty[AnyRef]) {
val fs = (0 to 1000) map (i Future(i))
val f = Future.fold(fs)(ArrayBuffer.empty[AnyRef]) {
case (l, i) if i % 2 == 0 l += i.asInstanceOf[AnyRef]
case (l, _) l
}.get.asInstanceOf[ArrayBuffer[Int]].sum
}
val result = Block.on(f.mapTo[ArrayBuffer[Int]], 10000 millis).resultOrException.get.sum
assert(result === 250500)
}
@ -364,7 +363,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] }
assert(Future.reduce(futures, timeout)(_ + _).get === 45)
assert(Block.on(Future.reduce(futures)(_ + _), timeout millis).resultOrException.get === 45)
}
"shouldReduceResultsWithException" in {
@ -381,7 +380,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
assert(Future.reduce(futures, timeout)(_ + _).await.exception.get.getMessage === "shouldFoldResultsWithException: expected")
assert(Block.on(Future.reduce(futures)(_ + _), timeout millis).exception.get.getMessage === "shouldFoldResultsWithException: expected")
}
}
@ -421,9 +420,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
class ThrowableTest(m: String) extends Throwable(m)
filterException[ThrowableTest] {
val f1 = Future { throw new ThrowableTest("test") }
f1.await
intercept[ThrowableTest] { f1.get }
val f1 = Future[Any] { throw new ThrowableTest("test") }
intercept[ThrowableTest] { Block.on(f1, timeout.duration).resultOrException.get }
val latch = new StandardLatch
val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" }
@ -431,12 +429,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
f2 onResult { case _ throw new ThrowableTest("dispatcher receive") }
val f3 = f2 map (s s.toUpperCase)
latch.open
f2.await
assert(f2.get === "success")
assert(Block.on(f2, timeout.duration).resultOrException.get === "success")
f2 foreach (_ throw new ThrowableTest("current thread foreach"))
f2 onResult { case _ throw new ThrowableTest("current thread receive") }
f3.await
assert(f3.get === "SUCCESS")
assert(Block.on(f3, timeout.duration).resultOrException.get === "SUCCESS")
}
}
@ -450,10 +446,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
latch.open
assert(f2.get === 10)
val f3 = Future({ Thread.sleep(10); 5 }, 10 millis)
filterException[FutureTimeoutException] {
intercept[FutureTimeoutException] {
f3.get
val f3 = Future({ Thread.sleep(100); 5 })
filterException[TimeoutException] {
intercept[TimeoutException] {
Block.on(f3, 0 millis)
}
}
}
@ -556,24 +552,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
assert(result2.get === 50)
}
"shouldNotAddOrRunCallbacksAfterFailureToBeCompletedBeforeExpiry" in {
val latch = new StandardLatch
val f = Promise[Int](0)
Thread.sleep(25)
f.onComplete(_ latch.open) //Shouldn't throw any exception here
assert(f.isExpired) //Should be expired
f.complete(Right(1)) //Shouldn't complete the Future since it is expired
assert(f.value.isEmpty) //Shouldn't be completed
assert(!latch.isOpen) //Shouldn't run the listener
}
"futureDataFlowShouldEmulateBlocking1" in {
import Future.flow
val one, two = Promise[Int](1000 * 60)
val one, two = Promise[Int]()
val simpleResult = flow {
one() + two()
}
@ -582,14 +564,14 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { one << 1 }
one.await
Block.on(one, 1 minute)
assert(one.isCompleted)
assert(List(two, simpleResult).forall(_.isCompleted == false))
flow { two << 9 }
two.await
Block.on(two, 1 minute)
assert(List(one, two).forall(_.isCompleted == true))
assert(simpleResult.get === 10)
@ -598,7 +580,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"futureDataFlowShouldEmulateBlocking2" in {
import Future.flow
val x1, x2, y1, y2 = Promise[Int](1000 * 60)
val x1, x2, y1, y2 = Promise[Int]()
val lx, ly, lz = new StandardLatch
val result = flow {
lx.open()
@ -616,17 +598,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { y1 << 1 } // When this is set, it should cascade down the line
assert(ly.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
assert(x1.get === 1)
assert(Block.on(x1, 1 minute).resultOrException.get === 1)
assert(!lz.isOpen)
flow { y2 << 9 } // When this is set, it should cascade down the line
assert(lz.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
assert(x2.get === 9)
assert(Block.on(x2, 1 minute).resultOrException.get === 9)
assert(List(x1, x2, y1, y2).forall(_.isCompleted == true))
assert(result.get === 10)
assert(Block.on(result, 1 minute).resultOrException.get === 10)
}
"dataFlowAPIshouldbeSlick" in {
@ -717,8 +699,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
assert(!checkType(rInt, manifest[Nothing]))
assert(!checkType(rInt, manifest[Any]))
rString.await
rInt.await
Block.on(rString, timeout.duration).resultOrException
Block.on(rInt, timeout.duration).resultOrException
}
"futureFlowSimpleAssign" in {
@ -810,30 +792,29 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
latch(8).open
latch(9).await
f4.await must be('completed)
Block.on(f4, timeout.duration) must be('completed)
}
"should not deadlock with nested await (ticket 1313)" in {
val simple = Future() map (_ (Future(()) map (_ ())).get)
simple.await must be('completed)
Block.on(simple, timeout.duration) must be('completed)
val l1, l2 = new StandardLatch
val complex = Future() map { _
Future.blocking()
val nested = Future()
Future.blocking(system.dispatcher)
val nested = Future(())
nested foreach (_ l1.open)
l1.await // make sure nested is completed
nested foreach (_ l2.open)
l2.await
}
assert(complex.await.isCompleted)
assert(Block.on(complex, timeout.duration).isCompleted)
}
}
}
def emptyFuture(f: (Future[Any] Unit) Unit) {
"not be completed" in { f(_ must not be ('completed)) }
"not be expired" in { f(_ must not be ('expired)) }
"not contain a value" in { f(_.value must be(None)) }
"not contain a result" in { f(_.result must be(None)) }
"not contain an exception" in { f(_.exception must be(None)) }
@ -841,13 +822,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
def futureWithResult(f: ((Future[Any], Any) Unit) Unit) {
"be completed" in { f((future, _) future must be('completed)) }
"not be expired" in { f((future, _) future must not be ('expired)) }
"contain a value" in { f((future, result) future.value must be(Some(Right(result)))) }
"contain a result" in { f((future, result) future.result must be(Some(result))) }
"not contain an exception" in { f((future, _) future.exception must be(None)) }
"return result with 'get'" in { f((future, result) future.get must be(result)) }
"return result with 'resultOrException'" in { f((future, result) future.resultOrException must be(Some(result))) }
"not timeout" in { f((future, _) future.await) }
"not timeout" in { f((future, _) Block.on(future, 0 millis)) }
"filter result" in {
f { (future, result)
(future filter (_ true)).get must be(result)
@ -866,13 +846,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) Unit) Unit) {
"be completed" in { f((future, _) future must be('completed)) }
"not be expired" in { f((future, _) future must not be ('expired)) }
"contain a value" in { f((future, _) future.value must be('defined)) }
"not contain a result" in { f((future, _) future.result must be(None)) }
"contain an exception" in { f((future, message) future.exception.get.getMessage must be(message)) }
"throw exception with 'get'" in { f((future, message) (evaluating { future.get } must produce[E]).getMessage must be(message)) }
"throw exception with 'resultOrException'" in { f((future, message) (evaluating { future.resultOrException } must produce[E]).getMessage must be(message)) }
"not timeout" in { f((future, _) future.await) }
"retain exception with filter" in {
f { (future, message)
(evaluating { (future filter (_ true)).get } must produce[E]).getMessage must be(message)
@ -889,11 +867,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"always cast successfully using mapTo" is pending
}
def expiredFuture(f: (Future[Any] Unit) Unit) {
"not be completed" in { f(_ must not be ('completed)) }
"be expired" in { f(_ must be('expired)) }
}
sealed trait IntAction { def apply(that: Int): Int }
case class IntAdd(n: Int) extends IntAction { def apply(that: Int) = that + n }
case class IntSub(n: Int) extends IntAction { def apply(that: Int) = that - n }

View file

@ -17,13 +17,9 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val q = factory(config)
ensureInitialMailboxState(config, q)
implicit val within = 1 second
val f = spawn { q.dequeue }
val f = spawn {
q.dequeue
}
f.await.resultOrException must be === Some(null)
Block.on(f, 1 second).resultOrException must be === Some(null)
}
"create a bounded mailbox with 10 capacity and with push timeout" in {
@ -61,8 +57,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
}
//CANDIDATE FOR TESTKIT
def spawn[T <: AnyRef](fun: T)(implicit within: Duration): Future[T] = {
val result = new DefaultPromise[T](within.length, within.unit)
def spawn[T <: AnyRef](fun: T): Future[T] = {
val result = Promise[T]()
val t = new Thread(new Runnable {
def run = try {
result.completeWithResult(fun)
@ -119,8 +115,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val consumers = for (i (1 to 4).toList) yield createConsumer
val ps = producers.map(_.await.resultOrException.get)
val cs = consumers.map(_.await.resultOrException.get)
val ps = producers.map(Block.on(_, within).resultOrException.get)
val cs = consumers.map(Block.on(_, within).resultOrException.get)
ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages
cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages

View file

@ -40,40 +40,6 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
assert(c.get === 3)
}
"timeout" in {
val a, c = Promise[Int]()
val b = Promise[Int](0)
val q = PromiseStream[Int](1000)
flow {
a << q()
b << q()
c << q()
}
Thread.sleep(10)
flow {
q << (1, 2)
q << 3
}
assert(a.get === 1)
intercept[FutureTimeoutException] { b.get }
assert(c.get === 3)
}
"timeout again" in {
val q = PromiseStream[Int](500)
val a = q.dequeue()
val b = q.dequeue()
q += 1
Thread.sleep(500)
q += (2, 3)
val c = q.dequeue()
val d = q.dequeue()
assert(a.get === 1)
intercept[FutureTimeoutException] { b.get }
assert(c.get === 2)
assert(d.get === 3)
}
"pend again" in {
val a, b, c, d = Promise[Int]()
val q1, q2 = PromiseStream[Int]()

View file

@ -2,9 +2,6 @@ package akka.performance.trading.system
import akka.performance.trading.domain._
import akka.actor._
import akka.dispatch.Future
import akka.dispatch.FutureTimeoutException
import akka.dispatch.MessageDispatcher
trait MatchingEngine {
val meId: String

View file

@ -1,11 +1,11 @@
package akka.routing
import akka.dispatch.{ KeptPromise, Future }
import akka.actor._
import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.testkit.AkkaSpec
import akka.dispatch.{ Block, KeptPromise, Future }
object ActorPoolSpec {
@ -125,8 +125,8 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
}).withFaultHandler(faultHandler))
try {
(for (count 1 to 500) yield pool.?("Test", 20000)) foreach {
_.await.resultOrException.get must be("Response")
(for (count 1 to 500) yield pool.?("Test", 20 seconds)) foreach {
Block.on(_, 20 seconds).resultOrException.get must be("Response")
}
} finally {
pool.stop()

View file

@ -3,6 +3,8 @@ package akka.ticket
import akka.actor._
import akka.routing._
import akka.testkit.AkkaSpec
import akka.dispatch.Block
import akka.util.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket703Spec extends AkkaSpec {
@ -26,7 +28,7 @@ class Ticket703Spec extends AkkaSpec {
}
}))
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000)))
(actorPool.?("Ping", 10000)).await.result must be === Some("Response")
Block.on(actorPool.?("Ping", 10000), 10 seconds).resultOrException.get must be === "Response"
}
}
}

View file

@ -4,7 +4,7 @@
package akka.util
import org.scalatest.matchers.MustMatchers
import akka.dispatch.Future
import akka.dispatch.{ Future, Block }
import akka.testkit.AkkaSpec
import scala.util.Random
import akka.testkit.DefaultTimeout
@ -125,7 +125,7 @@ class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout {
val tasks = List.fill(nrOfTasks)(executeRandomTask)
tasks.foreach(_.await)
tasks.foreach(Block.on(_, timeout.duration))
tasks.foreach(_.exception.map(throw _))
}
}

View file

@ -11,10 +11,10 @@ import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.serialization.Serialization
import java.net.InetSocketAddress
import akka.remote.RemoteAddress
import java.util.concurrent.TimeUnit
import akka.event.EventStream
import akka.event.DeathWatch
import scala.annotation.tailrec
import java.util.concurrent.{ TimeoutException, TimeUnit }
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -407,18 +407,9 @@ class AskActorRef(
val path: ActorPath,
override val getParent: InternalActorRef,
deathWatch: DeathWatch,
timeout: Timeout,
val dispatcher: MessageDispatcher) extends MinimalActorRef {
final val result = new DefaultPromise[Any](timeout)(dispatcher)
{
val callback: Future[Any] Unit = { _ deathWatch.publish(Terminated(AskActorRef.this)); whenDone() }
result onComplete callback
result onTimeout callback
}
protected def whenDone(): Unit = ()
final val result = Promise[Any]()(dispatcher)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case Status.Success(r) result.completeWithResult(r)
@ -434,7 +425,7 @@ class AskActorRef(
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))(dispatcher)
override def isTerminated = result.isCompleted || result.isExpired
override def isTerminated = result.isCompleted
override def stop(): Unit = if (!isTerminated) result.completeWithException(new ActorKilledException("Stopped"))

View file

@ -442,7 +442,7 @@ class LocalActorRefProvider(
def dispatcher: MessageDispatcher = system.dispatcher
lazy val terminationFuture: DefaultPromise[Unit] = new DefaultPromise[Unit](Timeout.never)(dispatcher)
lazy val terminationFuture: Promise[Unit] = Promise[Unit]()(dispatcher)
lazy val rootGuardian: InternalActorRef = new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true) {
override def getParent: InternalActorRef = this
@ -567,16 +567,20 @@ class LocalActorRefProvider(
import akka.dispatch.DefaultPromise
(if (within == null) settings.ActorTimeout else within) match {
case t if t.duration.length <= 0
new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout
Promise[Any]()(dispatcher) //Abort early if nonsensical timeout
case t
val path = tempPath()
val name = path.name
val a = new AskActorRef(path, tempContainer, deathWatch, t, dispatcher) {
override def whenDone() {
tempContainer.children.remove(name)
}
}
val a = new AskActorRef(path, tempContainer, deathWatch, dispatcher)
tempContainer.children.put(name, a)
val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { tempContainer.children.remove(name) }
a.result onComplete { _
try { f.cancel() }
finally {
try { tempContainer.children.remove(name) }
finally { deathWatch.publish(Terminated(a)) }
}
}
recipient.tell(message, a)
a.result
}

View file

@ -11,6 +11,7 @@ import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar }
import akka.serialization.{ Serializer, Serialization }
import akka.dispatch._
import akka.serialization.SerializationExtension
import java.util.concurrent.TimeoutException
trait TypedActorFactory {
@ -409,7 +410,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case m if m.returnsFuture_? actor.?(m, timeout)
case m if m.returnsJOption_? || m.returnsOption_?
val f = actor.?(m, timeout)
(try { f.await.value } catch { case _: FutureTimeoutException None }) match {
(try { Block.on(f, timeout.duration).value } catch { case _: TimeoutException None }) match {
case None | Some(Right(null)) if (m.returnsJOption_?) JOption.none[Any] else None
case Some(Right(joption: AnyRef)) joption
case Some(Left(ex)) throw ex

View file

@ -13,7 +13,6 @@ import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption }
import scala.util.continuations._
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
import java.lang.{ Iterable JIterable }
import java.util.{ LinkedList JLinkedList }
@ -22,68 +21,59 @@ import scala.annotation.tailrec
import scala.collection.mutable.Stack
import akka.util.{ Switch, Duration, BoxedType }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable }
import akka.dispatch.Block.CanBlock
class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(message: String) = this(message, null)
object Block {
sealed trait CanBlock
trait Blockable {
/**
* Should throw java.util.concurrent.TimeoutException if times out
*/
def block(atMost: Duration)(implicit permit: CanBlock): this.type
}
class FutureFactory()(implicit dispatcher: MessageDispatcher, timeout: Timeout) {
private implicit val permit = new CanBlock {}
def on[T <: Blockable](block: T, atMost: Duration /* = Duration.Inf*/ ): T = block.block(atMost)
}
class FutureFactory(implicit dispatcher: MessageDispatcher) {
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T]): Future[T] =
Future(body.call, timeout)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Timeout): Future[T] =
Future(body.call, timeout)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Long): Future[T] =
Future(body.call, timeout)
Future(body.call)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
Future(body.call)(dispatcher, timeout)
Future(body.call)(dispatcher)
/**
* Java API, equivalent to Future.apply
* Java API, equivalent to Promise.apply
*/
def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] =
Future(body.call)(dispatcher, timeout)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
Future(body.call)(dispatcher, timeout)
def promise[T](): Promise[T] = Promise[T]()
/**
* Java API.
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = {
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = {
val pred: T Boolean = predicate.apply(_)
Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)), timeout)(pred).map(JOption.fromScalaOption(_))(timeout)
Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(pred).map(JOption.fromScalaOption(_))
}
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = find(futures, predicate, timeout)
/**
* Java API.
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] =
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = firstCompletedOf(futures, timeout)
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] =
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))
/**
* Java API
@ -92,31 +82,22 @@ class FutureFactory()(implicit dispatcher: MessageDispatcher, timeout: Timeout)
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(zero)(fun.apply _)
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout, futures, fun)
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)
/**
* Java API.
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Timeout, fun: akka.japi.Function2[R, T, T]): Future[R] =
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(fun.apply _)
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout, fun)
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] =
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
/**
* Java API.
* Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = {
implicit val t = timeout
def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = {
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa)
for (r fr; a fa) yield {
r add a
@ -124,37 +105,28 @@ class FutureFactory()(implicit dispatcher: MessageDispatcher, timeout: Timeout)
})
}
def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, timeout)
/**
* Java API.
* Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A Future[B].
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel.
*/
def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] = {
implicit val t = timeout
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = {
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a)
val fb = fn(a)
for (r fr; b fb) yield {
r add b
r
for (r fr; b fb) yield { r add b; r }
}
}
}
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, timeout, fn)
}
object Future {
/**
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
val promise = new DefaultPromise[T](timeout)
def apply[T](body: T)(implicit dispatcher: MessageDispatcher): Future[T] = {
val promise = Promise[T]()
dispatcher dispatchTask { ()
promise complete {
try {
@ -168,15 +140,6 @@ object Future {
promise
}
def apply[T](body: T, timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
apply(body)(dispatcher, timeout)
def apply[T](body: T, timeout: Duration)(implicit dispatcher: MessageDispatcher): Future[T] =
apply(body)(dispatcher, timeout)
def apply[T](body: T, timeout: Long)(implicit dispatcher: MessageDispatcher): Future[T] =
apply(body)(dispatcher, timeout)
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
@ -184,17 +147,14 @@ object Future {
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[A]] =
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
sequence(in)(cbf, timeout, dispatcher)
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
val futureResult = new DefaultPromise[T](timeout)
def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = {
val futureResult = Promise[T]()
val completeFirst: Future[T] Unit = _.value.foreach(futureResult complete _)
futures.foreach(_ onComplete completeFirst)
@ -202,16 +162,13 @@ object Future {
futureResult
}
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
firstCompletedOf(futures)(dispatcher, timeout)
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](futures: Iterable[Future[T]])(predicate: T Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[Option[T]] = {
def find[T](futures: Iterable[Future[T]])(predicate: T Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = {
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
else {
val result = new DefaultPromise[Option[T]](timeout)
val result = Promise[Option[T]]()
val ref = new AtomicInteger(futures.size)
val search: Future[T] Unit = f try {
f.result.filter(predicate).foreach(r result completeWithResult Some(r))
@ -225,9 +182,6 @@ object Future {
}
}
def find[T](futures: Iterable[Future[T]], timeout: Timeout)(predicate: T Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] =
find(futures)(predicate)(dispatcher, timeout)
/**
* A non-blocking fold over the specified futures.
* The fold is performed on the thread where the last future is completed,
@ -238,16 +192,16 @@ object Future {
* val result = Futures.fold(0)(futures)(_ + _).await.result
* </pre>
*/
def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] = {
if (futures.isEmpty) {
new KeptPromise[R](Right(zero))
} else {
val result = new DefaultPromise[R](timeout)
val result = Promise[R]()
val results = new ConcurrentLinkedQueue[T]()
val done = new Switch(false)
val allDone = futures.size
val aggregate: Future[T] Unit = f if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
val aggregate: Future[T] Unit = f if (done.isOff && !result.isCompleted) {
f.value.get match {
case Right(value)
val added = results add value
@ -280,9 +234,6 @@ object Future {
}
}
def fold[T, R](futures: Iterable[Future[T]], timeout: Timeout)(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] =
fold(futures)(zero)(foldFun)(dispatcher, timeout)
/**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
* Example:
@ -290,11 +241,11 @@ object Future {
* val result = Futures.reduce(futures)(_ + _).await.result
* </pre>
*/
def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) T)(implicit dispatcher: MessageDispatcher): Future[R] = {
if (futures.isEmpty)
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
else {
val result = new DefaultPromise[R](timeout)
val result = Promise[R]()
val seedFound = new AtomicBoolean(false)
val seedFold: Future[T] Unit = f {
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
@ -308,10 +259,6 @@ object Future {
result
}
}
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout)(op: (R, T) T)(implicit dispatcher: MessageDispatcher): Future[R] =
reduce(futures)(op)(dispatcher, timeout)
/**
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A Future[B].
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
@ -320,15 +267,12 @@ object Future {
* val myFutureList = Futures.traverse(myList)(x Future(myFunc(x)))
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[B]] =
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr; b fb) yield (r += b)
}.map(_.result)
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
traverse(in)(fn)(cbf, timeout, dispatcher)
/**
* Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
* Continuations plugin.
@ -345,8 +289,8 @@ object Future {
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
def flow[A](body: A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = {
val future = Promise[A](timeout)
def flow[A](body: A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = {
val future = Promise[A]
dispatchTask({ ()
(reify(body) foreachFull (future completeWithResult, future completeWithException): Future[Any]) onException {
case e: Exception future completeWithException e
@ -355,8 +299,6 @@ object Future {
future
}
// TODO make variant of flow(timeout)(body) which does NOT break type inference
/**
* Assures that any Future tasks initiated in the current thread will be
* executed asynchronously, including any tasks currently queued to be
@ -381,7 +323,7 @@ object Future {
* }
* </pre>
*/
def blocking()(implicit dispatcher: MessageDispatcher): Unit =
def blocking(implicit dispatcher: MessageDispatcher): Unit =
_taskStack.get match {
case Some(taskStack) if taskStack.nonEmpty
val tasks = taskStack.elems
@ -419,7 +361,7 @@ object Future {
}
}
sealed trait Future[+T] extends japi.Future[T] {
sealed trait Future[+T] extends japi.Future[T] with Block.Blockable {
implicit def dispatcher: MessageDispatcher
@ -429,35 +371,7 @@ sealed trait Future[+T] extends japi.Future[T] {
* Returns the result of this Future without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
def apply()(implicit timeout: Timeout): T @cps[Future[Any]] = shift(this flatMap (_: T Future[Any]))
/**
* Blocks awaiting completion of this Future, then returns the resulting value,
* or throws the completed exception
*
* Scala & Java API
*
* throws FutureTimeoutException if this Future times out when waiting for completion
*/
def get: T = this.await.resultOrException.get
/**
* Blocks the current thread until the Future has been completed or the
* timeout has expired. In the case of the timeout expiring a
* FutureTimeoutException will be thrown.
*/
def await: Future[T]
/**
* Blocks the current thread until the Future has been completed or the
* timeout has expired, additionally bounding the waiting period according to
* the <code>atMost</code> parameter. The timeout will be the lesser value of
* 'atMost' and the timeout supplied at the constructuion of this Future. In
* the case of the timeout expiring a FutureTimeoutException will be thrown.
* Other callers of this method are not affected by the additional bound
* imposed by <code>atMost</code>.
*/
def await(atMost: Duration): Future[T]
def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T Future[Any]))
/**
* Await completion of this Future and return its value if it conforms to A's
@ -466,7 +380,7 @@ sealed trait Future[+T] extends japi.Future[T] {
* in case of a timeout.
*/
def as[A](implicit m: Manifest[A]): Option[A] = {
try await catch { case _: FutureTimeoutException }
try Block.on(this, Duration.Inf) catch { case _: TimeoutException }
value match {
case None None
case Some(Left(ex)) throw ex
@ -479,42 +393,14 @@ sealed trait Future[+T] extends japi.Future[T] {
}
}
/**
* Await completion of this Future and return its value if it conforms to A's
* erased type, None otherwise. Will throw any exception the Future was
* completed with. Will return None in case of a timeout.
*/
def asSilently[A](implicit m: Manifest[A]): Option[A] = {
try await catch { case _: FutureTimeoutException }
value match {
case None None
case Some(Left(ex)) throw ex
case Some(Right(v))
try Some(BoxedType(m.erasure).cast(v).asInstanceOf[A])
catch { case _: ClassCastException None }
}
}
@deprecated("Used Block.on(future, timeoutDuration)")
def get: T = Block.on(this, Duration.Inf).resultOrException.get
/**
* Tests whether this Future has been completed.
*/
final def isCompleted: Boolean = value.isDefined
/**
* Tests whether this Future's timeout has expired.
*
* Note that an expired Future may still contain a value, or it may be
* completed with a value.
*/
def isExpired: Boolean
def timeout: Timeout
/**
* This Future's timeout in nanoseconds.
*/
def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue
/**
* The contained value of this Future. Before this Future is completed
* the value will be None. After completion the value will be Some(Right(t))
@ -542,8 +428,7 @@ sealed trait Future[+T] extends japi.Future[T] {
/**
* When this Future is completed, apply the provided function to the
* Future. If the Future has already been completed, this will apply
* immediately. Will not be called in case of a timeout, which also holds if
* corresponding Promise is attempted to complete after expiry. Multiple
* immediately. Multiple
* callbacks may be registered; there is no guarantee that they will be
* executed in a particular order.
*/
@ -582,9 +467,11 @@ sealed trait Future[+T] extends japi.Future[T] {
}
}
def onTimeout(func: Future[T] Unit): this.type
def orElse[A >: T](fallback: A): Future[A]
/**
* Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this.
* This is semantically the same as: Future.firstCompletedOf(Seq(this, that))
*/
def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize
/**
* Creates a new Future that will handle any matching Throwable that this
@ -597,8 +484,8 @@ sealed trait Future[+T] extends japi.Future[T] {
* Future(6 / 2) recover { case e: ArithmeticException 0 } // result: 3
* </pre>
*/
final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = {
val future = new DefaultPromise[A](timeout)
final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
val future = Promise[A]()
onComplete {
_.value.get match {
case Left(e) if pf isDefinedAt e future.complete(try { Right(pf(e)) } catch { case x: Exception Left(x) })
@ -621,8 +508,8 @@ sealed trait Future[+T] extends japi.Future[T] {
* } yield b + "-" + c
* </pre>
*/
final def map[A](f: T A)(implicit timeout: Timeout): Future[A] = {
val future = new DefaultPromise[A](timeout)
final def map[A](f: T A): Future[A] = {
val future = Promise[A]()
onComplete {
_.value.get match {
case l: Left[_, _] future complete l.asInstanceOf[Either[Throwable, A]]
@ -643,8 +530,8 @@ sealed trait Future[+T] extends japi.Future[T] {
* Creates a new Future[A] which is completed with this Future's result if
* that conforms to A's erased type or a ClassCastException otherwise.
*/
final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = {
val fa = new DefaultPromise[A](timeout)
final def mapTo[A](implicit m: Manifest[A]): Future[A] = {
val fa = Promise[A]()
onComplete { ft
fa complete (ft.value.get match {
case l: Left[_, _] l.asInstanceOf[Either[Throwable, A]]
@ -673,8 +560,8 @@ sealed trait Future[+T] extends japi.Future[T] {
* } yield b + "-" + c
* </pre>
*/
final def flatMap[A](f: T Future[A])(implicit timeout: Timeout): Future[A] = {
val future = new DefaultPromise[A](timeout)
final def flatMap[A](f: T Future[A]): Future[A] = {
val future = Promise[A]()
onComplete {
_.value.get match {
@ -698,17 +585,17 @@ sealed trait Future[+T] extends japi.Future[T] {
}
}
final def withFilter(p: T Boolean)(implicit timeout: Timeout) = new FutureWithFilter[T](this, p)
final def withFilter(p: T Boolean) = new FutureWithFilter[T](this, p)
final class FutureWithFilter[+A](self: Future[A], p: A Boolean)(implicit timeout: Timeout) {
final class FutureWithFilter[+A](self: Future[A], p: A Boolean) {
def foreach(f: A Unit): Unit = self filter p foreach f
def map[B](f: A B): Future[B] = self filter p map f
def flatMap[B](f: A Future[B]): Future[B] = self filter p flatMap f
def withFilter(q: A Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x p(x) && q(x))
}
final def filter(p: T Boolean)(implicit timeout: Timeout): Future[T] = {
val future = new DefaultPromise[T](timeout)
final def filter(p: T Boolean): Future[T] = {
val future = Promise[T]()
onComplete {
_.value.get match {
case l: Left[_, _] future complete l.asInstanceOf[Either[Throwable, T]]
@ -735,16 +622,10 @@ sealed trait Future[+T] extends japi.Future[T] {
}
object Promise {
/**
* Creates a non-completed, new, Promise with the supplied timeout in milliseconds
* Creates a non-completed, new, Promise
*/
def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A](timeout)
/**
* Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf)
*/
def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout)
def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]()
}
/**
@ -782,7 +663,7 @@ trait Promise[T] extends Future[T] {
final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any]) cont(complete(Right(value))) }
final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any])
val fr = new DefaultPromise[Any](this.timeout)
val fr = Promise[Any]()
this completeWith other onComplete { f
try {
fr completeWith cont(f)
@ -796,7 +677,7 @@ trait Promise[T] extends Future[T] {
}
final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any])
val fr = new DefaultPromise[Any](this.timeout)
val fr = Promise[Any]()
stream.dequeue(this).onComplete { f
try {
fr completeWith cont(f)
@ -828,81 +709,50 @@ private[dispatch] object DefaultPromise {
case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
def exception: Throwable = value.get.left.get
}
case object Expired extends FState[Nothing] {
def value: Option[Either[Throwable, Nothing]] = None
}
private val emptyPendingValue = Pending[Nothing](Nil)
}
/**
* The default concrete Future implementation.
*/
class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
self
import DefaultPromise.{ FState, Success, Failure, Pending, Expired }
import DefaultPromise.{ FState, Success, Failure, Pending }
def this()(implicit dispatcher: MessageDispatcher, timeout: Timeout) = this(timeout)
def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout))
def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit))
private val _startTimeInNanos = currentTimeInNanos
def block(atMost: Duration)(implicit permit: CanBlock): this.type = if (value.isDefined) this else {
Future.blocking
val start = MILLISECONDS.toNanos(System.currentTimeMillis)
@tailrec
private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
def awaitUnsafe(waitTimeNanos: Long): Boolean = {
if (value.isEmpty && waitTimeNanos > 0) {
val ms = NANOSECONDS.toMillis(waitTimeNanos)
val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
val start = currentTimeInNanos
try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException }
awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start))
awaitUnsafe(waitTimeNanos - (MILLISECONDS.toNanos(System.currentTimeMillis) - start))
} else {
value.isDefined
}
}
def await(atMost: Duration): this.type = if (value.isDefined) this else {
Future.blocking()
val waitNanos =
if (timeout.duration.isFinite && atMost.isFinite)
atMost.toNanos min timeLeft()
else if (atMost.isFinite)
atMost.toNanos
else if (timeout.duration.isFinite)
timeLeft()
else Long.MaxValue //If both are infinite, use Long.MaxValue
val waitNanos = if (atMost.isFinite) atMost.toNanos else Long.MaxValue
if (awaitUnsafe(waitNanos)) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds")
else throw new TimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds")
}
def await = await(timeout.duration)
def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
def value: Option[Either[Throwable, T]] = getState.value
@inline
protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean =
AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState)
private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]
@inline
protected final def getState: FState[T] = {
protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)
@tailrec
def read(): FState[T] = {
val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
if (cur.isInstanceOf[Pending[_]] && isExpired) {
if (updateState(cur, Expired)) Expired else read()
} else cur
}
read()
}
@inline
protected final def getState: FState[T] = updater.get(this)
def complete(value: Either[Throwable, T]): this.type = {
val callbacks = {
@ -935,11 +785,9 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val cur = getState
cur match {
case _: Success[_] | _: Failure[_] true
case Expired false
case p: Pending[_]
val pt = p.asInstanceOf[Pending[T]]
if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false
else tryAddCallback()
if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
}
}
@ -948,83 +796,17 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
this
}
def onTimeout(func: Future[T] Unit): this.type = {
val runNow =
if (!timeout.duration.isFinite) false //Not possible
else if (value.isEmpty) {
if (!isExpired) {
val runnable = new Runnable {
def run() {
if (!isCompleted) {
if (!isExpired)
try dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS), this)
catch {
case _: IllegalStateException func(DefaultPromise.this)
}
else func(DefaultPromise.this)
}
}
}
val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable)
onComplete(_ timeoutFuture.cancel())
false
} else true
} else false
if (runNow) Future.dispatchTask(() notifyCompleted(func))
this
}
final def orElse[A >: T](fallback: A): Future[A] =
if (timeout.duration.isFinite) {
getState match {
case _: Success[_] | _: Failure[_] this
case Expired Future[A](fallback, timeout)
case _: Pending[_]
val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense.
promise completeWith this
val runnable = new Runnable {
def run() {
if (!isCompleted) {
val done =
if (!isExpired)
try {
dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS), this)
true
} catch {
case _: IllegalStateException false
}
else false
if (!done)
promise complete (try { Right(fallback) } catch { case e Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418
}
}
}
dispatcher.prerequisites.scheduler.scheduleOnce(Duration(timeLeft(), NANOSECONDS), runnable)
promise
}
} else this
private def notifyCompleted(func: Future[T] Unit) {
// FIXME catching all and continue isn't good for OOME, ticket #1418
try { func(this) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
}
@inline
private def currentTimeInNanos: Long = MILLISECONDS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)?
//TODO: the danger of Math.abs is that it could break the ordering of time. So I would not recommend an abs.
@inline
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
private def timeLeftNoinline(): Long = timeLeft()
}
/**
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
* a Future-composition but you already have a value to contribute.
*/
sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
val value = Some(suppliedValue)
def complete(value: Either[Throwable, T]): this.type = this
@ -1032,12 +814,6 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val di
Future dispatchTask (() func(this))
this
}
def await(atMost: Duration): this.type = this
def await: this.type = this
def isExpired: Boolean = true
def timeout: Timeout = Timeout.zero
final def onTimeout(func: Future[T] Unit): this.type = this
final def orElse[A >: T](fallback: A): Future[A] = this
def block(atMost: Duration)(implicit permit: CanBlock): this.type = this
}

View file

@ -185,9 +185,9 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout:
if (eo.nonEmpty) {
if (_elemOut.compareAndSet(eo, eo.tail)) new KeptPromise(Right(eo.head))
else dequeue()
} else dequeue(Promise[A](timeout))
} else dequeue(Promise[A])
}
} else dequeue(Promise[A](timeout))
} else dequeue(Promise[A])
@tailrec
final def dequeue(promise: Promise[A]): Future[A] = _state.get match {

View file

@ -8,22 +8,13 @@ import akka.actor.Timeout
/* Java API */
trait Future[+T] { self: akka.dispatch.Future[T]
private[japi] final def onTimeout[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onTimeout(proc(_))
private[japi] final def onResult[A >: T](proc: Procedure[A]): this.type = self.onResult({ case r proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit])
private[japi] final def onException(proc: Procedure[Throwable]): this.type = self.onException({ case t: Throwable proc(t) }: PartialFunction[Throwable, Unit])
private[japi] final def onComplete[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onComplete(proc(_))
private[japi] final def map[A >: T, B](f: JFunc[A, B], timeout: Timeout): akka.dispatch.Future[B] = {
implicit val t = timeout
self.map(f(_))
}
private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]], timeout: Timeout): akka.dispatch.Future[B] = {
implicit val t = timeout
self.flatMap(f(_))
}
private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_))
private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_))
private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_))
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean], timeout: Timeout): akka.dispatch.Future[A] = {
implicit val t = timeout
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] =
self.filter((a: Any) p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]]
}
}

View file

@ -11,10 +11,10 @@ import akka.config.ConfigurationException
import akka.util.ReentrantGuard
import akka.util.duration._
import akka.actor.Timeout
import akka.dispatch.FutureTimeoutException
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException
object LoggingBus {
implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
@ -147,7 +147,7 @@ trait LoggingBus extends ActorEventBus {
val actor = system.systemActorOf(Props(clazz), name)
implicit val timeout = Timeout(3 seconds)
val response = try actor ? InitializeLogger(this) get catch {
case _: FutureTimeoutException
case _: TimeoutException
publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
}
if (response != LoggerInitialized)

View file

@ -202,7 +202,7 @@ class TransactionLog private (
EventHandler.debug(this, "Reading entries [%s -> %s] for log [%s]".format(from, to, logId))
if (isAsync) {
val future = new DefaultPromise[Vector[Array[Byte]]](timeout)
val future = Promise[Vector[Array[Byte]]]()
ledger.asyncReadEntries(
from, to,
new AsyncCallback.ReadCallback {
@ -464,7 +464,7 @@ object TransactionLog {
}
}
val future = new DefaultPromise[LedgerHandle](timeout)
val future = Promise[LedgerHandle]()
if (isAsync) {
bookieClient.asyncCreateLedger(
ensembleSize, quorumSize, digestType, password,
@ -526,7 +526,7 @@ object TransactionLog {
val ledger = try {
if (isAsync) {
val future = new DefaultPromise[LedgerHandle](timeout)
val future = Promise[LedgerHandle]()
bookieClient.asyncOpenLedger(
logId, digestType, password,
new AsyncCallback.OpenCallback {

View file

@ -73,7 +73,7 @@ class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode {
}
"allow to track JVM state and bind handles through MetricsAlterationMonitors" in {
val monitorReponse = new DefaultPromise[String]
val monitorReponse = Promise[String]()
node.metricsManager.addMonitor(new LocalMetricsAlterationMonitor {

1
akka-docs/.history Normal file
View file

@ -0,0 +1 @@
exit

View file

@ -360,7 +360,7 @@ One thing to note is that we used two different versions of the ``actorOf`` meth
The actor's life-cycle is:
- Created & Started -- ``Actor.actorOf[MyActor]`` -- can receive messages
- Created & Started -- ``actorOf(MyActor.class)`` -- can receive messages
- Stopped -- ``actorRef.stop()`` -- can **not** receive messages
Once the actor has been stopped it is dead and can not be started again.

View file

@ -8,10 +8,10 @@ import com.mongodb.async._
import com.mongodb.async.futures.RequestFutures
import org.bson.collection._
import akka.actor.ActorCell
import akka.dispatch.Envelope
import akka.event.Logging
import akka.dispatch.DefaultPromise
import akka.actor.ActorRef
import akka.dispatch.{ Block, Promise, Envelope, DefaultPromise }
import java.util.concurrent.TimeoutException
class MongoBasedMailboxException(message: String) extends AkkaException(message)
@ -43,15 +43,14 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
/* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */
val durableMessage = MongoDurableMessage(ownerPathString, envelope.message, envelope.sender)
// todo - do we need to filter the actor name at all for safe collection naming?
val result = new DefaultPromise[Boolean](settings.WriteTimeout)(dispatcher)
val result = Promise[Boolean]()(dispatcher)
mongo.insert(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)]
wr match {
case Right((oid, wr)) result.completeWithResult(true)
case Left(t) result.completeWithException(t)
}
})
result.as[Boolean].orNull
Block.on(result, settings.WriteTimeout)
}
def dequeue(): Envelope = withErrorHandling {
@ -62,7 +61,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
* TODO - Should we have a specific query in place? Which way do we sort?
* TODO - Error handling version!
*/
val envelopePromise = new DefaultPromise[Envelope](settings.ReadTimeout)(dispatcher)
val envelopePromise = Promise[Envelope]()(dispatcher)
mongo.findAndRemove(Document.empty) { doc: Option[MongoDurableMessage]
doc match {
case Some(msg) {
@ -71,18 +70,16 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise)
}
case None
{
log.info("No matching document found. Not an error, just an empty queue.")
envelopePromise.completeWithResult(null)
}
()
}
}
envelopePromise.as[Envelope].orNull
try { Block.on(envelopePromise, settings.ReadTimeout).resultOrException.orNull } catch { case _: TimeoutException null }
}
def numberOfMessages: Int = {
val count = new DefaultPromise[Int](settings.ReadTimeout)(dispatcher)
val count = Promise[Int]()(dispatcher)
mongo.count()(count.completeWithResult)
count.as[Int].getOrElse(-1)
}

View file

@ -19,10 +19,10 @@ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import com.google.protobuf.ByteString
import java.util.concurrent.atomic.AtomicBoolean
import akka.event.EventStream
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
import java.net.InetAddress
import akka.serialization.SerializationExtension
import java.util.concurrent.{ TimeoutException, ConcurrentHashMap }
/**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
@ -84,7 +84,7 @@ class RemoteActorRefProvider(
if (systemService) local.actorOf(system, props, supervisor, name, systemService)
else {
val path = supervisor.path / name
val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher)
val newFuture = Promise[ActorRef]()(dispatcher)
actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
case null
@ -168,7 +168,7 @@ class RemoteActorRefProvider(
actors.replace(path.toString, newFuture, actor)
actor
case actor: InternalActorRef actor
case future: Future[_] future.get.asInstanceOf[InternalActorRef]
case future: Future[_] Block.on(future, system.settings.ActorTimeout.duration).resultOrException.get.asInstanceOf[InternalActorRef]
}
}
@ -224,7 +224,7 @@ class RemoteActorRefProvider(
if (withACK) {
try {
val f = connection ? (command, remoteExtension.RemoteSystemDaemonAckTimeout)
(try f.await.value catch { case _: FutureTimeoutException None }) match {
(try Block.on(f, remoteExtension.RemoteSystemDaemonAckTimeout).value catch { case _: TimeoutException None }) match {
case Some(Right(receiver))
log.debug("Remote system command sent to [{}] successfully received", receiver)

View file

@ -4,7 +4,6 @@
package akka.spring
import foo.{ PingActor, IMyPojo, MyPojo }
import akka.dispatch.FutureTimeoutException
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
@ -14,10 +13,10 @@ import org.springframework.context.ApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
import org.springframework.core.io.{ ClassPathResource, Resource }
import org.scalatest.{ BeforeAndAfterAll, FeatureSpec }
import java.util.concurrent.CountDownLatch
import akka.remote.netty.NettyRemoteSupport
import akka.actor._
import akka.actor.Actor._
import java.util.concurrent.{TimeoutException, CountDownLatch}
object RemoteTypedActorLog {
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit, BlockingQueue }
@ -89,9 +88,9 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with B
assert(MyPojo.lastOneWayMessage === "hello 1")
}
scenario("FutureTimeoutException when timed out") {
scenario("TimeoutException when timed out") {
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor")
evaluating { myPojo.longRunning() } should produce[FutureTimeoutException]
evaluating { myPojo.longRunning() } should produce[TimeoutException]
}
scenario("typed-actor with timeout") {

View file

@ -8,7 +8,7 @@ import akka.actor.ActorSystem
import akka.actor._
import akka.stm._
import akka.japi.{ Function JFunc, Procedure JProc }
import akka.dispatch.{ PinnedDispatcher, UnboundedMailbox, DefaultPromise, Dispatchers, Future }
import akka.dispatch._
/**
* Used internally to send functions.
@ -123,7 +123,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
def alter(f: T T)(timeout: Timeout): Future[T] = {
def dispatch = updater.?(Update(f), timeout).asInstanceOf[Future[T]]
if (Stm.activeTransaction) {
val result = new DefaultPromise[T](timeout)(system.dispatcher)
val result = Promise[T]()(system.dispatcher)
get //Join xa
deferred { result completeWith dispatch } //Attach deferred-block to current transaction
result
@ -134,7 +134,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* Dispatch a new value for the internal state. Behaves the same
* as sending a function (x => newValue).
*/
def send(newValue: T): Unit = send(x newValue)
def send(newValue: T): Unit = send(_ newValue)
/**
* Dispatch a new value for the internal state. Behaves the same
@ -166,7 +166,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* still be executed in order.
*/
def alterOff(f: T T)(timeout: Timeout): Future[T] = {
val result = new DefaultPromise[T](timeout)(system.dispatcher)
val result = Promise[T]()(system.dispatcher)
send((value: T) {
suspend()
val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeout.duration)
@ -186,7 +186,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
/**
* Gets this agent's value after all currently queued updates have completed.
*/
def await(implicit timeout: Timeout): T = future.await.result.get
def await(implicit timeout: Timeout): T = Block.on(future, timeout.duration).result.get
/**
* Map this agent to a new agent, applying the function to the internal state.

View file

@ -3,10 +3,14 @@ package akka.transactor.example;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.Block;
import akka.dispatch.Future;
import akka.testkit.AkkaSpec;
import akka.transactor.Coordinated;
import akka.util.Duration;
import java.util.concurrent.TimeUnit;
public class UntypedCoordinatedExample {
public static void main(String[] args) throws InterruptedException {
@ -20,11 +24,12 @@ public class UntypedCoordinatedExample {
Thread.sleep(3000);
long timeout = 5000;
Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS);
Future future1 = counter1.ask("GetCount", timeout);
Future future2 = counter2.ask("GetCount", timeout);
future1.await();
Block.on(future1, d);
if (future1.isCompleted()) {
if (future1.result().isDefined()) {
int result = (Integer) future1.result().get();
@ -32,7 +37,7 @@ public class UntypedCoordinatedExample {
}
}
future2.await();
Block.on(future2, d);
if (future2.isCompleted()) {
if (future2.result().isDefined()) {
int result = (Integer) future2.result().get();

View file

@ -3,8 +3,12 @@ package akka.transactor.example;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.Block;
import akka.dispatch.Future;
import akka.testkit.AkkaSpec;
import akka.util.Duration;
import java.util.concurrent.TimeUnit;
public class UntypedTransactorExample {
public static void main(String[] args) throws InterruptedException {
@ -19,11 +23,12 @@ public class UntypedTransactorExample {
Thread.sleep(3000);
long timeout = 5000;
Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS);
Future future1 = counter1.ask("GetCount", timeout);
Future future2 = counter2.ask("GetCount", timeout);
future1.await();
Block.on(future1, d);
if (future1.isCompleted()) {
if (future1.result().isDefined()) {
int result = (Integer) future1.result().get();
@ -31,7 +36,7 @@ public class UntypedTransactorExample {
}
}
future2.await();
Block.on(future2, d);
if (future2.isCompleted()) {
if (future2.result().isDefined()) {
int result = (Integer) future2.result().get();

View file

@ -2,6 +2,8 @@ package akka.transactor.test;
import static org.junit.Assert.*;
import akka.dispatch.Block;
import akka.util.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -78,7 +80,7 @@ public class UntypedTransactorTest {
}
for (ActorRef counter : counters) {
Future future = counter.ask("GetCount", askTimeout);
future.await();
Block.on(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
if (future.isCompleted()) {
Option resultOption = future.result();
if (resultOption.isDefined()) {
@ -107,7 +109,7 @@ public class UntypedTransactorTest {
}
for (ActorRef counter : counters) {
Future future = counter.ask("GetCount", askTimeout);
future.await();
Block.on(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
if (future.isCompleted()) {
Option resultOption = future.result();
if (resultOption.isDefined()) {

View file

@ -11,6 +11,7 @@ import akka.util.duration._
import java.util.concurrent.CountDownLatch
import akka.testkit.AkkaSpec
import akka.testkit._
import akka.dispatch.Block
class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
val latch = new CountDownLatch(num)
@ -35,7 +36,7 @@ class AgentSpec extends AkkaSpec {
countDown.await(5 seconds)
agent() must be("abcd")
agent.close
agent.close()
}
"maintain order between send and sendOff" in {
@ -51,7 +52,7 @@ class AgentSpec extends AkkaSpec {
countDown.await(5 seconds)
agent() must be("abcd")
agent.close
agent.close()
}
"maintain order between alter and alterOff" in {
@ -62,13 +63,13 @@ class AgentSpec extends AkkaSpec {
val r2 = agent.alterOff((s: String) { Thread.sleep(2000); s + "c" })(5000)
val r3 = agent.alter(_ + "d")(5000)
r1.await.resultOrException.get must be === "ab"
r2.await.resultOrException.get must be === "abc"
r3.await.resultOrException.get must be === "abcd"
Block.on(r1, 5 seconds).resultOrException.get must be === "ab"
Block.on(r2, 5 seconds).resultOrException.get must be === "abc"
Block.on(r3, 5 seconds).resultOrException.get must be === "abcd"
agent() must be("abcd")
agent.close
agent.close()
}
"be immediately readable" in {
@ -90,14 +91,14 @@ class AgentSpec extends AkkaSpec {
read must be(5)
agent() must be(10)
agent.close
agent.close()
}
"be readable within a transaction" in {
val agent = Agent(5)
val value = atomic { agent() }
value must be(5)
agent.close
agent.close()
}
"dispatch sends in successful transactions" in {
@ -112,7 +113,7 @@ class AgentSpec extends AkkaSpec {
countDown.await(5 seconds)
agent() must be(10)
agent.close
agent.close()
}
"not dispatch sends in aborted transactions" in {
@ -132,7 +133,7 @@ class AgentSpec extends AkkaSpec {
countDown.await(5 seconds)
agent() must be(5)
agent.close
agent.close()
}
"be able to return a 'queued' future" in {
@ -140,11 +141,9 @@ class AgentSpec extends AkkaSpec {
agent send (_ + "b")
agent send (_ + "c")
val future = agent.future
Block.on(agent.future, timeout.duration).resultOrException.get must be("abc")
future.await.result.get must be("abc")
agent.close
agent.close()
}
"be able to await the value after updates have completed" in {
@ -154,7 +153,7 @@ class AgentSpec extends AkkaSpec {
agent.await must be("abc")
agent.close
agent.close()
}
"be able to be mapped" in {
@ -164,8 +163,8 @@ class AgentSpec extends AkkaSpec {
agent1() must be(5)
agent2() must be(10)
agent1.close
agent2.close
agent1.close()
agent2.close()
}
"be able to be used in a 'foreach' for comprehension" in {
@ -178,7 +177,7 @@ class AgentSpec extends AkkaSpec {
result must be(3)
agent.close
agent.close()
}
"be able to be used in a 'map' for comprehension" in {
@ -188,8 +187,8 @@ class AgentSpec extends AkkaSpec {
agent1() must be(5)
agent2() must be(10)
agent1.close
agent2.close
agent1.close()
agent2.close()
}
"be able to be used in a 'flatMap' for comprehension" in {
@ -205,9 +204,9 @@ class AgentSpec extends AkkaSpec {
agent2() must be(2)
agent3() must be(3)
agent1.close
agent2.close
agent3.close
agent1.close()
agent2.close()
agent3.close()
}
}
}

View file

@ -7,16 +7,15 @@ import org.scalatest.{ WordSpec, BeforeAndAfterAll, Tag }
import org.scalatest.matchers.MustMatchers
import akka.actor.{ ActorSystem, ActorSystemImpl }
import akka.actor.{ Actor, ActorRef, Props }
import akka.dispatch.MessageDispatcher
import akka.event.{ Logging, LoggingAdapter }
import akka.util.duration._
import akka.dispatch.FutureTimeoutException
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.PoisonPill
import java.util.concurrent.LinkedBlockingQueue
import akka.actor.CreateChild
import akka.actor.DeadLetter
import java.util.concurrent.TimeoutException
import akka.dispatch.{ Block, MessageDispatcher }
object TimingTest extends Tag("timing")
@ -65,8 +64,8 @@ abstract class AkkaSpec(_system: ActorSystem)
final override def afterAll {
system.stop()
try system.asInstanceOf[ActorSystemImpl].terminationFuture.await(5 seconds) catch {
case _: FutureTimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
try Block.on(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
}
atTermination()
}