Renaming Block to Await, renaming sync to result, renaming on to ready, Await.ready and Await.result looks and reads well

This commit is contained in:
Viktor Klang 2011-12-12 22:50:08 +01:00
parent d8fe6a5509
commit b32cbbc764
66 changed files with 404 additions and 409 deletions

View file

@ -54,7 +54,7 @@ public class JavaFutureTests {
}
});
assertEquals("Hello World",Block.sync(f2, timeout));
assertEquals("Hello World", Await.result(f2, timeout));
}
@Test
@ -71,7 +71,7 @@ public class JavaFutureTests {
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(Block.sync(f, timeout), "foo");
assertEquals(Await.result(f, timeout), "foo");
}
@Test
@ -105,7 +105,7 @@ public class JavaFutureTests {
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(Block.sync(f, timeout), "foo");
assertEquals(Await.result(f, timeout), "foo");
}
@Test
@ -121,7 +121,7 @@ public class JavaFutureTests {
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(Block.sync(f, timeout), "foo");
assertEquals(Await.result(f, timeout), "foo");
}
@Test
@ -139,8 +139,8 @@ public class JavaFutureTests {
}
});
assertEquals(Block.sync(f, timeout), "1000");
assertEquals(Block.sync(r, timeout).intValue(), 1000);
assertEquals(Await.result(f, timeout), "1000");
assertEquals(Await.result(r, timeout).intValue(), 1000);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
}
@ -158,8 +158,8 @@ public class JavaFutureTests {
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(Block.sync(f, timeout), "foo");
assertEquals(Block.sync(r, timeout), "foo");
assertEquals(Await.result(f, timeout), "foo");
assertEquals(Await.result(r, timeout), "foo");
}
// TODO: Improve this test, perhaps with an Actor
@ -179,7 +179,7 @@ public class JavaFutureTests {
Future<Iterable<String>> futureList = Futures.sequence(listFutures, system.dispatcher());
assertEquals(Block.sync(futureList, timeout), listExpected);
assertEquals(Await.result(futureList, timeout), listExpected);
}
// TODO: Improve this test, perhaps with an Actor
@ -203,7 +203,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
assertEquals(Block.sync(result, timeout), expected.toString());
assertEquals(Await.result(result, timeout), expected.toString());
}
@Test
@ -226,7 +226,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
assertEquals(Block.sync(result, timeout), expected.toString());
assertEquals(Await.result(result, timeout), expected.toString());
}
@Test
@ -249,7 +249,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
assertEquals(Block.sync(result, timeout), expectedStrings);
assertEquals(Await.result(result, timeout), expectedStrings);
}
@Test
@ -270,7 +270,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
assertEquals(expect, Block.sync(f, timeout));
assertEquals(expect, Await.result(f, timeout));
}
@Test
@ -278,7 +278,7 @@ public class JavaFutureTests {
Promise<String> p = Futures.promise(system.dispatcher());
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.success("foo");
Block.on(p, d);
assertEquals(Block.sync(p, d), "foo");
Await.ready(p, d);
assertEquals(Await.result(p, d), "foo");
}
}

View file

@ -7,7 +7,7 @@ package akka.actor
import akka.testkit._
import org.scalatest.BeforeAndAfterEach
import akka.util.duration._
import akka.dispatch.Block
import akka.dispatch.Await
object ActorFireForgetRequestReplySpec {
@ -81,7 +81,7 @@ class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach w
"should shutdown crashed temporary actor" in {
filterEvents(EventFilter[Exception]("Expected exception")) {
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0))))
val actor = Block.sync((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
val actor = Await.result((supervisor ? Props[CrashingActor]).mapTo[ActorRef], timeout.duration)
actor.isTerminated must be(false)
actor ! "Die"
state.finished.await

View file

@ -11,7 +11,7 @@ import akka.actor.Actor._
import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic._
import akka.dispatch.Block
import akka.dispatch.Await
object ActorLifeCycleSpec {
@ -41,7 +41,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
override def postRestart(reason: Throwable) { report("postRestart") }
})
val restarter = Block.sync((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
expectMsg(("preStart", id, 0))
restarter ! Kill
@ -72,7 +72,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
val gen = new AtomicInteger(0)
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
val restarter = Block.sync((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
val restarter = Await.result((supervisor ? restarterProps).mapTo[ActorRef], timeout.duration)
expectMsg(("preStart", id, 0))
restarter ! Kill
@ -102,7 +102,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
val gen = new AtomicInteger(0)
val props = Props(new LifeCycleTestActor(testActor, id, gen))
val a = Block.sync((supervisor ? props).mapTo[ActorRef], timeout.duration)
val a = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
expectMsg(("preStart", id, 0))
a ! "status"
expectMsg(("OK", id, 0))

View file

@ -5,7 +5,7 @@ package akka.actor
import akka.testkit._
import akka.util.duration._
import akka.dispatch.Block
import akka.dispatch.Await
object ActorLookupSpec {
@ -37,7 +37,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
val c1 = system.actorOf(p, "c1")
val c2 = system.actorOf(p, "c2")
val c21 = Block.sync((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration)
val c21 = Await.result((c2 ? Create("c21")).mapTo[ActorRef], timeout.duration)
val user = system.asInstanceOf[ActorSystemImpl].guardian
val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian
@ -123,7 +123,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
f.isCompleted must be === false
a ! 42
f.isCompleted must be === true
Block.sync(f, timeout.duration) must be === 42
Await.result(f, timeout.duration) must be === 42
// clean-up is run as onComplete callback, i.e. dispatched on another thread
awaitCond(system.actorFor(a.path) == system.deadLetters, 1 second)
}
@ -136,7 +136,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their path" in {
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
Block.sync(looker ? LookupPath(pathOf.path), timeout.duration) must be === result
Await.result(looker ? LookupPath(pathOf.path), timeout.duration) must be === result
}
for {
looker all
@ -146,8 +146,8 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their string representation" in {
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
Block.sync(looker ? LookupString(pathOf.path.toString), timeout.duration) must be === result
Block.sync(looker ? LookupString(pathOf.path.toString + "/"), timeout.duration) must be === result
Await.result(looker ? LookupString(pathOf.path.toString), timeout.duration) must be === result
Await.result(looker ? LookupString(pathOf.path.toString + "/"), timeout.duration) must be === result
}
for {
looker all
@ -157,8 +157,8 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their root-anchored relative path" in {
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
Block.sync(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "")), timeout.duration) must be === result
Block.sync(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "/")), timeout.duration) must be === result
Await.result(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "")), timeout.duration) must be === result
Await.result(looker ? LookupString(pathOf.path.elements.mkString("/", "/", "/")), timeout.duration) must be === result
}
for {
looker all
@ -168,9 +168,9 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their relative path" in {
def check(looker: ActorRef, result: ActorRef, elems: String*) {
Block.sync(looker ? LookupElems(elems), timeout.duration) must be === result
Block.sync(looker ? LookupString(elems mkString "/"), timeout.duration) must be === result
Block.sync(looker ? LookupString(elems mkString ("", "/", "/")), timeout.duration) must be === result
Await.result(looker ? LookupElems(elems), timeout.duration) must be === result
Await.result(looker ? LookupString(elems mkString "/"), timeout.duration) must be === result
Await.result(looker ? LookupString(elems mkString ("", "/", "/")), timeout.duration) must be === result
}
check(c1, user, "..")
for {
@ -185,11 +185,11 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find system-generated actors" in {
def check(target: ActorRef) {
for (looker all) {
Block.sync(looker ? LookupPath(target.path), timeout.duration) must be === target
Block.sync(looker ? LookupString(target.path.toString), timeout.duration) must be === target
Block.sync(looker ? LookupString(target.path.toString + "/"), timeout.duration) must be === target
Block.sync(looker ? LookupString(target.path.elements.mkString("/", "/", "")), timeout.duration) must be === target
if (target != root) Block.sync(looker ? LookupString(target.path.elements.mkString("/", "/", "/")), timeout.duration) must be === target
Await.result(looker ? LookupPath(target.path), timeout.duration) must be === target
Await.result(looker ? LookupString(target.path.toString), timeout.duration) must be === target
Await.result(looker ? LookupString(target.path.toString + "/"), timeout.duration) must be === target
Await.result(looker ? LookupString(target.path.elements.mkString("/", "/", "")), timeout.duration) must be === target
if (target != root) Await.result(looker ? LookupString(target.path.elements.mkString("/", "/", "/")), timeout.duration) must be === target
}
}
for (target Seq(root, syst, user, system.deadLetters)) check(target)
@ -199,7 +199,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
import scala.collection.JavaConverters._
def checkOne(looker: ActorRef, query: Query) {
Block.sync(looker ? query, timeout.duration) must be === system.deadLetters
Await.result(looker ? query, timeout.duration) must be === system.deadLetters
}
def check(looker: ActorRef) {
Seq(LookupString("a/b/c"),
@ -218,21 +218,21 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
val f = c1 ? GetSender(testActor)
val a = expectMsgType[ActorRef]
a.path.elements.head must be === "temp"
Block.sync(c2 ? LookupPath(a.path), timeout.duration) must be === a
Block.sync(c2 ? LookupString(a.path.toString), timeout.duration) must be === a
Block.sync(c2 ? LookupString(a.path.elements.mkString("/", "/", "")), timeout.duration) must be === a
Block.sync(c2 ? LookupString("../../" + a.path.elements.mkString("/")), timeout.duration) must be === a
Block.sync(c2 ? LookupString(a.path.toString + "/"), timeout.duration) must be === a
Block.sync(c2 ? LookupString(a.path.elements.mkString("/", "/", "") + "/"), timeout.duration) must be === a
Block.sync(c2 ? LookupString("../../" + a.path.elements.mkString("/") + "/"), timeout.duration) must be === a
Block.sync(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a
Block.sync(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a
Await.result(c2 ? LookupPath(a.path), timeout.duration) must be === a
Await.result(c2 ? LookupString(a.path.toString), timeout.duration) must be === a
Await.result(c2 ? LookupString(a.path.elements.mkString("/", "/", "")), timeout.duration) must be === a
Await.result(c2 ? LookupString("../../" + a.path.elements.mkString("/")), timeout.duration) must be === a
Await.result(c2 ? LookupString(a.path.toString + "/"), timeout.duration) must be === a
Await.result(c2 ? LookupString(a.path.elements.mkString("/", "/", "") + "/"), timeout.duration) must be === a
Await.result(c2 ? LookupString("../../" + a.path.elements.mkString("/") + "/"), timeout.duration) must be === a
Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a
Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a
f.isCompleted must be === false
a ! 42
f.isCompleted must be === true
Block.sync(f, timeout.duration) must be === 42
Await.result(f, timeout.duration) must be === 42
// clean-up is run as onComplete callback, i.e. dispatched on another thread
awaitCond(Block.sync(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second)
awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second)
}
}

View file

@ -13,7 +13,7 @@ import java.lang.IllegalStateException
import akka.util.ReflectiveAccess
import akka.serialization.Serialization
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.dispatch.{ Block, DefaultPromise, Promise, Future }
import akka.dispatch.{ Await, DefaultPromise, Promise, Future }
object ActorRefSpec {
@ -128,7 +128,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
def wrap[T](f: Promise[Actor] T): T = {
val result = Promise[Actor]()
val r = f(result)
Block.sync(result, 1 minute)
Await.result(result, 1 minute)
r
}
@ -306,7 +306,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
def receive = { case _ sender ! nested }
})
val nested = Block.sync((a ? "any").mapTo[ActorRef], timeout.duration)
val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
a must not be null
nested must not be null
(a ne nested) must be === true
@ -314,13 +314,13 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
"support advanced nested actorOfs" in {
val a = system.actorOf(Props(new OuterActor(system.actorOf(Props(new InnerActor)))))
val inner = Block.sync(a ? "innerself", timeout.duration)
val inner = Await.result(a ? "innerself", timeout.duration)
Block.sync(a ? a, timeout.duration) must be(a)
Block.sync(a ? "self", timeout.duration) must be(a)
Await.result(a ? a, timeout.duration) must be(a)
Await.result(a ? "self", timeout.duration) must be(a)
inner must not be a
Block.sync(a ? "msg", timeout.duration) must be === "msg"
Await.result(a ? "msg", timeout.duration) must be === "msg"
}
"support reply via sender" in {
@ -361,8 +361,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val fnull = (ref ? (null, timeout)).mapTo[String]
ref ! PoisonPill
Block.sync(ffive, timeout.duration) must be("five")
Block.sync(fnull, timeout.duration) must be("null")
Await.result(ffive, timeout.duration) must be("five")
Await.result(fnull, timeout.duration) must be("null")
awaitCond(ref.isTerminated, 2000 millis)
}

View file

@ -8,7 +8,7 @@ import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import java.util.concurrent.TimeoutException
import akka.dispatch.Block
import akka.dispatch.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {
@ -29,7 +29,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
val echo = actorWithTimeout(Timeout(12))
try {
val f = echo ? "hallo"
intercept[TimeoutException] { Block.on(f, system.settings.ActorTimeout.duration) }
intercept[TimeoutException] { Await.ready(f, system.settings.ActorTimeout.duration) }
} finally { echo.stop }
}
}
@ -40,7 +40,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
val echo = actorWithTimeout(Props.defaultTimeout)
try {
val f = (echo ? "hallo").mapTo[String]
intercept[TimeoutException] { Block.on(f, timeout.duration) }
intercept[TimeoutException] { Await.ready(f, timeout.duration) }
f.value must be(None)
} finally { echo.stop }
}
@ -51,7 +51,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
val echo = actorWithTimeout(Props.defaultTimeout)
val f = echo.?("hallo", testTimeout)
try {
intercept[TimeoutException] { Block.on(f, testTimeout) }
intercept[TimeoutException] { Await.ready(f, testTimeout) }
f.value must be === None
} finally { echo.stop }
}

