Removing resultOrException
This commit is contained in:
parent
de758c0cc1
commit
1efed78de8
4 changed files with 32 additions and 28 deletions
|
|
@ -3,6 +3,7 @@ package akka.dispatch;
|
||||||
import akka.actor.Timeout;
|
import akka.actor.Timeout;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
|
|
||||||
|
import akka.util.Duration;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -269,7 +270,15 @@ public class JavaFutureTests {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
final Integer got = f.get().get();
|
assertEquals(expect, Block.sync(f, Duration.create(5, TimeUnit.SECONDS)));
|
||||||
assertEquals(expect, got);
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void BlockMustBeCallable() {
|
||||||
|
Promise<String> p = ff.<String>promise();
|
||||||
|
Duration d = Duration.create(1, TimeUnit.SECONDS);
|
||||||
|
p.completeWithResult("foo");
|
||||||
|
Block.on(p, d);
|
||||||
|
assertEquals(Block.sync(p, d), "foo");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ object FutureSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class JavaFutureSpec extends JavaFutureTests with JUnitSuite
|
class JavaFutureSpec extends JavaFutureTests with JUnitSuite
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
|
@ -439,19 +440,15 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
"shouldBlockUntilResult" in {
|
"shouldBlockUntilResult" in {
|
||||||
val latch = new StandardLatch
|
val latch = new StandardLatch
|
||||||
|
|
||||||
val f = Future({ latch.await; 5 })
|
val f = Future { latch.await; 5 }
|
||||||
val f2 = Future({ f.get + 5 })
|
val f2 = Future { Block.sync(f, timeout.duration) + 5 }
|
||||||
|
|
||||||
assert(f2.resultOrException === None)
|
intercept[TimeoutException](Block.on(f2, 100 millis))
|
||||||
latch.open
|
latch.open
|
||||||
assert(f2.get === 10)
|
assert(Block.sync(f2, timeout.duration) === 10)
|
||||||
|
|
||||||
val f3 = Future({ Thread.sleep(100); 5 })
|
val f3 = Future { Thread.sleep(100); 5 }
|
||||||
filterException[TimeoutException] {
|
filterException[TimeoutException] { intercept[TimeoutException] { Block.on(f3, 0 millis) } }
|
||||||
intercept[TimeoutException] {
|
|
||||||
Block.on(f3, 0 millis)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"futureComposingWithContinuations" in {
|
"futureComposingWithContinuations" in {
|
||||||
|
|
@ -826,7 +823,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
"contain a result" in { f((future, result) ⇒ future.result must be(Some(result))) }
|
"contain a result" in { f((future, result) ⇒ future.result must be(Some(result))) }
|
||||||
"not contain an exception" in { f((future, _) ⇒ future.exception must be(None)) }
|
"not contain an exception" in { f((future, _) ⇒ future.exception must be(None)) }
|
||||||
"return result with 'get'" in { f((future, result) ⇒ future.get must be(result)) }
|
"return result with 'get'" in { f((future, result) ⇒ future.get must be(result)) }
|
||||||
"return result with 'resultOrException'" in { f((future, result) ⇒ future.resultOrException must be(Some(result))) }
|
"return result with 'Block.sync'" in { f((future, result) ⇒ Block.sync(future, timeout.duration) must be(result)) }
|
||||||
"not timeout" in { f((future, _) ⇒ Block.on(future, 0 millis)) }
|
"not timeout" in { f((future, _) ⇒ Block.on(future, 0 millis)) }
|
||||||
"filter result" in {
|
"filter result" in {
|
||||||
f { (future, result) ⇒
|
f { (future, result) ⇒
|
||||||
|
|
@ -850,7 +847,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
||||||
"not contain a result" in { f((future, _) ⇒ future.result must be(None)) }
|
"not contain a result" in { f((future, _) ⇒ future.result must be(None)) }
|
||||||
"contain an exception" in { f((future, message) ⇒ future.exception.get.getMessage must be(message)) }
|
"contain an exception" in { f((future, message) ⇒ future.exception.get.getMessage must be(message)) }
|
||||||
"throw exception with 'get'" in { f((future, message) ⇒ (evaluating { future.get } must produce[E]).getMessage must be(message)) }
|
"throw exception with 'get'" in { f((future, message) ⇒ (evaluating { future.get } must produce[E]).getMessage must be(message)) }
|
||||||
"throw exception with 'resultOrException'" in { f((future, message) ⇒ (evaluating { future.resultOrException } must produce[E]).getMessage must be(message)) }
|
"throw exception with 'Block.sync'" in { f((future, message) ⇒ (evaluating { Block.sync(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
|
||||||
"retain exception with filter" in {
|
"retain exception with filter" in {
|
||||||
f { (future, message) ⇒
|
f { (future, message) ⇒
|
||||||
(evaluating { (future filter (_ ⇒ true)).get } must produce[E]).getMessage must be(message)
|
(evaluating { (future filter (_ ⇒ true)).get } must produce[E]).getMessage must be(message)
|
||||||
|
|
|
||||||
|
|
@ -617,15 +617,6 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] {
|
||||||
}
|
}
|
||||||
future
|
future
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the current result, throws the exception if one has been raised, else returns None
|
|
||||||
*/
|
|
||||||
final def resultOrException: Option[T] = value match {
|
|
||||||
case Some(Left(e)) ⇒ throw e
|
|
||||||
case Some(Right(r)) ⇒ Some(r)
|
|
||||||
case _ ⇒ None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object Promise {
|
object Promise {
|
||||||
|
|
@ -749,7 +740,11 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
|
||||||
if (value.isDefined || tryAwait(atMost)) this
|
if (value.isDefined || tryAwait(atMost)) this
|
||||||
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
|
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
|
||||||
|
|
||||||
def sync(atMost: Duration)(implicit permit: CanBlock): T = block(atMost).resultOrException.get
|
def sync(atMost: Duration)(implicit permit: CanBlock): T =
|
||||||
|
block(atMost).value.get match {
|
||||||
|
case Left(e) ⇒ throw e
|
||||||
|
case Right(r) ⇒ r
|
||||||
|
}
|
||||||
|
|
||||||
def value: Option[Either[Throwable, T]] = getState.value
|
def value: Option[Either[Throwable, T]] = getState.value
|
||||||
|
|
||||||
|
|
@ -824,5 +819,8 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dis
|
||||||
}
|
}
|
||||||
|
|
||||||
def block(atMost: Duration)(implicit permit: CanBlock): this.type = this
|
def block(atMost: Duration)(implicit permit: CanBlock): this.type = this
|
||||||
def sync(atMost: Duration)(implicit permit: CanBlock): T = resultOrException.get
|
def sync(atMost: Duration)(implicit permit: CanBlock): T = value.get match {
|
||||||
|
case Left(e) ⇒ throw e
|
||||||
|
case Right(r) ⇒ r
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ object ComputeGridSample {
|
||||||
val fun = () ⇒ "AKKA ROCKS"
|
val fun = () ⇒ "AKKA ROCKS"
|
||||||
val futures = local send (fun, 2) // send and invoke function on to two cluster nodes and get result
|
val futures = local send (fun, 2) // send and invoke function on to two cluster nodes and get result
|
||||||
|
|
||||||
val result = Futures.fold("")(futures)(_ + " - " + _).await.resultOrException
|
val result = Block.sync(Futures.fold("")(futures)(_ + " - " + _), timeout)
|
||||||
println("===================>>> Cluster says [" + result + "]")
|
println("===================>>> Cluster says [" + result + "]")
|
||||||
|
|
||||||
local.stop
|
local.stop
|
||||||
|
|
@ -83,8 +83,8 @@ object ComputeGridSample {
|
||||||
val future2 = local send (fun, 2, 1) head // send and invoke function on one cluster node and get result
|
val future2 = local send (fun, 2, 1) head // send and invoke function on one cluster node and get result
|
||||||
|
|
||||||
// grab the result from the first one that returns
|
// grab the result from the first one that returns
|
||||||
val result = Futures.firstCompletedOf(List(future1, future2)).await.resultOrException
|
val result = Block.sync(Futures.firstCompletedOf(List(future1, future2)), timeout)
|
||||||
println("===================>>> Cluster says [" + result.get + "]")
|
println("===================>>> Cluster says [" + result + "]")
|
||||||
|
|
||||||
local.stop
|
local.stop
|
||||||
remote1.stop
|
remote1.stop
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue