Removing Future.get

This commit is contained in:
Viktor Klang 2011-12-12 22:24:17 +01:00
parent ddcbe23f6d
commit d8fe6a5509
26 changed files with 183 additions and 193 deletions

View file

@ -25,6 +25,8 @@ public class JavaFutureTests {
private static ActorSystem system;
private static Timeout t;
private final Duration timeout = Duration.create(5, TimeUnit.SECONDS);
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf());
@ -52,7 +54,7 @@ public class JavaFutureTests {
}
});
assertEquals("Hello World", f2.get());
assertEquals("Hello World",Block.sync(f2, timeout));
}
@Test
@ -69,7 +71,7 @@ public class JavaFutureTests {
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
assertEquals(Block.sync(f, timeout), "foo");
}
@Test
@ -103,7 +105,7 @@ public class JavaFutureTests {
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
assertEquals(Block.sync(f, timeout), "foo");
}
@Test
@ -119,7 +121,7 @@ public class JavaFutureTests {
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
assertEquals(Block.sync(f, timeout), "foo");
}
@Test
@ -137,8 +139,8 @@ public class JavaFutureTests {
}
});
assertEquals(f.get(), "1000");
assertEquals(r.get().intValue(), 1000);
assertEquals(Block.sync(f, timeout), "1000");
assertEquals(Block.sync(r, timeout).intValue(), 1000);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
}
@ -156,8 +158,8 @@ public class JavaFutureTests {
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
assertEquals(r.get(), "foo");
assertEquals(Block.sync(f, timeout), "foo");
assertEquals(Block.sync(r, timeout), "foo");
}
// TODO: Improve this test, perhaps with an Actor
@ -177,7 +179,7 @@ public class JavaFutureTests {
Future<Iterable<String>> futureList = Futures.sequence(listFutures, system.dispatcher());
assertEquals(futureList.get(), listExpected);
assertEquals(Block.sync(futureList, timeout), listExpected);
}
// TODO: Improve this test, perhaps with an Actor
@ -201,7 +203,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
assertEquals(result.get(), expected.toString());
assertEquals(Block.sync(result, timeout), expected.toString());
}
@Test
@ -224,7 +226,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
assertEquals(result.get(), expected.toString());
assertEquals(Block.sync(result, timeout), expected.toString());
}
@Test
@ -247,7 +249,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
assertEquals(result.get(), expectedStrings);
assertEquals(Block.sync(result, timeout), expectedStrings);
}
@Test
@ -268,7 +270,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
assertEquals(expect, Block.sync(f, Duration.create(5, TimeUnit.SECONDS)));
assertEquals(expect, Block.sync(f, timeout));
}
@Test

View file