View file

@ -8,7 +8,7 @@ import org.scalatest.BeforeAndAfterEach
import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic._
import akka.dispatch.Block
import akka.dispatch.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
@ -79,13 +79,13 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
filterException[ActorKilledException] {
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(2))))
val terminalProps = Props(context { case x context.sender ! x })
val terminal = Block.sync((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration)
val monitor = startWatching(terminal)
terminal ! Kill
terminal ! Kill
Block.sync(terminal ? "foo", timeout.duration) must be === "foo"
Await.result(terminal ? "foo", timeout.duration) must be === "foo"
terminal ! Kill
expectTerminationOf(terminal)
@ -106,8 +106,8 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
}
}))
val failed = Block.sync((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration)
val brother = Block.sync((supervisor ? Props(new Actor {
val failed = Await.result((supervisor ? Props.empty).mapTo[ActorRef], timeout.duration)
val brother = Await.result((supervisor ? Props(new Actor {
context.watch(failed)
def receive = Actor.emptyBehavior
})).mapTo[ActorRef], timeout.duration)

View file

@ -8,7 +8,7 @@ import akka.testkit._
import akka.util.duration._
import Actor._
import akka.util.Duration
import akka.dispatch.Block
import akka.dispatch.Await
object ForwardActorSpec {
val ExpectedMessage = "FOO"

View file

@ -10,7 +10,7 @@ import akka.util.ByteString
import akka.util.cps._
import scala.util.continuations._
import akka.testkit._
import akka.dispatch.{ Block, Future }
import akka.dispatch.{ Await, Future }
object IOActorSpec {
import IO._
@ -193,9 +193,9 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val f1 = client ? ByteString("Hello World!1")
val f2 = client ? ByteString("Hello World!2")
val f3 = client ? ByteString("Hello World!3")
Block.sync(f1, timeout.duration) must equal(ByteString("Hello World!1"))
Block.sync(f2, timeout.duration) must equal(ByteString("Hello World!2"))
Block.sync(f3, timeout.duration) must equal(ByteString("Hello World!3"))
Await.result(f1, timeout.duration) must equal(ByteString("Hello World!1"))
Await.result(f2, timeout.duration) must equal(ByteString("Hello World!2"))
Await.result(f3, timeout.duration) must equal(ByteString("Hello World!3"))
client.stop
server.stop
ioManager.stop
@ -209,7 +209,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val client = system.actorOf(new SimpleEchoClient("localhost", 8065, ioManager))
val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString))
assert(Block.sync(f, timeout.duration).size === 1000)
assert(Await.result(f, timeout.duration).size === 1000)
client.stop
server.stop
ioManager.stop
@ -223,7 +223,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val client = system.actorOf(new SimpleEchoClient("localhost", 8066, ioManager))
val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString))
assert(Block.sync(f, timeout.duration).size === 1000)
assert(Await.result(f, timeout.duration).size === 1000)
client.stop
server.stop
ioManager.stop
@ -239,17 +239,17 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val f1 = client1 ? (('set, "hello", ByteString("World")))
val f2 = client1 ? (('set, "test", ByteString("No one will read me")))
val f3 = client1 ? (('get, "hello"))
Block.on(f2, timeout.duration)
Await.ready(f2, timeout.duration)
val f4 = client2 ? (('set, "test", ByteString("I'm a test!")))
Block.on(f4, timeout.duration)
Await.ready(f4, timeout.duration)
val f5 = client1 ? (('get, "test"))
val f6 = client2 ? 'getall
Block.sync(f1, timeout.duration) must equal("OK")
Block.sync(f2, timeout.duration) must equal("OK")
Block.sync(f3, timeout.duration) must equal(ByteString("World"))
Block.sync(f4, timeout.duration) must equal("OK")
Block.sync(f5, timeout.duration) must equal(ByteString("I'm a test!"))
Block.sync(f6, timeout.duration) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
Await.result(f1, timeout.duration) must equal("OK")
Await.result(f2, timeout.duration) must equal("OK")
Await.result(f3, timeout.duration) must equal(ByteString("World"))
Await.result(f4, timeout.duration) must equal("OK")
Await.result(f5, timeout.duration) must equal(ByteString("I'm a test!"))
Await.result(f6, timeout.duration) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
client1.stop
client2.stop
server.stop

View file

@ -6,7 +6,7 @@ package akka.actor
import akka.testkit._
import akka.util.duration._
import akka.dispatch.{ Block, Future }
import akka.dispatch.{ Await, Future }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class LocalActorRefProviderSpec extends AkkaSpec {
@ -32,7 +32,7 @@ class LocalActorRefProviderSpec extends AkkaSpec {
val address = "new-actor" + i
implicit val timeout = Timeout(5 seconds)
val actors = for (j 1 to 4) yield Future(system.actorOf(Props(c { case _ }), address))
val set = Set() ++ actors.map(a Block.on(a, timeout.duration).value match {
val set = Set() ++ actors.map(a Await.ready(a, timeout.duration).value match {
case Some(Right(a: ActorRef)) 1
case Some(Left(ex: InvalidActorNameException)) 2
case x x

View file

@ -6,7 +6,7 @@ package akka.actor
import java.lang.Thread.sleep
import org.scalatest.BeforeAndAfterAll
import akka.dispatch.Block
import akka.dispatch.Await
import akka.testkit.TestEvent._
import akka.testkit.EventFilter
import java.util.concurrent.{ TimeUnit, CountDownLatch }
@ -52,7 +52,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
stopLatch.open
}
})
val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! Ping
slave ! Crash
@ -87,7 +87,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
countDownLatch.countDown()
}
})
val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
(1 to 100) foreach { _ slave ! Crash }
assert(countDownLatch.await(120, TimeUnit.SECONDS))
@ -125,7 +125,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
}
}
})
val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! Ping
slave ! Crash
@ -176,7 +176,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
stopLatch.open
}
})
val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! Ping
slave ! Crash
@ -228,7 +228,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
stopLatch.open
}
})
val slave = Block.sync((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! Ping
slave ! Crash

View file

@ -7,7 +7,7 @@ import akka.testkit.EventFilter
import akka.util.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
import akka.testkit.DefaultTimeout
import akka.dispatch.Block
import akka.dispatch.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
@ -114,7 +114,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
override def postRestart(reason: Throwable) = restartLatch.open
})
val actor = Block.sync((supervisor ? props).mapTo[ActorRef], timeout.duration)
val actor = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
collectCancellable(system.scheduler.schedule(500 milliseconds, 500 milliseconds, actor, Ping))
// appx 2 pings before crash

View file

@ -7,7 +7,7 @@ package akka.actor
import akka.testkit._
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.dispatch.Block
import akka.dispatch.Await
object SupervisorHierarchySpec {
class FireWorkerException(msg: String) extends Exception(msg)
@ -34,10 +34,10 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), None, None)))
val managerProps = Props(new CountDownActor(countDown)).withFaultHandler(AllForOneStrategy(List(), None, None))
val manager = Block.sync((boss ? managerProps).mapTo[ActorRef], timeout.duration)
val manager = Await.result((boss ? managerProps).mapTo[ActorRef], timeout.duration)
val workerProps = Props(new CountDownActor(countDown))
val workerOne, workerTwo, workerThree = Block.sync((manager ? workerProps).mapTo[ActorRef], timeout.duration)
val workerOne, workerTwo, workerThree = Await.result((manager ? workerProps).mapTo[ActorRef], timeout.duration)
filterException[ActorKilledException] {
workerOne ! Kill

View file

@ -4,7 +4,7 @@
package akka.actor
import akka.testkit.{ filterEvents, EventFilter }
import akka.dispatch.{ PinnedDispatcher, Dispatchers, Block }
import akka.dispatch.{ PinnedDispatcher, Dispatchers, Await }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
@ -28,11 +28,11 @@ class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout {
}
})
val actor1, actor2 = Block.sync((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newPinnedDispatcher("pinned"))).mapTo[ActorRef], timeout.duration)
val actor1, actor2 = Await.result((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newPinnedDispatcher("pinned"))).mapTo[ActorRef], timeout.duration)
val actor3 = Block.sync((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newDispatcher("test").build)).mapTo[ActorRef], timeout.duration)
val actor3 = Await.result((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newDispatcher("test").build)).mapTo[ActorRef], timeout.duration)
val actor4 = Block.sync((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newPinnedDispatcher("pinned"))).mapTo[ActorRef], timeout.duration)
val actor4 = Await.result((supervisor ? workerProps.withDispatcher(system.dispatcherFactory.newPinnedDispatcher("pinned"))).mapTo[ActorRef], timeout.duration)
actor1 ! Kill
actor2 ! Kill
@ -40,10 +40,10 @@ class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout {
actor4 ! Kill
countDownLatch.await(10, TimeUnit.SECONDS)
assert(Block.sync(actor1 ? "status", timeout.duration) == "OK", "actor1 is shutdown")
assert(Block.sync(actor2 ? "status", timeout.duration) == "OK", "actor2 is shutdown")
assert(Block.sync(actor3 ? "status", timeout.duration) == "OK", "actor3 is shutdown")
assert(Block.sync(actor4 ? "status", timeout.duration) == "OK", "actor4 is shutdown")
assert(Await.result(actor1 ? "status", timeout.duration) == "OK", "actor1 is shutdown")
assert(Await.result(actor2 ? "status", timeout.duration) == "OK", "actor2 is shutdown")
assert(Await.result(actor3 ? "status", timeout.duration) == "OK", "actor3 is shutdown")
assert(Await.result(actor4 ? "status", timeout.duration) == "OK", "actor4 is shutdown")
}
}
}

View file

@ -10,7 +10,7 @@ import akka.{ Die, Ping }
import akka.testkit.TestEvent._
import akka.testkit._
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Block
import akka.dispatch.Await
object SupervisorSpec {
val Timeout = 5 seconds
@ -72,7 +72,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
// Creating actors and supervisors
// =====================================================
private def child(supervisor: ActorRef, props: Props): ActorRef = Block.sync((supervisor ? props).mapTo[ActorRef], props.timeout.duration)
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], props.timeout.duration)
def temporaryActorAllForOne = {
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0))))
@ -128,14 +128,14 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
}
def ping(pingPongActor: ActorRef) = {
Block.sync(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage
Await.result(pingPongActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage
expectMsg(Timeout, PingMessage)
}
def kill(pingPongActor: ActorRef) = {
val result = (pingPongActor ? (DieReply, TimeoutMillis))
expectMsg(Timeout, ExceptionMessage)
intercept[RuntimeException] { Block.sync(result, TimeoutMillis millis) }
intercept[RuntimeException] { Await.result(result, TimeoutMillis millis) }
}
"A supervisor" must {
@ -151,7 +151,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
"not restart temporary actor" in {
val (temporaryActor, _) = temporaryActorAllForOne
intercept[RuntimeException] { Block.sync(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) }
intercept[RuntimeException] { Await.result(temporaryActor.?(DieReply, TimeoutMillis), TimeoutMillis millis) }
expectNoMsg(1 second)
}
@ -292,16 +292,16 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
throw e
}
})
val dyingActor = Block.sync((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration)
val dyingActor = Await.result((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration)
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
intercept[RuntimeException] {
Block.sync(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis)
Await.result(dyingActor.?(DieReply, TimeoutMillis), TimeoutMillis millis)
}
}
Block.sync(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage
Await.result(dyingActor.?(Ping, TimeoutMillis), TimeoutMillis millis) must be === PongMessage
inits.get must be(3)

View file

@ -11,7 +11,7 @@ import akka.testkit.{ TestKit, EventFilter, filterEvents, filterException }
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Block, Dispatchers }
import akka.dispatch.{ Await, Dispatchers }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout {
@ -28,8 +28,8 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeou
override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.path }
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000))
val headActor = system.actorOf(p)
val middleActor = Block.sync((headActor ? p).mapTo[ActorRef], timeout.duration)
val lastActor = Block.sync((middleActor ? p).mapTo[ActorRef], timeout.duration)
val middleActor = Await.result((headActor ? p).mapTo[ActorRef], timeout.duration)
val lastActor = Await.result((middleActor ? p).mapTo[ActorRef], timeout.duration)
middleActor ! Kill
expectMsg(middleActor.path)

View file

@ -10,7 +10,7 @@ import akka.testkit.{ TestKit, filterEvents, EventFilter }
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout
import akka.dispatch.Block
import akka.dispatch.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout {
@ -25,7 +25,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
"be able to reply on failure during preRestart" in {
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 5, 10000)))
val supervised = Block.sync((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
supervised.!("test")(testActor)
expectMsg("failure1")
@ -36,7 +36,7 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender
"be able to reply on failure during postStop" in {
filterEvents(EventFilter[Exception]("test", occurrences = 1)) {
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0), None)))
val supervised = Block.sync((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
val supervised = Await.result((supervisor ? Props[Supervised]).mapTo[ActorRef], timeout.duration)
supervised.!("test")(testActor)
expectMsg("failure2")

View file

@ -16,7 +16,7 @@ import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.japi.{ Creator, Option JOption }
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Block, Dispatchers, Future, KeptPromise }
import akka.dispatch.{ Await, Dispatchers, Future, Promise }
object TypedActorSpec {
@ -85,7 +85,7 @@ object TypedActorSpec {
def pigdog = "Pigdog"
def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog))
def futurePigdog(): Future[String] = Promise.successful(pigdog)
def futurePigdog(delay: Long): Future[String] = {
Thread.sleep(delay)
@ -94,7 +94,7 @@ object TypedActorSpec {
def futurePigdog(delay: Long, numbered: Int): Future[String] = {
Thread.sleep(delay)
new KeptPromise(Right(pigdog + numbered))
Promise.successful(pigdog + numbered)
}
def futureComposePigdogFrom(foo: Foo): Future[String] = {
@ -247,7 +247,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val t = newFooBar
val f = t.futurePigdog(200)
f.isCompleted must be(false)
Block.sync(f, timeout.duration) must be("Pigdog")
Await.result(f, timeout.duration) must be("Pigdog")
mustStop(t)
}
@ -255,7 +255,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val t = newFooBar
val futures = for (i 1 to 20) yield (i, t.futurePigdog(20, i))
for ((i, f) futures) {
Block.sync(f, timeout.duration) must be("Pigdog" + i)
Await.result(f, timeout.duration) must be("Pigdog" + i)
}
mustStop(t)
}
@ -278,7 +278,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val t, t2 = newFooBar(Duration(2, "s"))
val f = t.futureComposePigdogFrom(t2)
f.isCompleted must be(false)
Block.sync(f, timeout.duration) must equal("PIGDOG")
Await.result(f, timeout.duration) must equal("PIGDOG")
mustStop(t)
mustStop(t2)
}
@ -290,13 +290,13 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
}).withFaultHandler(OneForOneStrategy {
case e: IllegalStateException if e.getMessage == "expected" FaultHandlingStrategy.Resume
}))
val t = Block.sync((boss ? Props().withTimeout(2 seconds)).mapTo[Foo], timeout.duration)
val t = Await.result((boss ? Props().withTimeout(2 seconds)).mapTo[Foo], timeout.duration)
t.incr()
t.failingPigdog()
t.read() must be(1) //Make sure state is not reset after failure
intercept[IllegalStateException] { Block.sync(t.failingFuturePigdog, 2 seconds) }.getMessage must be("expected")
intercept[IllegalStateException] { Await.result(t.failingFuturePigdog, 2 seconds) }.getMessage must be("expected")
t.read() must be(1) //Make sure state is not reset after failure
(intercept[IllegalStateException] { t.failingJOptionPigdog }).getMessage must be("expected")
@ -323,7 +323,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val f2 = t.futurePigdog(0)
f2.isCompleted must be(false)
f.isCompleted must be(false)
Block.sync(f, timeout.duration) must equal(Block.sync(f2, timeout.duration))
Await.result(f, timeout.duration) must equal(Await.result(f2, timeout.duration))
mustStop(t)
}
@ -348,7 +348,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val results = for (i 1 to 120) yield (i, iterator.next.futurePigdog(200L, i))
for ((i, r) results) Block.sync(r, timeout.duration) must be("Pigdog" + i)
for ((i, r) results) Await.result(r, timeout.duration) must be("Pigdog" + i)
for (t thais) mustStop(t)
}

View file

@ -31,7 +31,7 @@ object ActorModelSpec {
case class Increment(counter: AtomicLong) extends ActorModelMessage
case class Await(latch: CountDownLatch) extends ActorModelMessage
case class AwaitLatch(latch: CountDownLatch) extends ActorModelMessage
case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage
@ -68,7 +68,7 @@ object ActorModelSpec {
}
def receive = {
case Await(latch) ack; latch.await(); busy.switchOff()
case AwaitLatch(latch) ack; latch.await(); busy.switchOff()
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
@ -385,17 +385,17 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
val a = newTestActor(dispatcher)
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = try { a ? Interrupt } catch { case ie: InterruptedException new KeptPromise(Left(ActorInterruptedException(ie))) }
val f3 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(ActorInterruptedException(ie)) }
val f4 = a ? Reply("foo2")
val f5 = try { a ? Interrupt } catch { case ie: InterruptedException new KeptPromise(Left(ActorInterruptedException(ie))) }
val f5 = try { a ? Interrupt } catch { case ie: InterruptedException Promise.failed(ActorInterruptedException(ie)) }
val f6 = a ? Reply("bar2")
assert(Block.sync(f1, timeout.duration) === "foo")
assert(Block.sync(f2, timeout.duration) === "bar")
assert(Block.sync(f4, timeout.duration) === "foo2")
assert(intercept[ActorInterruptedException](Block.sync(f3, timeout.duration)).getMessage === "Ping!")
assert(Block.sync(f6, timeout.duration) === "bar2")
assert(intercept[ActorInterruptedException](Block.sync(f5, timeout.duration)).getMessage === "Ping!")
assert(Await.result(f1, timeout.duration) === "foo")
assert(Await.result(f2, timeout.duration) === "bar")
assert(Await.result(f4, timeout.duration) === "foo2")
assert(intercept[ActorInterruptedException](Await.result(f3, timeout.duration)).getMessage === "Ping!")
assert(Await.result(f6, timeout.duration) === "bar2")
assert(intercept[ActorInterruptedException](Await.result(f5, timeout.duration)).getMessage === "Ping!")
}
}
@ -410,10 +410,10 @@ abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
val f5 = a ? ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
assert(Block.sync(f1, timeout.duration) === "foo")
assert(Block.sync(f2, timeout.duration) === "bar")
assert(Block.sync(f4, timeout.duration) === "foo2")
assert(Block.sync(f6, timeout.duration) === "bar2")
assert(Await.result(f1, timeout.duration) === "foo")
assert(Await.result(f2, timeout.duration) === "bar")
assert(Await.result(f4, timeout.duration) === "foo2")
assert(Await.result(f6, timeout.duration) === "bar2")
assert(f3.value.isEmpty)
assert(f5.value.isEmpty)
}

View file

@ -7,7 +7,7 @@ import akka.actor.{ Props, Actor }
import akka.util.Duration
import akka.util.duration._
import akka.testkit.DefaultTimeout
import akka.dispatch.{ Block, PinnedDispatcher, Dispatchers, Dispatcher }
import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers, Dispatcher }
object DispatcherActorSpec {
class TestActor extends Actor {
@ -44,7 +44,7 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
"support ask/reply" in {
val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newDispatcher("test").build))
assert("World" === Block.sync(actor ? "Hello", timeout.duration))
assert("World" === Await.result(actor ? "Hello", timeout.duration))
actor.stop()
}
@ -66,7 +66,7 @@ class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
case "ping" if (works.get) latch.countDown()
}).withDispatcher(throughputDispatcher))
assert(Block.sync(slowOne ? "hogexecutor", timeout.duration) === "OK")
assert(Await.result(slowOne ? "hogexecutor", timeout.duration) === "OK")
(1 to 100) foreach { _ slowOne ! "ping" }
fastOne ! "sabotage"
start.countDown()

View file

@ -6,7 +6,7 @@ import akka.testkit._
import akka.actor.{ Props, Actor }
import akka.testkit.AkkaSpec
import org.scalatest.BeforeAndAfterEach
import akka.dispatch.{ Block, PinnedDispatcher, Dispatchers }
import akka.dispatch.{ Await, PinnedDispatcher, Dispatchers }
object PinnedActorSpec {
class TestActor extends Actor {
@ -35,7 +35,7 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeo
"support ask/reply" in {
val actor = system.actorOf(Props[TestActor].withDispatcher(system.dispatcherFactory.newPinnedDispatcher("test")))
assert("World" === Block.sync(actor ? "Hello", timeout.duration))
assert("World" === Await.result(actor ? "Hello", timeout.duration))
actor.stop()
}
}

View file

@ -4,7 +4,7 @@
package akka.dataflow
import akka.actor.{ Actor, Props }
import akka.dispatch.{ Future, Block }
import akka.dispatch.{ Future, Await }
import akka.actor.future2actor
import akka.util.duration._
import akka.testkit.AkkaSpec
@ -26,9 +26,9 @@ class Future2ActorSpec extends AkkaSpec with DefaultTimeout {
case "ex" Future(throw new AssertionError) pipeTo context.sender
}
}))
Block.sync(actor ? "do", timeout.duration) must be(31)
Await.result(actor ? "do", timeout.duration) must be(31)
intercept[AssertionError] {
Block.sync(actor ? "ex", timeout.duration)
Await.result(actor ? "ex", timeout.duration)
}
}
}

View file

