Adding Blockable.sync to reduce usage of resultOrException.get

This commit is contained in:
Viktor Klang 2011-12-11 01:29:46 +01:00
parent 3b1330c6d7
commit de758c0cc1
9 changed files with 54 additions and 45 deletions

View file

@ -128,7 +128,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
def wrap[T](f: Promise[Actor] T): T = {
val result = Promise[Actor]()
val r = f(result)
Block.on(result, 1 minute).resultOrException
Block.sync(result, 1 minute)
r
}

View file

@ -304,7 +304,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] }
Block.on(Future.fold(futures)(0)(_ + _), timeout millis).resultOrException.get must be(45)
Block.sync(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
}
"fold by composing" in {
@ -331,7 +331,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
Block.on(Future.fold(futures)(0)(_ + _), timeout millis).exception.get.getMessage must be("shouldFoldResultsWithException: expected")
intercept[Throwable] { Block.sync(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected")
}
}
@ -343,7 +343,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
case (l, i) if i % 2 == 0 l += i.asInstanceOf[AnyRef]
case (l, _) l
}
val result = Block.on(f.mapTo[ArrayBuffer[Int]], 10000 millis).resultOrException.get.sum
val result = Block.sync(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum
assert(result === 250500)
}
@ -363,7 +363,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200), timeout).mapTo[Int] }
assert(Block.on(Future.reduce(futures)(_ + _), timeout millis).resultOrException.get === 45)
assert(Block.sync(Future.reduce(futures)(_ + _), timeout millis) === 45)
}
"shouldReduceResultsWithException" in {
@ -380,13 +380,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100), timeout).mapTo[Int] }
assert(Block.on(Future.reduce(futures)(_ + _), timeout millis).exception.get.getMessage === "shouldFoldResultsWithException: expected")
intercept[Throwable] { Block.sync(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected"
}
}
"shouldReduceThrowIAEOnEmptyInput" in {
filterException[IllegalArgumentException] {
intercept[UnsupportedOperationException] { Future.reduce(List[Future[Int]]())(_ + _).get }
intercept[UnsupportedOperationException] { Block.sync(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) }
}
}
@ -421,7 +421,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
filterException[ThrowableTest] {
val f1 = Future[Any] { throw new ThrowableTest("test") }
intercept[ThrowableTest] { Block.on(f1, timeout.duration).resultOrException.get }
intercept[ThrowableTest] { Block.sync(f1, timeout.duration) }
val latch = new StandardLatch
val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" }
@ -429,10 +429,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
f2 onResult { case _ throw new ThrowableTest("dispatcher receive") }
val f3 = f2 map (s s.toUpperCase)
latch.open
assert(Block.on(f2, timeout.duration).resultOrException.get === "success")
assert(Block.sync(f2, timeout.duration) === "success")
f2 foreach (_ throw new ThrowableTest("current thread foreach"))
f2 onResult { case _ throw new ThrowableTest("current thread receive") }
assert(Block.on(f3, timeout.duration).resultOrException.get === "SUCCESS")
assert(Block.sync(f3, timeout.duration) === "SUCCESS")
}
}
@ -598,17 +598,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
flow { y1 << 1 } // When this is set, it should cascade down the line
assert(ly.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
assert(Block.on(x1, 1 minute).resultOrException.get === 1)
assert(Block.sync(x1, 1 minute) === 1)
assert(!lz.isOpen)
flow { y2 << 9 } // When this is set, it should cascade down the line
assert(lz.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS))
assert(Block.on(x2, 1 minute).resultOrException.get === 9)
assert(Block.sync(x2, 1 minute) === 9)
assert(List(x1, x2, y1, y2).forall(_.isCompleted == true))
assert(List(x1, x2, y1, y2).forall(_.isCompleted))
assert(Block.on(result, 1 minute).resultOrException.get === 10)
assert(Block.sync(result, 1 minute) === 10)
}
"dataFlowAPIshouldbeSlick" in {
@ -699,8 +699,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
assert(!checkType(rInt, manifest[Nothing]))
assert(!checkType(rInt, manifest[Any]))
Block.on(rString, timeout.duration).resultOrException
Block.on(rInt, timeout.duration).resultOrException
Block.sync(rString, timeout.duration)
Block.sync(rInt, timeout.duration)
}
"futureFlowSimpleAssign" in {
@ -808,7 +808,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
nested foreach (_ l2.open)
l2.await
}
assert(Block.on(complex, timeout.duration).isCompleted)
Block.on(complex, timeout.duration) must be('completed)
}
}
}

View file

@ -19,7 +19,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val f = spawn { q.dequeue }
Block.on(f, 1 second).resultOrException must be === Some(null)
Block.sync(f, 1 second) must be(null)
}
"create a bounded mailbox with 10 capacity and with push timeout" in {
@ -115,8 +115,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val consumers = for (i (1 to 4).toList) yield createConsumer
val ps = producers.map(Block.on(_, within).resultOrException.get)
val cs = consumers.map(Block.on(_, within).resultOrException.get)
val ps = producers.map(Block.sync(_, within))
val cs = consumers.map(Block.sync(_, within))
ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages
cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages

View file

@ -126,7 +126,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
try {
(for (count 1 to 500) yield pool.?("Test", 20 seconds)) foreach {
Block.on(_, 20 seconds).resultOrException.get must be("Response")
Block.sync(_, 20 seconds) must be("Response")
}
} finally {
pool.stop()

View file

@ -28,7 +28,7 @@ class Ticket703Spec extends AkkaSpec {
}
}))
}).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000)))
Block.on(actorPool.?("Ping", 10000), 10 seconds).resultOrException.get must be === "Response"
Block.sync(actorPool.?("Ping", 10000), 10 seconds) must be === "Response"
}
}
}