@ -123,7 +123,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
f.isCompleted must be === false
a ! 42
f.isCompleted must be === true
f.get must be === 42
Block.sync(f, timeout.duration) must be === 42
// clean-up is run as onComplete callback, i.e. dispatched on another thread
awaitCond(system.actorFor(a.path) == system.deadLetters, 1 second)
}
@ -136,7 +136,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their path" in {
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
(looker ? LookupPath(pathOf.path)).get must be === result
Block.sync(looker ? LookupPath(pathOf.path), timeout.duration) must be === result
}
for {
looker all
@ -146,8 +146,8 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their string representation" in {
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
(looker ? LookupString(pathOf.path.toString)).get must be === result
(looker ? LookupString(pathOf.path.toString + "/")).get must be === result
Block.sync(looker ? LookupString(pathOf.path.toString), timeout.duration) must be === result
Block.sync(looker ? LookupString(pathOf.path.toString + "/"), timeout.duration) must be === result
}
for {
looker all
@ -157,8 +157,8 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their root-anchored relative path" in {
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
(looker ? LookupString(pathOf.path.elements.mkString("/", "/", ""))).get must be === result
(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "/"))).get must be === result
Block.sync(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "")), timeout.duration) must be === result
Block.sync(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "/")), timeout.duration) must be === result
}
for {
looker all
@ -168,9 +168,9 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their relative path" in {
def check(looker: ActorRef, result: ActorRef, elems: String*) {
(looker ? LookupElems(elems)).get must be === result
(looker ? LookupString(elems mkString "/")).get must be === result
(looker ? LookupString(elems mkString ("", "/", "/"))).get must be === result
Block.sync(looker ? LookupElems(elems), timeout.duration) must be === result
Block.sync(looker ? LookupString(elems mkString "/"), timeout.duration) must be === result
Block.sync(looker ? LookupString(elems mkString ("", "/", "/")), timeout.duration) must be === result
}
check(c1, user, "..")
for {
@ -185,11 +185,11 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find system-generated actors" in {
def check(target: ActorRef) {
for (looker all) {
(looker ? LookupPath(target.path)).get must be === target
(looker ? LookupString(target.path.toString)).get must be === target
(looker ? LookupString(target.path.toString + "/")).get must be === target
(looker ? LookupString(target.path.elements.mkString("/", "/", ""))).get must be === target
if (target != root) (looker ? LookupString(target.path.elements.mkString("/", "/", "/"))).get must be === target
Block.sync(looker ? LookupPath(target.path), timeout.duration) must be === target
Block.sync(looker ? LookupString(target.path.toString), timeout.duration) must be === target
Block.sync(looker ? LookupString(target.path.toString + "/"), timeout.duration) must be === target
Block.sync(looker ? LookupString(target.path.elements.mkString("/", "/", "")), timeout.duration) must be === target
if (target != root) Block.sync(looker ? LookupString(target.path.elements.mkString("/", "/", "/")), timeout.duration) must be === target
}
}
for (target Seq(root, syst, user, system.deadLetters)) check(target)
@ -199,7 +199,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
import scala.collection.JavaConverters._
def checkOne(looker: ActorRef, query: Query) {
(looker ? query).get must be === system.deadLetters
Block.sync(looker ? query, timeout.duration) must be === system.deadLetters
}
def check(looker: ActorRef) {
Seq(LookupString("a/b/c"),
@ -218,21 +218,21 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
val f = c1 ? GetSender(testActor)
val a = expectMsgType[ActorRef]
a.path.elements.head must be === "temp"
(c2 ? LookupPath(a.path)).get must be === a
(c2 ? LookupString(a.path.toString)).get must be === a
(c2 ? LookupString(a.path.elements.mkString("/", "/", ""))).get must be === a
(c2 ? LookupString("../../" + a.path.elements.mkString("/"))).get must be === a
(c2 ? LookupString(a.path.toString + "/")).get must be === a
(c2 ? LookupString(a.path.elements.mkString("/", "/", "") + "/")).get must be === a
(c2 ? LookupString("../../" + a.path.elements.mkString("/") + "/")).get must be === a
(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements)).get must be === a
(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ "")).get must be === a
Block.sync(c2 ? LookupPath(a.path), timeout.duration) must be === a
Block.sync(c2 ? LookupString(a.path.toString), timeout.duration) must be === a
Block.sync(c2 ? LookupString(a.path.elements.mkString("/", "/", "")), timeout.duration) must be === a
Block.sync(c2 ? LookupString("../../" + a.path.elements.mkString("/")), timeout.duration) must be === a
Block.sync(c2 ? LookupString(a.path.toString + "/"), timeout.duration) must be === a
Block.sync(c2 ? LookupString(a.path.elements.mkString("/", "/", "") + "/"), timeout.duration) must be === a
Block.sync(c2 ? LookupString("../../" + a.path.elements.mkString("/") + "/"), timeout.duration) must be === a
Block.sync(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a
Block.sync(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a
f.isCompleted must be === false
a ! 42
f.isCompleted must be === true
f.get must be === 42
Block.sync(f, timeout.duration) must be === 42
// clean-up is run as onComplete callback, i.e. dispatched on another thread
awaitCond((c2 ? LookupPath(a.path)).get == system.deadLetters, 1 second)
awaitCond(Block.sync(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second)
}
}

View file

@ -361,8 +361,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val fnull = (ref ? (null, timeout)).mapTo[String]
ref ! PoisonPill
ffive.get must be("five")
fnull.get must be("null")
Block.sync(ffive, timeout.duration) must be("five")
Block.sync(fnull, timeout.duration) must be("null")
awaitCond(ref.isTerminated, 2000 millis)
}

View file

@ -8,6 +8,7 @@ import akka.testkit._
import akka.util.duration._
import Actor._
import akka.util.Duration
import akka.dispatch.Block
object ForwardActorSpec {
val ExpectedMessage = "FOO"
@ -32,20 +33,21 @@ class ForwardActorSpec extends AkkaSpec {
"A Forward Actor" must {
"forward actor reference when invoking forward on bang" in {
"forward actor reference when invoking forward on tell" in {
val latch = new TestLatch(1)
val replyTo = system.actorOf(new Actor { def receive = { case ExpectedMessage latch.countDown() } })
val replyTo = system.actorOf(new Actor { def receive = { case ExpectedMessage testActor ! ExpectedMessage } })
val chain = createForwardingChain(system)
chain.tell(ExpectedMessage, replyTo)
latch.await(Duration(5, "s")) must be === true
expectMsg(5 seconds, ExpectedMessage)
}
"forward actor reference when invoking forward on bang bang" in {
"forward actor reference when invoking forward on ask" in {
val chain = createForwardingChain(system)
chain.ask(ExpectedMessage, 5000).get must be === ExpectedMessage
chain.ask(ExpectedMessage, 5000) onSuccess { case ExpectedMessage testActor ! ExpectedMessage }
expectMsg(5 seconds, ExpectedMessage)
}
}
}

View file

@ -193,9 +193,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val f1 = client ? ByteString("Hello World!1")
val f2 = client ? ByteString("Hello World!2")
val f3 = client ? ByteString("Hello World!3")
f1.get must equal(ByteString("Hello World!1"))
f2.get must equal(ByteString("Hello World!2"))
f3.get must equal(ByteString("Hello World!3"))
Block.sync(f1, timeout.duration) must equal(ByteString("Hello World!1"))
Block.sync(f2, timeout.duration) must equal(ByteString("Hello World!2"))
Block.sync(f3, timeout.duration) must equal(ByteString("Hello World!3"))
client.stop
server.stop
ioManager.stop
@ -209,7 +209,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val client = system.actorOf(new SimpleEchoClient("localhost", 8065, ioManager))
val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString))
assert(f.get.size === 1000)
assert(Block.sync(f, timeout.duration).size === 1000)
client.stop
server.stop
ioManager.stop
@ -223,7 +223,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val client = system.actorOf(new SimpleEchoClient("localhost", 8066, ioManager))
val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString))
assert(f.get.size === 1000)
assert(Block.sync(f, timeout.duration).size === 1000)
client.stop
server.stop
ioManager.stop
@ -244,12 +244,12 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
Block.on(f4, timeout.duration)
val f5 = client1 ? (('get, "test"))
val f6 = client2 ? 'getall
f1.get must equal("OK")
f2.get must equal("OK")
f3.get must equal(ByteString("World"))
f4.get must equal("OK")
f5.get must equal(ByteString("I'm a test!"))
f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
Block.sync(f1, timeout.duration) must equal("OK")
Block.sync(f2, timeout.duration) must equal("OK")
Block.sync(f3, timeout.duration) must equal(ByteString("World"))
Block.sync(f4, timeout.duration) must equal("OK")
Block.sync(f5, timeout.duration) must equal(ByteString("I'm a test!"))
Block.sync(f6, timeout.duration) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
client1.stop
client2.stop
server.stop

View file

@ -7,10 +7,10 @@ package akka.actor
import org.scalatest.BeforeAndAfterEach
import akka.util.duration._
import akka.{ Die, Ping }
import akka.dispatch.Block
import akka.testkit.TestEvent._
import akka.testkit._
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Block
object SupervisorSpec {
val Timeout = 5 seconds
@ -151,7 +151,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"not restart temporary actor" in {
val (temporaryActor, _) = temporaryActorAllForOne
intercept[RuntimeException] { (temporaryActor.?(DieReply, TimeoutMillis)).get }
intercept[RuntimeException] { Block.sync(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) }
expectNoMsg(1 second)
}

View file

@ -247,7 +247,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val t = newFooBar
val f = t.futurePigdog(200)
f.isCompleted must be(false)
f.get must be("Pigdog")
Block.sync(f, timeout.duration) must be("Pigdog")
mustStop(t)
}
@ -255,7 +255,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val t = newFooBar
val futures = for (i 1 to 20) yield (i, t.futurePigdog(20, i))
for ((i, f) futures) {
f.get must be("Pigdog" + i)
Block.sync(f, timeout.duration) must be("Pigdog" + i)
}
mustStop(t)
}
@ -278,7 +278,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val t, t2 = newFooBar(Duration(2, "s"))
val f = t.futureComposePigdogFrom(t2)
f.isCompleted must be(false)
f.get must equal("PIGDOG")
Block.sync(f, timeout.duration) must equal("PIGDOG")
mustStop(t)
mustStop(t2)
}
@ -323,7 +323,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val f2 = t.futurePigdog(0)
f2.isCompleted must be(false)
f.isCompleted must be(false)
f.get must equal(f2.get)
Block.sync(f, timeout.duration) must equal(Block.sync(f2, timeout.duration))
mustStop(t)
}
@ -348,7 +348,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val results = for (i 1 to 120) yield (i, iterator.next.futurePigdog(200L, i))
for ((i, r) results) r.get must be("Pigdog" + i)
for ((i, r) results) Block.sync(r, timeout.duration) must be("Pigdog" + i)
for (t thais) mustStop(t)
}