@ -48,9 +48,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"never completed" must {
behave like emptyFuture(_(Promise()))
"return supplied value on timeout" in {
val timedOut = new KeptPromise[String](Right("Timedout"))
val timedOut = Promise.successful[String]("Timedout")
val promise = Promise[String]() orElse timedOut
Block.sync(promise, timeout.duration) must be("Timedout")
Await.result(promise, timeout.duration) must be("Timedout")
}
}
"completed with a result" must {
@ -77,7 +77,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
test(future)
latch.open
Block.on(future, timeout.duration)
Await.ready(future, timeout.duration)
}
}
"is completed" must {
@ -89,7 +89,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
result
}
latch.open
Block.on(future, timeout.duration)
Await.ready(future, timeout.duration)
test(future, result)
}
}
@ -98,8 +98,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
filterException[ArithmeticException] {
check({ (future: Future[Int], actions: List[FutureAction])
val result = (future /: actions)(_ /: _)
val expected = (Block.on(future, timeout.duration).value.get /: actions)(_ /: _)
((Block.on(result, timeout.duration).value.get, expected) match {
val expected = (Await.ready(future, timeout.duration).value.get /: actions)(_ /: _)
((Await.ready(result, timeout.duration).value.get, expected) match {
case (Right(a), Right(b)) a == b
case (Left(a), Left(b)) if a.toString == b.toString true
case (Left(a), Left(b)) if a.getStackTrace.isEmpty || b.getStackTrace.isEmpty
@ -117,7 +117,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
behave like futureWithResult { test
val actor = system.actorOf[TestActor]
val future = actor ? "Hello"
Block.on(future, timeout.duration)
Await.ready(future, timeout.duration)
test(future, "World")
actor.stop()
}
@ -127,7 +127,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
filterException[RuntimeException] {
val actor = system.actorOf[TestActor]
val future = actor ? "Failure"
Block.on(future, timeout.duration)
Await.ready(future, timeout.duration)
test(future, "Expected exception; to test fault-tolerance")
actor.stop()
}
@ -141,7 +141,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val actor1 = system.actorOf[TestActor]
val actor2 = system.actorOf(new Actor { def receive = { case s: String sender ! s.toUpperCase } })
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
Block.on(future, timeout.duration)
Await.ready(future, timeout.duration)
test(future, "WORLD")
actor1.stop()
actor2.stop()
@ -153,7 +153,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val actor1 = system.actorOf[TestActor]
val actor2 = system.actorOf(new Actor { def receive = { case s: String sender ! Status.Failure(new ArithmeticException("/ by zero")) } })
val future = actor1 ? "Hello" flatMap { case s: String actor2 ? s }
Block.on(future, timeout.duration)
Await.ready(future, timeout.duration)
test(future, "/ by zero")
actor1.stop()
actor2.stop()
@ -166,7 +166,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val actor1 = system.actorOf[TestActor]
val actor2 = system.actorOf(new Actor { def receive = { case s: String sender ! s.toUpperCase } })
val future = actor1 ? "Hello" flatMap { case i: Int actor2 ? i }
Block.on(future, timeout.duration)
Await.ready(future, timeout.duration)
test(future, "World (of class java.lang.String)")
actor1.stop()
actor2.stop()
@ -200,9 +200,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
c (actor ? 7).mapTo[String]
} yield b + "-" + c
Block.sync(future1, timeout.duration) must be("10-14")
Await.result(future1, timeout.duration) must be("10-14")
assert(checkType(future1, manifest[String]))
intercept[ClassCastException] { Block.sync(future2, timeout.duration) }
intercept[ClassCastException] { Await.result(future2, timeout.duration) }
actor.stop()
}
}
@ -230,8 +230,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
Res(c: Int) actor ? Req(7)
} yield b + "-" + c
Block.sync(future1, timeout.duration) must be("10-14")
intercept[MatchError] { Block.sync(future2, timeout.duration) }
Await.result(future1, timeout.duration) must be("10-14")
intercept[MatchError] { Await.result(future2, timeout.duration) }
actor.stop()
}
}
@ -267,34 +267,34 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val future11 = actor ? "Failure" recover { case _ "Oops!" }
Block.sync(future1, timeout.duration) must be(5)
intercept[ArithmeticException] { Block.sync(future2, timeout.duration) }
intercept[ArithmeticException] { Block.sync(future3, timeout.duration) }
Block.sync(future4, timeout.duration) must be("5")
Block.sync(future5, timeout.duration) must be("0")
intercept[ArithmeticException] { Block.sync(future6, timeout.duration) }
Block.sync(future7, timeout.duration) must be("You got ERROR")
intercept[RuntimeException] { Block.sync(future8, timeout.duration) }
Block.sync(future9, timeout.duration) must be("FAIL!")
Block.sync(future10, timeout.duration) must be("World")
Block.sync(future11, timeout.duration) must be("Oops!")
Await.result(future1, timeout.duration) must be(5)
intercept[ArithmeticException] { Await.result(future2, timeout.duration) }
intercept[ArithmeticException] { Await.result(future3, timeout.duration) }
Await.result(future4, timeout.duration) must be("5")
Await.result(future5, timeout.duration) must be("0")
intercept[ArithmeticException] { Await.result(future6, timeout.duration) }
Await.result(future7, timeout.duration) must be("You got ERROR")
intercept[RuntimeException] { Await.result(future8, timeout.duration) }
Await.result(future9, timeout.duration) must be("FAIL!")
Await.result(future10, timeout.duration) must be("World")
Await.result(future11, timeout.duration) must be("Oops!")
actor.stop()
}
}
"firstCompletedOf" in {
val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ new KeptPromise[Int](Right(5))
Block.sync(Future.firstCompletedOf(futures), timeout.duration) must be(5)
val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ Promise.successful[Int](5)
Await.result(Future.firstCompletedOf(futures), timeout.duration) must be(5)
}
"find" in {
val futures = for (i 1 to 10) yield Future { i }
val result = Future.find[Int](futures)(_ == 3)
Block.sync(result, timeout.duration) must be(Some(3))
Await.result(result, timeout.duration) must be(Some(3))
val notFound = Future.find[Int](futures)(_ == 11)
Block.sync(notFound, timeout.duration) must be(None)
Await.result(notFound, timeout.duration) must be(None)
}
"fold" in {
@ -305,7 +305,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] }
Block.sync(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
}
"fold by composing" in {
@ -315,7 +315,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
})
}
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), 10000).mapTo[Int] }
Block.sync(futures.foldLeft(Future(0))((fr, fa) for (r fr; a fa) yield (r + a)), timeout.duration) must be(45)
Await.result(futures.foldLeft(Future(0))((fr, fa) for (r fr; a fa) yield (r + a)), timeout.duration) must be(45)
}
"fold with an exception" in {
@ -332,7 +332,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
intercept[Throwable] { Block.sync(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected")
intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected")
}
}
@ -344,7 +344,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
case (l, i) if i % 2 == 0 l += i.asInstanceOf[AnyRef]
case (l, _) l
}
val result = Block.sync(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum
val result = Await.result(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum
assert(result === 250500)
}
@ -353,7 +353,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"return zero value if folding empty list" in {
Block.sync(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) must be(0)
Await.result(Future.fold(List[Future[Int]]())(0)(_ + _), timeout.duration) must be(0)
}
"shouldReduceResults" in {
@ -364,7 +364,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] }
assert(Block.sync(Future.reduce(futures)(_ + _), timeout millis) === 45)
assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45)
}
"shouldReduceResultsWithException" in {
@ -381,13 +381,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
intercept[Throwable] { Block.sync(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected"
intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected"
}
}
"shouldReduceThrowIAEOnEmptyInput" in {
filterException[IllegalArgumentException] {
intercept[UnsupportedOperationException] { Block.sync(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) }
intercept[UnsupportedOperationException] { Await.result(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) }
}
}
@ -410,11 +410,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
})
val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int])
assert(Block.sync(Future.sequence(oddFutures), timeout.duration).sum === 10000)
assert(Await.result(Future.sequence(oddFutures), timeout.duration).sum === 10000)
oddActor.stop()
val list = (1 to 100).toList
assert(Block.sync(Future.traverse(list)(x Future(x * 2 - 1)), timeout.duration).sum === 10000)
assert(Await.result(Future.traverse(list)(x Future(x * 2 - 1)), timeout.duration).sum === 10000)
}
"shouldHandleThrowables" in {
@ -422,7 +422,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
filterException[ThrowableTest] {
val f1 = Future[Any] { throw new ThrowableTest("test") }
intercept[ThrowableTest] { Block.sync(f1, timeout.duration) }
intercept[ThrowableTest] { Await.result(f1, timeout.duration) }
val latch = new StandardLatch
val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" }
@ -430,10 +430,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
f2 onSuccess { case _ throw new ThrowableTest("dispatcher receive") }
val f3 = f2 map (s s.toUpperCase)
latch.open
assert(Block.sync(f2, timeout.duration) === "success")
assert(Await.result(f2, timeout.duration) === "success")
f2 foreach (_ throw new ThrowableTest("current thread foreach"))
f2 onSuccess { case _ throw new ThrowableTest("current thread receive") }
assert(Block.sync(f3, timeout.duration) === "SUCCESS")
assert(Await.result(f3, timeout.duration) === "SUCCESS")
}
}
@ -441,14 +441,14 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val latch = new StandardLatch
val f = Future { latch.await; 5 }
val f2 = Future { Block.sync(f, timeout.duration) + 5 }
val f2 = Future { Await.result(f, timeout.duration) + 5 }
intercept[TimeoutException](Block.on(f2, 100 millis))
intercept[TimeoutException](Await.ready(f2, 100 millis))
latch.open
assert(Block.sync(f2, timeout.duration) === 10)
assert(Await.result(f2, timeout.duration) === 10)
val f3 = Future { Thread.sleep(100); 5 }
filterException[TimeoutException] { intercept[TimeoutException] { Block.on(f3, 0 millis) } }
filterException[TimeoutException] { intercept[TimeoutException] { Await.ready(f3, 0 millis) } }
}
"futureComposingWithContinuations" in {
@ -461,7 +461,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val r = flow(x() + " " + y() + "!")
assert(Block.sync(r, timeout.duration) === "Hello World!")
assert(Await.result(r, timeout.duration) === "Hello World!")
actor.stop
}
@ -475,7 +475,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100)
intercept[java.lang.ArithmeticException](Block.sync(r, timeout.duration))
intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration))
}
}
@ -490,7 +490,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val r = flow(x() + y(), 100)
intercept[ClassCastException](Block.sync(r, timeout.duration))
intercept[ClassCastException](Await.result(r, timeout.duration))
}
}
@ -505,7 +505,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val r = flow(x() + y())
intercept[ClassCastException](Block.sync(r, timeout.duration))
intercept[ClassCastException](Await.result(r, timeout.duration))
}
}
@ -529,10 +529,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { x << 5 }
assert(Block.sync(y, timeout.duration) === 5)
assert(Block.sync(z, timeout.duration) === 5)
assert(Await.result(y, timeout.duration) === 5)
assert(Await.result(z, timeout.duration) === 5)
assert(lz.isOpen)
assert(Block.sync(result, timeout.duration) === 10)
assert(Await.result(result, timeout.duration) === 10)
val a, b, c = Promise[Int]()
@ -544,9 +544,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
c completeWith Future(5)
assert(Block.sync(a, timeout.duration) === 5)
assert(Block.sync(b, timeout.duration) === 3)
assert(Block.sync(result2, timeout.duration) === 50)
assert(Await.result(a, timeout.duration) === 5)
assert(Await.result(b, timeout.duration) === 3)
assert(Await.result(result2, timeout.duration) === 50)
}
"futureDataFlowShouldEmulateBlocking1" in {
@ -561,17 +561,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { one << 1 }
Block.on(one, 1 minute)
Await.ready(one, 1 minute)
assert(one.isCompleted)
assert(List(two, simpleResult).forall(_.isCompleted == false))
flow { two << 9 }
Block.on(two, 1 minute)
Await.ready(two, 1 minute)
assert(List(one, two).forall(_.isCompleted == true))
assert(Block.sync(simpleResult, timeout.duration) === 10)
assert(Await.result(simpleResult, timeout.duration) === 10)
}
@ -595,17 +595,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { y1 << 1 } // When this is set, it should cascade down the line
assert(ly.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
assert(Block.sync(x1, 1 minute) === 1)
assert(Await.result(x1, 1 minute) === 1)
assert(!lz.isOpen)
flow { y2 << 9 } // When this is set, it should cascade down the line
assert(lz.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
assert(Block.sync(x2, 1 minute) === 9)
assert(Await.result(x2, 1 minute) === 9)
assert(List(x1, x2, y1, y2).forall(_.isCompleted))
assert(Block.sync(result, 1 minute) === 10)
assert(Await.result(result, 1 minute) === 10)
}
"dataFlowAPIshouldbeSlick" in {
@ -625,7 +625,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
assert(i2.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
s1.open
s2.open
assert(Block.sync(result, timeout.duration) === 10)
assert(Await.result(result, timeout.duration) === 10)
}
"futureCompletingWithContinuationsFailure" in {
@ -649,8 +649,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { x << 5 }
assert(Block.sync(y, timeout.duration) === 5)
intercept[java.lang.ArithmeticException](Block.sync(result, timeout.duration))
assert(Await.result(y, timeout.duration) === 5)
intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration))
assert(z.value === None)
assert(!lz.isOpen)
}
@ -673,7 +673,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
latch.open
assert(Block.sync(result, timeout.duration) === Some("Hello"))
assert(Await.result(result, timeout.duration) === Some("Hello"))
}
"futureFlowShouldBeTypeSafe" in {
@ -696,8 +696,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
assert(!checkType(rInt, manifest[Nothing]))
assert(!checkType(rInt, manifest[Any]))
Block.sync(rString, timeout.duration)
Block.sync(rInt, timeout.duration)
Await.result(rString, timeout.duration)
Await.result(rInt, timeout.duration)
}
"futureFlowSimpleAssign" in {
@ -711,7 +711,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { x << 40 }
flow { y << 2 }
assert(Block.sync(z, timeout.duration) === 42)
assert(Await.result(z, timeout.duration) === 42)
}
"futureFlowLoops" in {
@ -733,7 +733,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
var i = 0
promises foreach { p
assert(Block.sync(p, timeout.duration) === i)
assert(Await.result(p, timeout.duration) === i)
i += 1
}
@ -789,12 +789,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
latch(8).open
latch(9).await
Block.on(f4, timeout.duration) must be('completed)
Await.ready(f4, timeout.duration) must be('completed)
}
"should not deadlock with nested await (ticket 1313)" in {
val simple = Future() map (_ Block.sync((Future(()) map (_ ())), timeout.duration))
Block.on(simple, timeout.duration) must be('completed)
val simple = Future() map (_ Await.result((Future(()) map (_ ())), timeout.duration))
Await.ready(simple, timeout.duration) must be('completed)
val l1, l2 = new StandardLatch
val complex = Future() map { _
@ -805,7 +805,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
nested foreach (_ l2.open)
l2.await
}
Block.on(complex, timeout.duration) must be('completed)
Await.ready(complex, timeout.duration) must be('completed)
}
}
}
@ -818,39 +818,39 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
def futureWithResult(f: ((Future[Any], Any) Unit) Unit) {
"be completed" in { f((future, _) future must be('completed)) }
"contain a value" in { f((future, result) future.value must be(Some(Right(result)))) }
"return result with 'get'" in { f((future, result) Block.sync(future, timeout.duration) must be(result)) }
"return result with 'Block.sync'" in { f((future, result) Block.sync(future, timeout.duration) must be(result)) }
"not timeout" in { f((future, _) Block.on(future, 0 millis)) }
"return result with 'get'" in { f((future, result) Await.result(future, timeout.duration) must be(result)) }
"return result with 'Await.sync'" in { f((future, result) Await.result(future, timeout.duration) must be(result)) }
"not timeout" in { f((future, _) Await.ready(future, 0 millis)) }
"filter result" in {
f { (future, result)
Block.sync((future filter (_ true)), timeout.duration) must be(result)
(evaluating { Block.sync((future filter (_ false)), timeout.duration) } must produce[MatchError]).getMessage must startWith(result.toString)
Await.result((future filter (_ true)), timeout.duration) must be(result)
(evaluating { Await.result((future filter (_ false)), timeout.duration) } must produce[MatchError]).getMessage must startWith(result.toString)
}
}
"transform result with map" in { f((future, result) Block.sync((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) }
"transform result with map" in { f((future, result) Await.result((future map (_.toString.length)), timeout.duration) must be(result.toString.length)) }
"compose result with flatMap" in {
f { (future, result)
val r = for (r future; p Promise.successful("foo")) yield r.toString + p
Block.sync(r, timeout.duration) must be(result.toString + "foo")
Await.result(r, timeout.duration) must be(result.toString + "foo")
}
}
"perform action with foreach" in {
f { (future, result)
val p = Promise[Any]()
future foreach p.success
Block.sync(p, timeout.duration) must be(result)
Await.result(p, timeout.duration) must be(result)
}
}
"not recover from exception" in { f((future, result) Block.sync(future.recover({ case _ "pigdog" }), timeout.duration) must be(result)) }
"not recover from exception" in { f((future, result) Await.result(future.recover({ case _ "pigdog" }), timeout.duration) must be(result)) }
"perform action on result" in {
f { (future, result)
val p = Promise[Any]()
future.onSuccess { case x p.success(x) }
Block.sync(p, timeout.duration) must be(result)
Await.result(p, timeout.duration) must be(result)
}
}
"not perform action on exception" is pending
"cast using mapTo" in { f((future, result) Block.sync(future.mapTo[Boolean].recover({ case _: ClassCastException false }), timeout.duration) must be(false)) }
"cast using mapTo" in { f((future, result) Await.result(future.mapTo[Boolean].recover({ case _: ClassCastException false }), timeout.duration) must be(false)) }
}
def futureWithException[E <: Throwable: Manifest](f: ((Future[Any], String) Unit) Unit) {
@ -862,27 +862,27 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
future.value.get.left.get.getMessage must be(message)
})
}
"throw exception with 'get'" in { f((future, message) (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
"throw exception with 'Block.sync'" in { f((future, message) (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
"throw exception with 'get'" in { f((future, message) (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
"throw exception with 'Await.sync'" in { f((future, message) (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with filter" in {
f { (future, message)
(evaluating { Block.sync(future filter (_ true), timeout.duration) } must produce[E]).getMessage must be(message)
(evaluating { Block.sync(future filter (_ false), timeout.duration) } must produce[E]).getMessage must be(message)
(evaluating { Await.result(future filter (_ true), timeout.duration) } must produce[E]).getMessage must be(message)
(evaluating { Await.result(future filter (_ false), timeout.duration) } must produce[E]).getMessage must be(message)
}
}
"retain exception with map" in { f((future, message) (evaluating { Block.sync(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with flatMap" in { f((future, message) (evaluating { Block.sync(future flatMap (_ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with map" in { f((future, message) (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with flatMap" in { f((future, message) (evaluating { Await.result(future flatMap (_ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) }
"not perform action with foreach" is pending
"recover from exception" in { f((future, message) Block.sync(future.recover({ case e if e.getMessage == message "pigdog" }), timeout.duration) must be("pigdog")) }
"recover from exception" in { f((future, message) Await.result(future.recover({ case e if e.getMessage == message "pigdog" }), timeout.duration) must be("pigdog")) }
"not perform action on result" is pending
"perform action on exception" in {
f { (future, message)
val p = Promise[Any]()
future.onFailure { case _ p.success(message) }
Block.sync(p, timeout.duration) must be(message)
Await.result(p, timeout.duration) must be(message)
}
}
"always cast successfully using mapTo" in { f((future, message) (evaluating { Block.sync(future.mapTo[java.lang.Thread], timeout.duration) } must produce[E]).getMessage must be(message)) }
"always cast successfully using mapTo" in { f((future, message) (evaluating { Await.result(future.mapTo[java.lang.Thread], timeout.duration) } must produce[E]).getMessage must be(message)) }
}
sealed trait IntAction { def apply(that: Int): Int }

View file

@ -19,7 +19,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val f = spawn { q.dequeue }
Block.sync(f, 1 second) must be(null)
Await.result(f, 1 second) must be(null)
}
"create a bounded mailbox with 10 capacity and with push timeout" in {
@ -115,8 +115,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val consumers = for (i (1 to 4).toList) yield createConsumer
val ps = producers.map(Block.sync(_, within))
val cs = consumers.map(Block.sync(_, within))
val ps = producers.map(Await.result(_, within))
val cs = consumers.map(Await.result(_, within))
ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages
cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages

View file

@ -43,7 +43,7 @@ class PriorityDispatcherSpec extends AkkaSpec with DefaultTimeout {
actor.resume //Signal the actor to start treating it's message backlog
Block.sync(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse
Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse
}
}

View file

@ -21,9 +21,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
b << q
c << q()
}
assert(Block.sync(a, timeout.duration) === 1)
assert(Block.sync(b, timeout.duration) === 2)
assert(Block.sync(c, timeout.duration) === 3)
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
}
"pend" in {
@ -35,9 +35,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
c << q
}
flow { q <<< List(1, 2, 3) }
assert(Block.sync(a, timeout.duration) === 1)
assert(Block.sync(b, timeout.duration) === 2)
assert(Block.sync(c, timeout.duration) === 3)
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
}
"pend again" in {
@ -54,10 +54,10 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
c << q1
d << q1
}
assert(Block.sync(a, timeout.duration) === 1)
assert(Block.sync(b, timeout.duration) === 2)
assert(Block.sync(c, timeout.duration) === 3)
assert(Block.sync(d, timeout.duration) === 4)
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
assert(Await.result(d, timeout.duration) === 4)
}
"enque" in {
@ -71,10 +71,10 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
}
q ++= List(1, 2, 3, 4)
assert(Block.sync(a, timeout.duration) === 1)
assert(Block.sync(b, timeout.duration) === 2)
assert(Block.sync(c, timeout.duration) === 3)
assert(Block.sync(d, timeout.duration) === 4)
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
assert(Await.result(d, timeout.duration) === 4)
}
"map" in {
@ -90,9 +90,9 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
flow {
qs << ("Hello", "World!", "Test")
}
assert(Block.sync(a, timeout.duration) === 5)
assert(Block.sync(b, timeout.duration) === "World!")
assert(Block.sync(c, timeout.duration) === 4)
assert(Await.result(a, timeout.duration) === 5)
assert(Await.result(b, timeout.duration) === "World!")
assert(Await.result(c, timeout.duration) === 4)
}
"not fail under concurrent stress" in {
@ -128,7 +128,7 @@ class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
}
}
assert(Block.sync(future, timeout.duration) === (1L to 100000L).sum)
assert(Await.result(future, timeout.duration) === (1L to 100000L).sum)
}
}
}

View file

@ -5,7 +5,7 @@ import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.testkit.AkkaSpec
import akka.dispatch.{ Block, KeptPromise, Future }
import akka.dispatch.{ Await, Promise, Future }
object ActorPoolSpec {
@ -17,7 +17,7 @@ object ActorPoolSpec {
import TypedActor.dispatcher
def sq(x: Int, sleep: Long): Future[Int] = {
if (sleep > 0) Thread.sleep(sleep)
new KeptPromise(Right(x * x))
Promise.successful(x * x)
}
}
@ -47,7 +47,7 @@ class TypedActorPoolSpec extends AkkaSpec with DefaultTimeout {
val results = for (i 1 to 100) yield (i, pool.sq(i, 0))
for ((i, r) results)
Block.sync(r, timeout.duration) must equal(i * i)
Await.result(r, timeout.duration) must equal(i * i)
ta.stop(pool)
}
@ -97,7 +97,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
count.get must be(2)
Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
pool.stop()
}
@ -126,7 +126,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
try {
(for (count 1 to 500) yield pool.?("Test", 20 seconds)) foreach {
Block.sync(_, 20 seconds) must be("Response")
Await.result(_, 20 seconds) must be("Response")
}
} finally {
pool.stop()
@ -163,7 +163,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
pool ! 1
Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
var loops = 0
def loop(t: Int) = {
@ -183,7 +183,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
latch.await
count.get must be(loops)
Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
// a whole bunch should max it out
@ -192,7 +192,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
latch.await
count.get must be(loops)
Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(4)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(4)
pool.stop()
}
@ -239,7 +239,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
latch.await
count.get must be(loops)
Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
// send a bunch over the threshold and observe an increment
loops = 15
@ -248,7 +248,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
latch.await(10 seconds)
count.get must be(loops)
Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be >= (3)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be >= (3)
pool.stop()
}
@ -342,7 +342,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
(5 millis).dilated.sleep
val z = Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size
val z = Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size
z must be >= (2)
@ -353,7 +353,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
(500 millis).dilated.sleep
}
Block.sync((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be <= (z)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be <= (z)
pool.stop()
}

View file

@ -8,7 +8,7 @@ import akka.testkit.AkkaSpec
import akka.actor.DeploymentConfig._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
import akka.dispatch.Block
import akka.dispatch.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
@ -83,7 +83,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
val id = Block.sync((actor ? "hit").mapTo[Int], timeout.duration)
val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration)
replies = replies + (id -> (replies(id) + 1))
}
}
@ -194,7 +194,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
for (i 0 until iterationCount) {
for (k 0 until connectionCount) {
val id = Block.sync((actor ? "hit").mapTo[Int], timeout.duration)
val id = Await.result((actor ? "hit").mapTo[Int], timeout.duration)
replies = replies + (id -> (replies(id) + 1))
}
}

View file

@ -8,7 +8,7 @@ import collection.mutable.LinkedList
import akka.routing.Routing.Broadcast
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit._
import akka.dispatch.Block
import akka.dispatch.Await
object RoutingSpec {
@ -271,7 +271,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
shutdownLatch.await
Block.sync(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1)
Await.result(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1)
}
"throw an exception, if all the connections have stopped" in {
@ -298,7 +298,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
Block.sync(actor ? Broadcast("Hi!"), timeout.duration).asInstanceOf[Int] must be(0)
Await.result(actor ? Broadcast("Hi!"), timeout.duration).asInstanceOf[Int] must be(0)
}
"return the first response from connections, when some of them failed to reply" in {
@ -306,7 +306,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout {
val actor = new RoutedActorRef(system, props, impl.guardian, "foo")
Block.sync(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1)
Await.result(actor ? Broadcast(0), timeout.duration).asInstanceOf[Int] must be(1)
}
"be started when constructed" in {

View file

@ -3,7 +3,7 @@ package akka.ticket
import akka.actor._
import akka.routing._
import akka.testkit.AkkaSpec
import akka.dispatch.Block
import akka.dispatch.Await
import akka.util.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -28,7 +28,7 @@ class Ticket703Spec extends AkkaSpec {
}
}))
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000)))
Block.sync(actorPool.?("Ping", 10000), 10 seconds) must be === "Response"
Await.result(actorPool.?("Ping", 10000), 10 seconds) must be === "Response"
}
}
}

View file

@ -4,7 +4,7 @@
package akka.util
import org.scalatest.matchers.MustMatchers
import akka.dispatch.{ Future, Block }
import akka.dispatch.{ Future, Await }
import akka.testkit.AkkaSpec
import scala.util.Random
import akka.testkit.DefaultTimeout
@ -125,7 +125,7 @@ class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout {
val tasks = List.fill(nrOfTasks)(executeRandomTask)
tasks.foreach(Block.sync(_, timeout.duration))
tasks.foreach(Await.result(_, timeout.duration))
}
}
}

View file

@ -383,7 +383,7 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) {
_path = rootPath / "null"
brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef - promises are always broken.")))(dispatcher)
brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher)
}
override def isTerminated(): Boolean = true
@ -425,7 +425,7 @@ class AskActorRef(
}
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))(dispatcher)
Promise.failed(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName)))(dispatcher)
override def isTerminated = result.isCompleted