View file

@ -27,17 +27,22 @@ import akka.dispatch.Block.CanBlock
object Block {
sealed trait CanBlock
trait Blockable {
trait Blockable[+T] {
/**
* Should throw java.util.concurrent.TimeoutException if times out
*/
def block(atMost: Duration)(implicit permit: CanBlock): this.type
/**
* Throws exceptions if cannot produce a T within the specified time
*/
def sync(atMost: Duration)(implicit permit: CanBlock): T
}
private implicit val permit = new CanBlock {}
def on[T <: Blockable](block: T, atMost: Duration /* = Duration.Inf*/ ): T = block.block(atMost)
def on[T <: Blockable[_]](block: T, atMost: Duration /* = Duration.Inf*/ ): T = block.block(atMost)
def sync[T](block: Blockable[T], atMost: Duration): T = block.sync(atMost)
}
class FutureFactory(implicit dispatcher: MessageDispatcher) {
@ -361,7 +366,7 @@ object Future {
}
}
sealed trait Future[+T] extends japi.Future[T] with Block.Blockable {
sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] {
implicit def dispatcher: MessageDispatcher
@ -379,6 +384,7 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable {
* conform, or any exception the Future was completed with. Will return None
* in case of a timeout.
*/
@deprecated("Use Block.on")
def as[A](implicit m: Manifest[A]): Option[A] = {
try Block.on(this, Duration.Inf) catch { case _: TimeoutException }
value match {
@ -394,7 +400,7 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable {
}
@deprecated("Used Block.on(future, timeoutDuration)")
def get: T = Block.on(this, Duration.Inf).resultOrException.get
def get: T = Block.sync(this, Duration.Inf)
/**
* Tests whether this Future has been completed.
@ -471,6 +477,7 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable {
* Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this.
* This is semantically the same as: Future.firstCompletedOf(Seq(this, that))
*/
//FIXME implement as The result of any of the Futures, or if oth failed, the first failure
def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize
/**
@ -720,28 +727,29 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
import DefaultPromise.{ FState, Success, Failure, Pending }
def block(atMost: Duration)(implicit permit: CanBlock): this.type = if (value.isDefined) this else {
protected final def tryAwait(atMost: Duration): Boolean = {
Future.blocking
val start = MILLISECONDS.toNanos(System.currentTimeMillis)
@tailrec
def awaitUnsafe(waitTimeNanos: Long): Boolean = {
if (value.isEmpty && waitTimeNanos > 0) {
val ms = NANOSECONDS.toMillis(waitTimeNanos)
val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
val start = System.nanoTime()
try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException }
awaitUnsafe(waitTimeNanos - (MILLISECONDS.toNanos(System.currentTimeMillis) - start))
} else {
awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
} else
value.isDefined
}
awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
}
val waitNanos = if (atMost.isFinite) atMost.toNanos else Long.MaxValue
def block(atMost: Duration)(implicit permit: CanBlock): this.type =
if (value.isDefined || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
if (awaitUnsafe(waitNanos)) this
else throw new TimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds")
}
def sync(atMost: Duration)(implicit permit: CanBlock): T = block(atMost).resultOrException.get
def value: Option[Either[Throwable, T]] = getState.value
@ -797,8 +805,8 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
}
private def notifyCompleted(func: Future[T] Unit) {
// FIXME catching all and continue isn't good for OOME, ticket #1418
try { func(this) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
// TODO FIXME catching all and continue isn't good for OOME, ticket #1418
try { func(this) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) }
}
}
@ -816,4 +824,5 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dis
}
def block(atMost: Duration)(implicit permit: CanBlock): this.type = this
def sync(atMost: Duration)(implicit permit: CanBlock): T = resultOrException.get
}

View file

@ -75,7 +75,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
()
}
}
try { Block.on(envelopePromise, settings.ReadTimeout).resultOrException.orNull } catch { case _: TimeoutException null }
try { Block.sync(envelopePromise, settings.ReadTimeout) } catch { case _: TimeoutException null }
}
def numberOfMessages: Int = {

View file

@ -168,7 +168,7 @@ class RemoteActorRefProvider(
actors.replace(path.toString, newFuture, actor)
actor
case actor: InternalActorRef actor
case future: Future[_] Block.on(future, system.settings.ActorTimeout.duration).resultOrException.get.asInstanceOf[InternalActorRef]
case future: Future[_] Block.sync(future, system.settings.ActorTimeout.duration).asInstanceOf[InternalActorRef]
}
}

View file

@ -63,9 +63,9 @@ class AgentSpec extends AkkaSpec {
val r2 = agent.alterOff((s: String) { Thread.sleep(2000); s + "c" })(5000)
val r3 = agent.alter(_ + "d")(5000)
Block.on(r1, 5 seconds).resultOrException.get must be === "ab"
Block.on(r2, 5 seconds).resultOrException.get must be === "abc"
Block.on(r3, 5 seconds).resultOrException.get must be === "abcd"
Block.sync(r1, 5 seconds) must be === "ab"
Block.sync(r2, 5 seconds) must be === "abc"
Block.sync(r3, 5 seconds) must be === "abcd"
agent() must be("abcd")
@ -141,7 +141,7 @@ class AgentSpec extends AkkaSpec {
agent send (_ + "b")
agent send (_ + "c")
Block.on(agent.future, timeout.duration).resultOrException.get must be("abc")
Block.sync(agent.future, timeout.duration) must be("abc")
agent.close()
}