View file

@ -390,12 +390,12 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
val f5 = try { a ? Interrupt } catch { case ie: InterruptedException new KeptPromise(Left(ActorInterruptedException(ie))) }
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert(f4.get === "foo2")
assert(intercept[ActorInterruptedException](f3.get).getMessage === "Ping!")
assert(f6.get === "bar2")
assert(intercept[ActorInterruptedException](f5.get).getMessage === "Ping!")
assert(Block.sync(f1, timeout.duration) === "foo")
assert(Block.sync(f2, timeout.duration) === "bar")
assert(Block.sync(f4, timeout.duration) === "foo2")
assert(intercept[ActorInterruptedException](Block.sync(f3, timeout.duration)).getMessage === "Ping!")
assert(Block.sync(f6, timeout.duration) === "bar2")
assert(intercept[ActorInterruptedException](Block.sync(f5, timeout.duration)).getMessage === "Ping!")
}
}
@ -410,10 +410,10 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
val f5 = a ? ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert(f4.get === "foo2")
assert(f6.get === "bar2")
assert(Block.sync(f1, timeout.duration) === "foo")
assert(Block.sync(f2, timeout.duration) === "bar")
assert(Block.sync(f4, timeout.duration) === "foo2")
assert(Block.sync(f6, timeout.duration) === "bar2")
assert(f3.value.isEmpty)
assert(f5.value.isEmpty)
}