View file

@ -339,7 +339,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
Block.sync(systemGuardian ? CreateChild(props, name), timeout.duration) match {
Await.result(systemGuardian ? CreateChild(props, name), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -347,7 +347,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def actorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
Block.sync(guardian ? CreateChild(props, name), timeout.duration) match {
Await.result(guardian ? CreateChild(props, name), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -355,7 +355,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def actorOf(props: Props): ActorRef = {
implicit val timeout = settings.CreationTimeout
Block.sync(guardian ? CreateRandomNameChild(props), timeout.duration) match {
Await.result(guardian ? CreateRandomNameChild(props), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}

View file

@ -410,12 +410,12 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case m if m.returnsFuture_? actor.?(m, timeout)
case m if m.returnsJOption_? || m.returnsOption_?
val f = actor.?(m, timeout)
(try { Block.on(f, timeout.duration).value } catch { case _: TimeoutException None }) match {
(try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException None }) match {
case None | Some(Right(null)) if (m.returnsJOption_?) JOption.none[Any] else None
case Some(Right(joption: AnyRef)) joption
case Some(Left(ex)) throw ex
}
case m Block.sync(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef]
case m Await.result(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef]
}
}
}

View file

@ -22,27 +22,27 @@ import scala.collection.mutable.Stack
import akka.util.{ Switch, Duration, BoxedType }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable }
import akka.dispatch.Block.CanBlock
import akka.dispatch.Await.CanAwait
object Block {
sealed trait CanBlock
object Await {
sealed trait CanAwait
trait Blockable[+T] {
trait Awaitable[+T] {
/**
* Should throw java.util.concurrent.TimeoutException if times out
*/
def block(atMost: Duration)(implicit permit: CanBlock): this.type
def ready(atMost: Duration)(implicit permit: CanAwait): this.type
/**
* Throws exceptions if cannot produce a T within the specified time
*/
def sync(atMost: Duration)(implicit permit: CanBlock): T
def result(atMost: Duration)(implicit permit: CanAwait): T
}
private implicit val permit = new CanBlock {}
private implicit val permit = new CanAwait {}
def on[T <: Blockable[_]](block: T, atMost: Duration /* = Duration.Inf*/ ): T = block.block(atMost)
def sync[T](block: Blockable[T], atMost: Duration): T = block.sync(atMost)
def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost)
def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)
}
object Futures {
@ -147,7 +147,7 @@ object Future {
* Useful for reducing many Futures into a single Future.
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
/**
* Returns a Future to the result of the first future in the list that is completed
@ -165,7 +165,7 @@ object Future {
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](futures: Iterable[Future[T]])(predicate: T Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = {
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
if (futures.isEmpty) Promise.successful[Option[T]](None)
else {
val result = Promise[Option[T]]()
val ref = new AtomicInteger(futures.size)
@ -196,9 +196,8 @@ object Future {
* </pre>
*/
def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] = {
if (futures.isEmpty) {
new KeptPromise[R](Right(zero))
} else {
if (futures.isEmpty) Promise.successful(zero)
else {
val result = Promise[R]()
val results = new ConcurrentLinkedQueue[T]()
val done = new Switch(false)
@ -245,8 +244,7 @@ object Future {
* </pre>
*/
def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) T)(implicit dispatcher: MessageDispatcher): Future[R] = {
if (futures.isEmpty)
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
if (futures.isEmpty) Promise[R].failure(new UnsupportedOperationException("empty reduce left"))
else {
val result = Promise[R]()
val seedFound = new AtomicBoolean(false)
@ -271,7 +269,7 @@ object Future {
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a)
in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr; b fb) yield (r += b)
}.map(_.result)
@ -364,7 +362,7 @@ object Future {
}
}
sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] {
sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
implicit def dispatcher: MessageDispatcher
@ -713,12 +711,12 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
}
def block(atMost: Duration)(implicit permit: CanBlock): this.type =
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (value.isDefined || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
def sync(atMost: Duration)(implicit permit: CanBlock): T =
block(atMost).value.get match {
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case Left(e) throw e
case Right(r) r
}
@ -797,8 +795,8 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dis
this
}
def block(atMost: Duration)(implicit permit: CanBlock): this.type = this
def sync(atMost: Duration)(implicit permit: CanBlock): T = value.get match {
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
case Left(e) throw e
case Right(r) r
}

View file

@ -183,7 +183,7 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout:
if (eo eq null) dequeue()
else {
if (eo.nonEmpty) {
if (_elemOut.compareAndSet(eo, eo.tail)) new KeptPromise(Right(eo.head))
if (_elemOut.compareAndSet(eo, eo.tail)) Promise.successful(eo.head)
else dequeue()
} else dequeue(Promise[A])
}

View file

@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException
import akka.dispatch.Block
import akka.dispatch.Await
object LoggingBus {
implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
@ -147,7 +147,7 @@ trait LoggingBus extends ActorEventBus {
val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
val actor = system.systemActorOf(Props(clazz), name)
implicit val timeout = Timeout(3 seconds)
val response = try Block.sync(actor ? InitializeLogger(this), timeout.duration) catch {
val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch {
case _: TimeoutException
publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
}

View file

@ -8,7 +8,7 @@ import akka.util.duration._
import akka.actor._
import akka.actor.Actor._
import akka.camel.TypedCamelTestSupport.{ SetExpectedMessageCount SetExpectedTestMessageCount, _ }
import akka.dispatch.Block
import akka.dispatch.Await
class TypedConsumerPublishRequestorTest extends JUnitSuite {
import TypedConsumerPublishRequestorTest._
@ -40,10 +40,10 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerMethodRegisteredEvent = {
Actor.registry.addListener(requestor)
val latch = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds)
val latch = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds)
val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], Props())
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val event = Block.sync((publisher ? GetRetainedMessage).mapTo[ConsumerMethodRegistered], 3 seconds)
val event = Await.result((publisher ? GetRetainedMessage).mapTo[ConsumerMethodRegistered], 3 seconds)
assert(event.endpointUri === "direct:foo")
assert(event.typedActor === obj)
assert(event.methodName === "foo")
@ -51,21 +51,21 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerMethodUnregisteredEvent = {
val latch = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds)
val latch = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds)
Actor.registry.addListener(requestor)
val obj = TypedActor.typedActorOf(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl], Props())
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val ignorableEvent = Block.sync((publisher ? GetRetainedMessage).mapTo[ConsumerMethodRegistered], 3 seconds)
val ignorableEvent = Await.result((publisher ? GetRetainedMessage).mapTo[ConsumerMethodRegistered], 3 seconds)
val latch2 = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds)
val latch2 = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 3 seconds)
TypedActor.stop(obj)
assert(latch2.await(5000, TimeUnit.MILLISECONDS))
val event = Block.sync((publisher ? GetRetainedMessage).mapTo[ConsumerMethodUnregistered], 3 seconds)
val event = Await.result((publisher ? GetRetainedMessage).mapTo[ConsumerMethodUnregistered], 3 seconds)
assert(event.endpointUri === "direct:foo")
assert(event.typedActor === obj)
@ -75,23 +75,23 @@ class TypedConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveThreeConsumerMethodRegisteredEvents = {
Actor.registry.addListener(requestor)
val latch = Block.sync((publisher ? SetExpectedTestMessageCount(3)).mapTo[CountDownLatch], 3 seconds)
val latch = Await.result((publisher ? SetExpectedTestMessageCount(3)).mapTo[CountDownLatch], 3 seconds)
val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props())
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val request = GetRetainedMessages(_.isInstanceOf[ConsumerMethodRegistered])
val events = Block.sync((publisher ? request).mapTo[List[ConsumerMethodRegistered]], 3 seconds)
val events = Await.result((publisher ? request).mapTo[List[ConsumerMethodRegistered]], 3 seconds)
assert(events.map(_.method.getName).sortWith(_ < _) === List("m2", "m3", "m4"))
}
@Test
def shouldReceiveThreeConsumerMethodUnregisteredEvents = {
val obj = TypedActor.typedActorOf(classOf[SampleTypedConsumer], classOf[SampleTypedConsumerImpl], Props())
val latch = Block.sync((publisher ? SetExpectedTestMessageCount(3)).mapTo[CountDownLatch], 3 seconds)
val latch = Await.result((publisher ? SetExpectedTestMessageCount(3)).mapTo[CountDownLatch], 3 seconds)
Actor.registry.addListener(requestor)
TypedActor.stop(obj)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
val request = GetRetainedMessages(_.isInstanceOf[ConsumerMethodUnregistered])
val events = Block.sync((publisher ? request).mapTo[List[ConsumerMethodUnregistered]], 3 seconds)
val events = Await.result((publisher ? request).mapTo[List[ConsumerMethodUnregistered]], 3 seconds)
assert(events.map(_.method.getName).sortWith(_ < _) === List("m2", "m3", "m4"))
}
}

View file

@ -14,7 +14,7 @@ import akka.japi.{ SideEffect, Option ⇒ JOption }
import akka.util.Bootable
import TypedCamelAccess._
import akka.dispatch.Block
import akka.dispatch.Await
/**
* Publishes consumer actors at their Camel endpoints. Consumer actors are published asynchronously when
@ -165,7 +165,7 @@ trait CamelService extends Bootable {
* activations that occurred in the past are not considered.
*/
private def expectEndpointActivationCount(count: Int): CountDownLatch =
Block.sync((activationTracker ? SetExpectedActivationCount(count)).mapTo[CountDownLatch], 3 seconds)
Await.result((activationTracker ? SetExpectedActivationCount(count)).mapTo[CountDownLatch], 3 seconds)
/**
* Sets an expectation on the number of upcoming endpoint de-activations and returns
@ -173,7 +173,7 @@ trait CamelService extends Bootable {
* de-activations that occurred in the past are not considered.
*/
private def expectEndpointDeactivationCount(count: Int): CountDownLatch =
Block.sync((activationTracker ? SetExpectedDeactivationCount(count)).mapTo[CountDownLatch], 3 seconds)
Await.result((activationTracker ? SetExpectedDeactivationCount(count)).mapTo[CountDownLatch], 3 seconds)
private[camel] def registerPublishRequestor: Unit =
Actor.registry.addListener(publishRequestor)