View file

@ -66,7 +66,7 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
case "ping" if (works.get) latch.countDown()
}).withDispatcher(throughputDispatcher))
assert((slowOne ? "hogexecutor").get === "OK")
assert(Block.sync(slowOne ? "hogexecutor", timeout.duration) === "OK")
(1 to 100) foreach { _ slowOne ! "ping" }
fastOne ! "sabotage"
start.countDown()

View file

@ -50,7 +50,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"return supplied value on timeout" in {
val timedOut = new KeptPromise[String](Right("Timedout"))
val promise = Promise[String]() orElse timedOut
promise.get must be("Timedout")
Block.sync(promise, timeout.duration) must be("Timedout")
}
}
"completed with a result" must {
@ -200,9 +200,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
c (actor ? 7).mapTo[String]
} yield b + "-" + c
future1.get must be("10-14")
Block.sync(future1, timeout.duration) must be("10-14")
assert(checkType(future1, manifest[String]))
intercept[ClassCastException] { future2.get }
intercept[ClassCastException] { Block.sync(future2, timeout.duration) }
actor.stop()
}
}
@ -230,8 +230,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
Res(c: Int) actor ? Req(7)
} yield b + "-" + c
future1.get must be("10-14")
intercept[MatchError] { future2.get }
Block.sync(future1, timeout.duration) must be("10-14")
intercept[MatchError] { Block.sync(future2, timeout.duration) }
actor.stop()
}
}
@ -267,17 +267,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val future11 = actor ? "Failure" recover { case _ "Oops!" }
future1.get must be(5)
intercept[ArithmeticException] { future2.get }
intercept[ArithmeticException] { future3.get }
future4.get must be("5")
future5.get must be("0")
intercept[ArithmeticException] { future6.get }
future7.get must be("You got ERROR")
intercept[RuntimeException] { future8.get }
future9.get must be("FAIL!")
future10.get must be("World")
future11.get must be("Oops!")
Block.sync(future1, timeout.duration) must be(5)
intercept[ArithmeticException] { Block.sync(future2, timeout.duration) }
intercept[ArithmeticException] { Block.sync(future3, timeout.duration) }
Block.sync(future4, timeout.duration) must be("5")
Block.sync(future5, timeout.duration) must be("0")
intercept[ArithmeticException] { Block.sync(future6, timeout.duration) }
Block.sync(future7, timeout.duration) must be("You got ERROR")
intercept[RuntimeException] { Block.sync(future8, timeout.duration) }
Block.sync(future9, timeout.duration) must be("FAIL!")
Block.sync(future10, timeout.duration) must be("World")
Block.sync(future11, timeout.duration) must be("Oops!")
actor.stop()
}
@ -285,16 +285,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"firstCompletedOf" in {
val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ new KeptPromise[Int](Right(5))
Future.firstCompletedOf(futures).get must be(5)
Block.sync(Future.firstCompletedOf(futures), timeout.duration) must be(5)
}
"find" in {
val futures = for (i 1 to 10) yield Future { i }
val result = Future.find[Int](futures)(_ == 3)
result.get must be(Some(3))
Block.sync(result, timeout.duration) must be(Some(3))
val notFound = Future.find[Int](futures)(_ == 11)
notFound.get must be(None)
Block.sync(notFound, timeout.duration) must be(None)
}
"fold" in {
@ -315,7 +315,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
})
}
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), 10000).mapTo[Int] }
futures.foldLeft(Future(0))((fr, fa) for (r fr; a fa) yield (r + a)).get must be(45)
Block.sync(futures.foldLeft(Future(0))((fr, fa) for (r fr; a fa) yield (r + a)), timeout.duration) must be(45)
}
"fold with an exception" in {
@ -353,7 +353,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"return zero value if folding empty list" in {
Future.fold(List[Future[Int]]())(0)(_ + _).get must be(0)
Block.sync(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) must be(0)
}
"shouldReduceResults" in {
@ -410,11 +410,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
})
val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int])
assert(Future.sequence(oddFutures).get.sum === 10000)
assert(Block.sync(Future.sequence(oddFutures), timeout.duration).sum === 10000)
oddActor.stop()
val list = (1 to 100).toList
assert(Future.traverse(list)(x Future(x * 2 - 1)).get.sum === 10000)
assert(Block.sync(Future.traverse(list)(x Future(x * 2 - 1)), timeout.duration).sum === 10000)
}
"shouldHandleThrowables" in {
@ -461,7 +461,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val r = flow(x() + " " + y() + "!")
assert(r.get === "Hello World!")
assert(Block.sync(r, timeout.duration) === "Hello World!")
actor.stop
}
@ -475,7 +475,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100)
intercept[java.lang.ArithmeticException](r.get)
intercept[java.lang.ArithmeticException](Block.sync(r, timeout.duration))
}
}
@ -490,7 +490,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val r = flow(x() + y(), 100)
intercept[ClassCastException](r.get)
intercept[ClassCastException](Block.sync(r, timeout.duration))
}
}
@ -505,7 +505,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val r = flow(x() + y())
intercept[ClassCastException](r.get)
intercept[ClassCastException](Block.sync(r, timeout.duration))
}
}
@ -529,10 +529,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { x << 5 }
assert(y.get === 5)
assert(z.get === 5)
assert(Block.sync(y, timeout.duration) === 5)
assert(Block.sync(z, timeout.duration) === 5)
assert(lz.isOpen)
assert(result.get === 10)
assert(Block.sync(result, timeout.duration) === 10)
val a, b, c = Promise[Int]()
@ -544,9 +544,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
c completeWith Future(5)
assert(a.get === 5)
assert(b.get === 3)
assert(result2.get === 50)
assert(Block.sync(a, timeout.duration) === 5)
assert(Block.sync(b, timeout.duration) === 3)
assert(Block.sync(result2, timeout.duration) === 50)
}
"futureDataFlowShouldEmulateBlocking1" in {
@ -571,7 +571,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
Block.on(two, 1 minute)
assert(List(one, two).forall(_.isCompleted == true))
assert(simpleResult.get === 10)
assert(Block.sync(simpleResult, timeout.duration) === 10)
}
@ -625,7 +625,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
assert(i2.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
s1.open
s2.open
assert(result.get === 10)
assert(Block.sync(result, timeout.duration) === 10)
}
"futureCompletingWithContinuationsFailure" in {
@ -649,8 +649,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { x << 5 }
assert(y.get === 5)
intercept[java.lang.ArithmeticException](result.get)
assert(Block.sync(y, timeout.duration) === 5)
intercept[java.lang.ArithmeticException](Block.sync(result, timeout.duration))
assert(z.value === None)
assert(!lz.isOpen)
}
@ -673,7 +673,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
latch.open
assert(result.get === Some("Hello"))
assert(Block.sync(result, timeout.duration) === Some("Hello"))
}
"futureFlowShouldBeTypeSafe" in {
@ -711,7 +711,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { x << 40 }
flow { y << 2 }
assert(z.get === 42)
assert(Block.sync(z, timeout.duration) === 42)
}
"futureFlowLoops" in {
@ -733,7 +733,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
var i = 0
promises foreach { p
assert(p.get === i)
assert(Block.sync(p, timeout.duration) === i)
i += 1
}
@ -793,7 +793,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"should not deadlock with nested await (ticket 1313)" in {
val simple = Future() map (_ (Future(()) map (_ ())).get)
val simple = Future() map (_ Block.sync((Future(()) map (_ ())), timeout.duration))
Block.on(simple, timeout.duration) must be('completed)
val l1, l2 = new StandardLatch
@ -818,16 +818,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
def futureWithResult(f: ((Future[Any], Any) Unit) Unit) {
"be completed" in { f((future, _) future must be('completed)) }
"contain a value" in { f((future, result) future.value must be(Some(Right(result)))) }
"return result with 'get'" in { f((future, result) future.get must be(result)) }
"return result with 'get'" in { f((future, result) Block.sync(future, timeout.duration) must be(result)) }
"return result with 'Block.sync'" in { f((future, result) Block.sync(future, timeout.duration) must be(result)) }
"not timeout" in { f((future, _) Block.on(future, 0 millis)) }
"filter result" in {
f { (future, result)
(future filter (_ true)).get must be(result)
(evaluating { (future filter (_ false)).get } must produce[MatchError]).getMessage must startWith(result.toString)
Block.sync((future filter (_ true)), timeout.duration) must be(result)
(evaluating { Block.sync((future filter (_ false)), timeout.duration) } must produce[MatchError]).getMessage must startWith(result.toString)
}
}
"transform result with map" in { f((future, result) (future map (_.toString.length)).get must be(result.toString.length)) }
"transform result with map" in { f((future, result) Block.sync((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) }
"compose result with flatMap" in {
f { (future, result)
val r = for (r future; p Promise.successful("foo")) yield r.toString + p
@ -862,12 +862,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
future.value.get.left.get.getMessage must be(message)
})
}
"throw exception with 'get'" in { f((future, message) (evaluating { future.get } must produce[E]).getMessage must be(message)) }
"throw exception with 'get'" in { f((future, message) (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
"throw exception with 'Block.sync'" in { f((future, message) (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with filter" in {
f { (future, message)
(evaluating { (future filter (_ true)).get } must produce[E]).getMessage must be(message)
(evaluating { (future filter (_ false)).get } must produce[E]).getMessage must be(message)
(evaluating { Block.sync(future filter (_ true), timeout.duration) } must produce[E]).getMessage must be(message)
(evaluating { Block.sync(future filter (_ false), timeout.duration) } must produce[E]).getMessage must be(message)
}
}
"retain exception with map" in { f((future, message) (evaluating { Block.sync(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) }

View file

@ -21,9 +21,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
b << q
c << q()
}
assert(a.get === 1)
assert(b.get === 2)
assert(c.get === 3)
assert(Block.sync(a, timeout.duration) === 1)
assert(Block.sync(b, timeout.duration) === 2)
assert(Block.sync(c, timeout.duration) === 3)
}
"pend" in {
@ -35,9 +35,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
c << q
}
flow { q <<< List(1, 2, 3) }
assert(a.get === 1)
assert(b.get === 2)
assert(c.get === 3)
assert(Block.sync(a, timeout.duration) === 1)
assert(Block.sync(b, timeout.duration) === 2)
assert(Block.sync(c, timeout.duration) === 3)
}
"pend again" in {
@ -54,10 +54,10 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
c << q1
d << q1
}
assert(a.get === 1)
assert(b.get === 2)
assert(c.get === 3)
assert(d.get === 4)
assert(Block.sync(a, timeout.duration) === 1)
assert(Block.sync(b, timeout.duration) === 2)
assert(Block.sync(c, timeout.duration) === 3)
assert(Block.sync(d, timeout.duration) === 4)
}
"enque" in {
@ -71,10 +71,10 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
}
q ++= List(1, 2, 3, 4)
assert(a.get === 1)
assert(b.get === 2)
assert(c.get === 3)
assert(d.get === 4)
assert(Block.sync(a, timeout.duration) === 1)
assert(Block.sync(b, timeout.duration) === 2)
assert(Block.sync(c, timeout.duration) === 3)
assert(Block.sync(d, timeout.duration) === 4)
}
"map" in {
@ -90,9 +90,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
flow {
qs << ("Hello", "World!", "Test")
}
assert(a.get === 5)
assert(b.get === "World!")
assert(c.get === 4)
assert(Block.sync(a, timeout.duration) === 5)
assert(Block.sync(b, timeout.duration) === "World!")
assert(Block.sync(c, timeout.duration) === 4)
}
"not fail under concurrent stress" in {
@ -128,8 +128,7 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
}
}
val result = future.get
assert(result === (1L to 100000L).sum)
assert(Block.sync(future, timeout.duration) === (1L to 100000L).sum)
}
}
}

View file

@ -8,6 +8,7 @@ import collection.mutable.LinkedList
import akka.routing.Routing.Broadcast
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit._
import akka.dispatch.Block
object RoutingSpec {
@ -270,7 +271,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
shutdownLatch.await
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
Block.sync(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1)
}
"throw an exception, if all the connections have stopped" in {
@ -297,8 +298,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
(actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0)
Block.sync(actor ? Broadcast("Hi!"), timeout.duration).asInstanceOf[Int] must be(0)
}
"return the first response from connections, when some of them failed to reply" in {
@ -306,7 +306,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
(actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1)
Block.sync(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1)
}
"be started when constructed" in {

View file

@ -339,7 +339,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
(systemGuardian ? CreateChild(props, name)).get match {
Block.sync(systemGuardian ? CreateChild(props, name), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -347,7 +347,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def actorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
(guardian ? CreateChild(props, name)).get match {
Block.sync(guardian ? CreateChild(props, name), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -355,7 +355,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def actorOf(props: Props): ActorRef = {
implicit val timeout = settings.CreationTimeout
(guardian ? CreateRandomNameChild(props)).get match {
Block.sync(guardian ? CreateRandomNameChild(props), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}

View file

@ -415,7 +415,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case Some(Right(joption: AnyRef)) joption
case Some(Left(ex)) throw ex
}
case m (actor.?(m, timeout)).get.asInstanceOf[AnyRef]
case m Block.sync(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef]
}
}
}

View file

@ -376,24 +376,6 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] {
*/
def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T Future[Any]))
//Removed
/*def as[A](implicit m: Manifest[A]): Option[A] = {
try Block.on(this, Duration.Inf) catch { case _: TimeoutException }
value match {
case None None
case Some(Left(ex)) throw ex
case Some(Right(v))
try { Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) } catch {
case c: ClassCastException
if (v.asInstanceOf[AnyRef] eq null) throw new ClassCastException("null cannot be cast to " + m.erasure)
else throw new ClassCastException("'" + v + "' of class " + v.asInstanceOf[AnyRef].getClass + " cannot be cast to " + m.erasure)
}
}
}*/
@deprecated("Used Block.on(future, timeoutDuration)")
def get: T = Block.sync(this, Duration.Inf)
/**
* Tests whether this Future has been completed.
*/

View file

@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException
import akka.dispatch.Block
object LoggingBus {
implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
@ -146,7 +147,7 @@ trait LoggingBus extends ActorEventBus {
val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
val actor = system.systemActorOf(Props(clazz), name)
implicit val timeout = Timeout(3 seconds)
val response = try actor ? InitializeLogger(this) get catch {
val response = try Block.sync(actor ? InitializeLogger(this), timeout.duration) catch {
case _: TimeoutException
publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
}

View file

@ -85,7 +85,7 @@ class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode {
})
monitorReponse.get must be("Too much memory is used!")
Block.sync(monitorReponse, 5 seconds) must be("Too much memory is used!")
}

View file

@ -11,6 +11,7 @@ import akka.testkit.{ EventFilter, TestEvent }
import java.net.ConnectException
import java.nio.channels.NotYetConnectedException
import akka.cluster.LocalCluster
import akka.dispatch.Block
object DirectRoutingFailoverMultiJvmSpec {
@ -48,7 +49,7 @@ class DirectRoutingFailoverMultiJvmNode1 extends MasterClusterTestNode {
}
LocalCluster.barrier("verify-actor", NrOfNodes) {
(actor ? "identify").get must equal("node2")
Block.sync(actor ? "identify", timeout.duration) must equal("node2")
}
val timer = Timer(30.seconds, true)

View file

@ -11,6 +11,7 @@ import java.util.{ Collections, Set ⇒ JSet }
import java.net.ConnectException
import java.nio.channels.NotYetConnectedException
import akka.cluster.LocalCluster._
import akka.dispatch.Block
object RandomFailoverMultiJvmSpec {
@ -91,7 +92,7 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until 100) { // we should get hits from both nodes in 100 attempts, if not then not very random
val value = (actor ? "identify").get.asInstanceOf[String]
val value = Block.sync(actor ? "identify", timeout.duration).asInstanceOf[String]
set.add(value)
}
set

View file

@ -12,6 +12,7 @@ import java.net.ConnectException
import java.nio.channels.NotYetConnectedException
import java.lang.Thread
import akka.cluster.LocalCluster._
import akka.dispatch.Block
object RoundRobinFailoverMultiJvmSpec {
@ -94,7 +95,7 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until 100) {
val value = (actor ? "identify").get.asInstanceOf[String]
val value = Block.sync(actor ? "identify", timeout.duration).asInstanceOf[String]
set.add(value)
}
set

View file

@ -11,6 +11,7 @@ import java.nio.channels.NotYetConnectedException
import java.lang.Thread
import akka.routing.Routing.Broadcast
import akka.cluster.LocalCluster._
import akka.dispatch.Block
object ScatterGatherFailoverMultiJvmSpec {
@ -84,7 +85,7 @@ class ScatterGatherFailoverMultiJvmNode1 extends MasterClusterTestNode {
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until NrOfNodes * 2) {
val value = (actor ? "foo").get.asInstanceOf[String]
val value = Block.sync(actor ? "foo", timeout.duration).asInstanceOf[String]
set.add(value)
}
set

View file

@ -25,8 +25,8 @@ public class UntypedTransactorExample {
long timeout = 5000;
Duration d = Duration.create(timeout, TimeUnit.MILLISECONDS);
Future future1 = counter1.ask("GetCount", timeout);
Future future2 = counter2.ask("GetCount", timeout);
Future<Object> future1 = counter1.ask("GetCount", timeout);
Future<Object> future2 = counter2.ask("GetCount", timeout);
int count1 = (Integer)Block.sync(future1, d);
System.out.println("counter 1: " + count1);

View file

@ -2,6 +2,8 @@ package akka.transactor.test;
import static org.junit.Assert.*;
import akka.dispatch.Block;
import akka.util.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -10,7 +12,6 @@ import org.junit.Before;
import akka.actor.ActorSystem;
import akka.transactor.Coordinated;
import akka.actor.Actors;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
@ -28,7 +29,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
@ -82,7 +82,7 @@ public class UntypedCoordinatedIncrementTest {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", askTimeout);
assertEquals(1, ((Integer) future.get()).intValue());
assertEquals(1, ((Integer) Block.sync(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue());
}
}
@ -102,8 +102,8 @@ public class UntypedCoordinatedIncrementTest {
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future future = counter.ask("GetCount", askTimeout);
assertEquals(0, ((Integer) future.get()).intValue());
Future<Object>future = counter.ask("GetCount", askTimeout);
assertEquals(0,((Integer)Block.sync(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue());
}
}

View file

@ -101,7 +101,7 @@ public class UntypedTransactorTest {
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future future = counter.ask("GetCount", askTimeout);
Future<Object> future = counter.ask("GetCount", askTimeout);
int count = (Integer)Block.sync(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
assertEquals(0, count);
}

View file

@ -140,7 +140,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
system.registerOnTermination(latch.countDown())
system.stop()
latch.await(2 seconds)
(davyJones ? "Die!").get must be === "finally gone"
Block.sync(davyJones ? "Die!", timeout.duration) must be === "finally gone"
// this will typically also contain log messages which were sent after the logger shutdown
locker must contain(DeadLetter(42, davyJones, probe.ref))

View file

@ -4,8 +4,8 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.dispatch.Future
import akka.util.duration._
import akka.dispatch.{ Block, Future }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TestProbeSpec extends AkkaSpec with DefaultTimeout {
@ -18,7 +18,7 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout {
tk.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
tk.lastMessage.sender ! "world"
future must be('completed)
future.get must equal("world")
Block.sync(future, timeout.duration) must equal("world")
}
"reply to messages" in {