View file

@ -172,7 +172,7 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn
private def sendSync(exchange: Exchange) = {
val actor = target(exchange)
val result: Any = try { Some(Block.sync((actor ? requestFor(exchange), 5 seconds)) } catch { case e Some(Failure(e)) }
val result: Any = try { Some(Await.result((actor ? requestFor(exchange), 5 seconds)) } catch { case e Some(Failure(e)) }
result match {
case Some(Ack) { /* no response message to set */ }
@ -294,7 +294,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
}
def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))
Promise.failed(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName)))
def restart(reason: Throwable): Unit = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for %s" format classOf[AsyncCallbackAdapter].getName)

View file

@ -8,7 +8,7 @@ import org.scalatest.junit.JUnitSuite
import akka.actor._
import akka.actor.Actor._
import akka.camel.CamelTestSupport.{ SetExpectedMessageCount SetExpectedTestMessageCount, _ }
import akka.dispatch.Block
import akka.dispatch.Await
class ConsumerPublishRequestorTest extends JUnitSuite {
import ConsumerPublishRequestorTest._
@ -36,19 +36,19 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
@Test
def shouldReceiveOneConsumerRegisteredEvent = {
val latch = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 5 seconds)
val latch = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 5 seconds)
requestor ! ActorRegistered(consumer.address, consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert(Block.sync(publisher ? GetRetainedMessage, 5 seconds) ===
assert(Await.result(publisher ? GetRetainedMessage, 5 seconds) ===
ConsumerActorRegistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer]))
}
@Test
def shouldReceiveOneConsumerUnregisteredEvent = {
val latch = Block.sync((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 5 seconds)
val latch = Await.result((publisher ? SetExpectedTestMessageCount(1)).mapTo[CountDownLatch], 5 seconds)
requestor ! ActorUnregistered(consumer.address, consumer)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert(Block.sync(publisher ? GetRetainedMessage, 5 seconds) ===
assert(Await.result(publisher ? GetRetainedMessage, 5 seconds) ===
ConsumerActorUnregistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer]))
}
}

View file

@ -51,7 +51,7 @@ import RemoteSystemDaemonMessageType._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
import akka.dispatch.{Block, Dispatchers, Future, PinnedDispatcher}
import akka.dispatch.{Await, Dispatchers, Future, PinnedDispatcher}
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
@ -1156,7 +1156,7 @@ class DefaultClusterNode private[akka] (
connection ! command
} else {
try {
Block.sync(connection ? (command, remoteDaemonAckTimeout), 10 seconds).asInstanceOf[Status] match {
Await.result(connection ? (command, remoteDaemonAckTimeout), 10 seconds).asInstanceOf[Status] match {
case Success(status)
EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(status))
case Failure(cause)

View file

@ -85,7 +85,7 @@ class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode {
})
Block.sync(monitorReponse, 5 seconds) must be("Too much memory is used!")
Await.result(monitorReponse, 5 seconds) must be("Too much memory is used!")
}

View file

@ -11,7 +11,7 @@ import akka.testkit.{ EventFilter, TestEvent }
import java.net.ConnectException
import java.nio.channels.NotYetConnectedException
import akka.cluster.LocalCluster
import akka.dispatch.Block
import akka.dispatch.Await
object DirectRoutingFailoverMultiJvmSpec {
@ -49,7 +49,7 @@ class DirectRoutingFailoverMultiJvmNode1 extends MasterClusterTestNode {
}
LocalCluster.barrier("verify-actor", NrOfNodes) {
Block.sync(actor ? "identify", timeout.duration) must equal("node2")
Await.result(actor ? "identify", timeout.duration) must equal("node2")
}
val timer = Timer(30.seconds, true)

View file

@ -11,7 +11,7 @@ import java.util.{ Collections, Set ⇒ JSet }
import java.net.ConnectException
import java.nio.channels.NotYetConnectedException
import akka.cluster.LocalCluster._
import akka.dispatch.Block
import akka.dispatch.Await
object RandomFailoverMultiJvmSpec {
@ -92,7 +92,7 @@ class RandomFailoverMultiJvmNode1 extends MasterClusterTestNode {
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until 100) { // we should get hits from both nodes in 100 attempts, if not then not very random
val value = Block.sync(actor ? "identify", timeout.duration).asInstanceOf[String]
val value = Await.result(actor ? "identify", timeout.duration).asInstanceOf[String]
set.add(value)
}
set

View file

@ -9,7 +9,7 @@ import akka.actor._
import akka.config.Config
import Cluster._
import akka.cluster.LocalCluster._
import akka.dispatch.Block
import akka.dispatch.Await
/**
* When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible
@ -79,7 +79,7 @@ class Random3ReplicasMultiJvmNode2 extends ClusterTestNode {
}
for (i 0 until 1000) {
count(Block.sync((hello ? "Hello").mapTo[String], 10 seconds))
count(Await.result((hello ? "Hello").mapTo[String], 10 seconds))
}
val repliesNode1 = replies("World from node [node1]")

View file

@ -12,7 +12,7 @@ import java.net.ConnectException
import java.nio.channels.NotYetConnectedException
import java.lang.Thread
import akka.cluster.LocalCluster._
import akka.dispatch.Block
import akka.dispatch.Await
object RoundRobinFailoverMultiJvmSpec {
@ -95,7 +95,7 @@ class RoundRobinFailoverMultiJvmNode1 extends MasterClusterTestNode {
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until 100) {
val value = Block.sync(actor ? "identify", timeout.duration).asInstanceOf[String]
val value = Await.result(actor ? "identify", timeout.duration).asInstanceOf[String]
set.add(value)
}
set

View file

@ -20,7 +20,7 @@ import akka.cluster.LocalCluster._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Block
import akka.dispatch.Await
/**
* When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible
@ -109,7 +109,7 @@ class RoundRobin2ReplicasMultiJvmNode2 extends ClusterTestNode {
implicit val timeout = Timeout(Duration(20, "seconds"))
for(i <- 1 to 8)
count(Block.sync((hello ? "Hello").mapTo[String], timeout.duration))
count(Await.result((hello ? "Hello").mapTo[String], timeout.duration))
replies.get("World from node [node1]").get must equal(4)
replies.get("World from node [node2]").get must equal(4)

View file

@ -11,7 +11,7 @@ import java.nio.channels.NotYetConnectedException
import java.lang.Thread
import akka.routing.Routing.Broadcast
import akka.cluster.LocalCluster._
import akka.dispatch.Block
import akka.dispatch.Await
object ScatterGatherFailoverMultiJvmSpec {
@ -85,7 +85,7 @@ class ScatterGatherFailoverMultiJvmNode1 extends MasterClusterTestNode {
def identifyConnections(actor: ActorRef): JSet[String] = {
val set = new java.util.HashSet[String]
for (i 0 until NrOfNodes * 2) {
val value = Block.sync(actor ? "foo", timeout.duration).asInstanceOf[String]
val value = Await.result(actor ? "foo", timeout.duration).asInstanceOf[String]
set.add(value)
}
set

View file

@ -49,7 +49,7 @@ object ComputeGridSample {
val fun = () "AKKA ROCKS"
val futures = local send (fun, 2) // send and invoke function on to two cluster nodes and get result
val result = Block.sync(Futures.fold("")(futures)(_ + " - " + _), timeout)
val result = Await.sync(Futures.fold("")(futures)(_ + " - " + _), timeout)
println("===================>>> Cluster says [" + result + "]")
local.stop
@ -83,7 +83,7 @@ object ComputeGridSample {
val future2 = local send (fun, 2, 1) head // send and invoke function on one cluster node and get result
// grab the result from the first one that returns
val result = Block.sync(Futures.firstCompletedOf(List(future1, future2)), timeout)
val result = Await.sync(Futures.firstCompletedOf(List(future1, future2)), timeout)
println("===================>>> Cluster says [" + result + "]")
local.stop

View file

@ -10,7 +10,7 @@ import org.bson.collection._
import akka.actor.ActorCell
import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.{ Block, Promise, Envelope, DefaultPromise }
import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise }
import java.util.concurrent.TimeoutException
class MongoBasedMailboxException(message: String) extends AkkaException(message)
@ -50,7 +50,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
case Left(t) result.failure(t)
}
})
Block.on(result, settings.WriteTimeout)
Await.ready(result, settings.WriteTimeout)
}
def dequeue(): Envelope = withErrorHandling {
@ -75,13 +75,13 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
()
}
}
try { Block.sync(envelopePromise, settings.ReadTimeout) } catch { case _: TimeoutException null }
try { Await.result(envelopePromise, settings.ReadTimeout) } catch { case _: TimeoutException null }
}
def numberOfMessages: Int = {
val count = Promise[Int]()(dispatcher)
mongo.count()(count.success)
try { Block.sync(count, settings.ReadTimeout).asInstanceOf[Int] } catch { case _: Exception -1 }
try { Await.result(count, settings.ReadTimeout).asInstanceOf[Int] } catch { case _: Exception -1 }
}
//TODO review find other solution, this will be very expensive

View file

@ -24,7 +24,7 @@ import scala.annotation.tailrec
import com.google.protobuf.ByteString
import java.util.concurrent.TimeoutException
import akka.dispatch.Block
import akka.dispatch.Await
/**
* Interface for node membership change listener.
@ -248,7 +248,7 @@ class Gossiper(remote: Remote) {
try {
val t = remoteExtension.RemoteSystemDaemonAckTimeout
Block.sync(connection ? (toRemoteMessage(newGossip), t), t) match {
Await.result(connection ? (toRemoteMessage(newGossip), t), t) match {
case Success(receiver) log.debug("Gossip sent to [{}] was successfully received", receiver)
case Failure(cause) log.error(cause, cause.toString)
}

View file

@ -168,7 +168,7 @@ class RemoteActorRefProvider(
actors.replace(path.toString, creationPromise, actor)
actor
case actor: InternalActorRef actor
case future: Future[_] Block.sync(future, system.settings.ActorTimeout.duration).asInstanceOf[InternalActorRef]
case future: Future[_] Await.result(future, system.settings.ActorTimeout.duration).asInstanceOf[InternalActorRef]
}
}
@ -224,7 +224,7 @@ class RemoteActorRefProvider(
if (withACK) {
try {
val f = connection ? (command, remoteExtension.RemoteSystemDaemonAckTimeout)
(try Block.on(f, remoteExtension.RemoteSystemDaemonAckTimeout).value catch { case _: TimeoutException None }) match {
(try Await.ready(f, remoteExtension.RemoteSystemDaemonAckTimeout).value catch { case _: TimeoutException None }) match {
case Some(Right(receiver))
log.debug("Remote system command sent to [{}] successfully received", receiver)

View file

@ -186,7 +186,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
/**
* Gets this agent's value after all currently queued updates have completed.
*/
def await(implicit timeout: Timeout): T = Block.sync(future, timeout.duration)
def await(implicit timeout: Timeout): T = Await.result(future, timeout.duration)
/**
* Map this agent to a new agent, applying the function to the internal state.

View file

@ -3,9 +3,8 @@ package akka.transactor.example;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.Block;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.japi.Procedure;
import akka.testkit.AkkaSpec;
import akka.transactor.Coordinated;
@ -30,9 +29,9 @@ public class UntypedCoordinatedExample {
Future<Object> future1 = counter1.ask("GetCount", timeout);
Future<Object> future2 = counter2.ask("GetCount", timeout);
int count1 = (Integer)Block.sync(future1, d);
int count1 = (Integer) Await.result(future1, d);
System.out.println("counter 1: " + count1);
int count2 = (Integer)Block.sync(future2, d);
int count2 = (Integer) Await.result(future2, d);
System.out.println("counter 1: " + count2);
app.stop();

View file

@ -3,7 +3,7 @@ package akka.transactor.example;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.Block;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.testkit.AkkaSpec;
import akka.util.Duration;
@ -28,9 +28,9 @@ public class UntypedTransactorExample {
Future<Object> future1 = counter1.ask("GetCount", timeout);
Future<Object> future2 = counter2.ask("GetCount", timeout);
int count1 = (Integer)Block.sync(future1, d);
int count1 = (Integer) Await.result(future1, d);
System.out.println("counter 1: " + count1);
int count2 = (Integer)Block.sync(future2, d);
int count2 = (Integer) Await.result(future2, d);
System.out.println("counter 1: " + count2);
app.stop();

View file

@ -2,7 +2,7 @@ package akka.transactor.test;
import static org.junit.Assert.*;
import akka.dispatch.Block;
import akka.dispatch.Await;
import akka.util.Duration;
import org.junit.After;
import org.junit.AfterClass;
@ -82,7 +82,7 @@ public class UntypedCoordinatedIncrementTest {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", askTimeout);
assertEquals(1, ((Integer) Block.sync(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue());
assertEquals(1, ((Integer) Await.result(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue());
}
}
@ -103,7 +103,7 @@ public class UntypedCoordinatedIncrementTest {
}
for (ActorRef counter : counters) {
Future<Object>future = counter.ask("GetCount", askTimeout);
assertEquals(0,((Integer)Block.sync(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue());
assertEquals(0,((Integer) Await.result(future, Duration.create(timeout, TimeUnit.SECONDS))).intValue());
}
}

View file

@ -2,7 +2,7 @@ package akka.transactor.test;
import static org.junit.Assert.*;
import akka.dispatch.Block;
import akka.dispatch.Await;
import akka.util.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -11,7 +11,6 @@ import org.junit.Before;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import akka.actor.Actors;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
@ -27,7 +26,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import akka.testkit.AkkaSpec;
@ -80,7 +78,7 @@ public class UntypedTransactorTest {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", askTimeout);
int count = (Integer)Block.sync(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
int count = (Integer) Await.result(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
assertEquals(1, count);
}
}
@ -102,7 +100,7 @@ public class UntypedTransactorTest {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", askTimeout);
int count = (Integer)Block.sync(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
int count = (Integer) Await.result(future, Duration.create(askTimeout, TimeUnit.MILLISECONDS));
assertEquals(0, count);
}
}

View file

@ -11,7 +11,7 @@ import akka.util.duration._
import java.util.concurrent.CountDownLatch
import akka.testkit.AkkaSpec
import akka.testkit._
import akka.dispatch.Block
import akka.dispatch.Await
class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
val latch = new CountDownLatch(num)
@ -63,9 +63,9 @@ class AgentSpec extends AkkaSpec {
val r2 = agent.alterOff((s: String) { Thread.sleep(2000); s + "c" })(5000)
val r3 = agent.alter(_ + "d")(5000)
Block.sync(r1, 5 seconds) must be === "ab"
Block.sync(r2, 5 seconds) must be === "abc"
Block.sync(r3, 5 seconds) must be === "abcd"
Await.result(r1, 5 seconds) must be === "ab"
Await.result(r2, 5 seconds) must be === "abc"
Await.result(r3, 5 seconds) must be === "abcd"
agent() must be("abcd")
@ -141,7 +141,7 @@ class AgentSpec extends AkkaSpec {
agent send (_ + "b")
agent send (_ + "c")
Block.sync(agent.future, timeout.duration) must be("abc")
Await.result(agent.future, timeout.duration) must be("abc")
agent.close()
}

View file

@ -7,7 +7,7 @@ import akka.actor._
import akka.stm.{ Ref, TransactionFactory }
import akka.util.duration._
import akka.testkit._
import akka.dispatch.Block
import akka.dispatch.Await
object CoordinatedIncrement {
case class Increment(friends: Seq[ActorRef])
@ -73,7 +73,7 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll {
counters(0) ! coordinated(Increment(counters.tail))
coordinated.await
for (counter counters) {
Block.sync((counter ? GetCount).mapTo[Int], timeout.duration) must be === 1
Await.result((counter ? GetCount).mapTo[Int], timeout.duration) must be === 1
}
counters foreach (_.stop())
failer.stop()
@ -90,7 +90,7 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll {
counters(0) ! Coordinated(Increment(counters.tail :+ failer))
coordinated.await
for (counter counters) {
Block.sync(counter ? GetCount, timeout.duration) must be === 0
Await.result(counter ? GetCount, timeout.duration) must be === 0
}
counters foreach (_.stop())
failer.stop()

View file

@ -11,7 +11,7 @@ import akka.testkit._
import scala.util.Random.{ nextInt random }
import java.util.concurrent.CountDownLatch
import akka.testkit.TestEvent.Mute
import akka.dispatch.Block
import akka.dispatch.Await
object FickleFriends {
case class FriendlyIncrement(friends: Seq[ActorRef], latch: CountDownLatch)
@ -120,9 +120,9 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll {
val latch = new CountDownLatch(1)
coordinator ! FriendlyIncrement(counters, latch)
latch.await // this could take a while
Block.sync(coordinator ? GetCount, timeout.duration) must be === 1
Await.result(coordinator ? GetCount, timeout.duration) must be === 1
for (counter counters) {
Block.sync(counter ? GetCount, timeout.duration) must be === 1
Await.result(counter ? GetCount, timeout.duration) must be === 1
}
counters foreach (_.stop())
coordinator.stop()

View file

@ -8,7 +8,7 @@ import akka.actor._
import akka.stm._
import akka.util.duration._
import akka.testkit._
import akka.dispatch.Block
import akka.dispatch.Await
object TransactorIncrement {
case class Increment(friends: Seq[ActorRef], latch: TestLatch)
@ -96,7 +96,7 @@ class TransactorSpec extends AkkaSpec {
counters(0) ! Increment(counters.tail, incrementLatch)
incrementLatch.await
for (counter counters) {
Block.sync(counter ? GetCount, timeout.duration) must be === 1
Await.result(counter ? GetCount, timeout.duration) must be === 1
}
counters foreach (_.stop())
failer.stop()
@ -113,7 +113,7 @@ class TransactorSpec extends AkkaSpec {
counters(0) ! Increment(counters.tail :+ failer, failLatch)
failLatch.await
for (counter counters) {
Block.sync(counter ? GetCount, timeout.duration) must be === 0
Await.result(counter ? GetCount, timeout.duration) must be === 0
}
counters foreach (_.stop())
failer.stop()

View file

@ -141,7 +141,7 @@ class TestKit(_system: ActorSystem) {
def msgAvailable = !queue.isEmpty
/**
* Block until the given condition evaluates to `true` or the timeout
* Await until the given condition evaluates to `true` or the timeout
* expires, whichever comes first.
*
* If no timeout is given, take it from the innermost enclosing `within`
@ -536,7 +536,7 @@ object TestKit {
private[testkit] val testActorId = new AtomicInteger(0)
/**
* Block until the given condition evaluates to `true` or the timeout
* Await until the given condition evaluates to `true` or the timeout
* expires, whichever comes first.
*
* If no timeout is given, take it from the innermost enclosing `within`

View file

@ -15,7 +15,7 @@ import akka.actor.PoisonPill
import akka.actor.CreateChild
import akka.actor.DeadLetter
import java.util.concurrent.TimeoutException
import akka.dispatch.{ Block, MessageDispatcher }
import akka.dispatch.{ Await, MessageDispatcher }
object TimingTest extends Tag("timing")
@ -64,7 +64,7 @@ abstract class AkkaSpec(_system: ActorSystem)
final override def afterAll {
system.stop()
try Block.on(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch {
try Await.ready(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
}
atTermination()
@ -140,7 +140,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
system.registerOnTermination(latch.countDown())
system.stop()
latch.await(2 seconds)
Block.sync(davyJones ? "Die!", timeout.duration) must be === "finally gone"
Await.result(davyJones ? "Die!", timeout.duration) must be === "finally gone"
// this will typically also contain log messages which were sent after the logger shutdown
locker must contain(DeadLetter(42, davyJones, probe.ref))

View file

@ -7,7 +7,7 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.event.Logging.Warning
import akka.dispatch.{ Future, Promise, Block }
import akka.dispatch.{ Future, Promise, Await }
import akka.util.duration._
import akka.actor.ActorSystem
@ -110,7 +110,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTime
def receive = { case _ sender ! nested }
}))
a must not be (null)
val nested = Block.sync((a ? "any").mapTo[ActorRef], timeout.duration)
val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
nested must not be (null)
a must not be theSameInstanceAs(nested)
}
@ -121,7 +121,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTime
def receive = { case _ sender ! nested }
}))
a must not be (null)
val nested = Block.sync((a ? "any").mapTo[ActorRef], timeout.duration)
val nested = Await.result((a ? "any").mapTo[ActorRef], timeout.duration)
nested must not be (null)
a must not be theSameInstanceAs(nested)
}
@ -195,7 +195,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTime
val f = a ? "work"
// CallingThreadDispatcher means that there is no delay
f must be('completed)
Block.sync(f, timeout.duration) must equal("workDone")
Await.result(f, timeout.duration) must equal("workDone")
}
}

View file

@ -5,7 +5,7 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.util.duration._
import akka.dispatch.{ Block, Future }
import akka.dispatch.{ Await, Future }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TestProbeSpec extends AkkaSpec with DefaultTimeout {
@ -18,7 +18,7 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout {
tk.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
tk.lastMessage.sender ! "world"
future must be('completed)
Block.sync(future, timeout.duration) must equal("world")
Await.result(future, timeout.duration) must equal("world")
}
"reply to messages" in {