From de758c0cc1363a31b62eaa9894084dc6c4f3eae7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 11 Dec 2011 01:29:46 +0100 Subject: [PATCH] Adding Blockable.sync to reduce usage of resultOrException.get --- .../test/scala/akka/actor/ActorRefSpec.scala | 2 +- .../test/scala/akka/dispatch/FutureSpec.scala | 32 +++++++------- .../akka/dispatch/MailboxConfigSpec.scala | 6 +-- .../scala/akka/routing/ActorPoolSpec.scala | 2 +- .../scala/akka/ticket/Ticket703Spec.scala | 2 +- .../src/main/scala/akka/dispatch/Future.scala | 43 +++++++++++-------- .../actor/mailbox/MongoBasedMailbox.scala | 2 +- .../akka/remote/RemoteActorRefProvider.scala | 2 +- .../scala/akka/agent/test/AgentSpec.scala | 8 ++-- 9 files changed, 54 insertions(+), 45 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index dd130bc0ad..4cbb92cb45 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -128,7 +128,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { def wrap[T](f: Promise[Actor] ⇒ T): T = { val result = Promise[Actor]() val r = f(result) - Block.on(result, 1 minute).resultOrException + Block.sync(result, 1 minute) r } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index c5d9801e00..d9e41c2941 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -304,7 +304,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - Block.on(Future.fold(futures)(0)(_ + _), timeout millis).resultOrException.get must be(45) + Block.sync(Future.fold(futures)(0)(_ + _), timeout millis) must be(45) } "fold by composing" in { @@ -331,7 +331,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - Block.on(Future.fold(futures)(0)(_ + _), timeout millis).exception.get.getMessage must be("shouldFoldResultsWithException: expected") + intercept[Throwable] { Block.sync(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected") } } @@ -343,7 +343,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa case (l, i) if i % 2 == 0 ⇒ l += i.asInstanceOf[AnyRef] case (l, _) ⇒ l } - val result = Block.on(f.mapTo[ArrayBuffer[Int]], 10000 millis).resultOrException.get.sum + val result = Block.sync(f.mapTo[ArrayBuffer[Int]], 10000 millis).sum assert(result === 250500) } @@ -363,7 +363,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 200), timeout).mapTo[Int] } - assert(Block.on(Future.reduce(futures)(_ + _), timeout millis).resultOrException.get === 45) + assert(Block.sync(Future.reduce(futures)(_ + _), timeout millis) === 45) } "shouldReduceResultsWithException" in { @@ -380,13 +380,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } val timeout = 10000 def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) ⇒ actor.?((idx, idx * 100), timeout).mapTo[Int] } - assert(Block.on(Future.reduce(futures)(_ + _), timeout millis).exception.get.getMessage === "shouldFoldResultsWithException: expected") + intercept[Throwable] { Block.sync(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected" } } "shouldReduceThrowIAEOnEmptyInput" in { filterException[IllegalArgumentException] { - intercept[UnsupportedOperationException] { Future.reduce(List[Future[Int]]())(_ + _).get } + intercept[UnsupportedOperationException] { Block.sync(Future.reduce(List[Future[Int]]())(_ + _), timeout.duration) } } } @@ -421,7 +421,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa filterException[ThrowableTest] { val f1 = Future[Any] { throw new ThrowableTest("test") } - intercept[ThrowableTest] { Block.on(f1, timeout.duration).resultOrException.get } + intercept[ThrowableTest] { Block.sync(f1, timeout.duration) } val latch = new StandardLatch val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } @@ -429,10 +429,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f2 onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) latch.open - assert(Block.on(f2, timeout.duration).resultOrException.get === "success") + assert(Block.sync(f2, timeout.duration) === "success") f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") } - assert(Block.on(f3, timeout.duration).resultOrException.get === "SUCCESS") + assert(Block.sync(f3, timeout.duration) === "SUCCESS") } } @@ -598,17 +598,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa flow { y1 << 1 } // When this is set, it should cascade down the line assert(ly.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS)) - assert(Block.on(x1, 1 minute).resultOrException.get === 1) + assert(Block.sync(x1, 1 minute) === 1) assert(!lz.isOpen) flow { y2 << 9 } // When this is set, it should cascade down the line assert(lz.tryAwaitUninterruptible(2000, TimeUnit.MILLISECONDS)) - assert(Block.on(x2, 1 minute).resultOrException.get === 9) + assert(Block.sync(x2, 1 minute) === 9) - assert(List(x1, x2, y1, y2).forall(_.isCompleted == true)) + assert(List(x1, x2, y1, y2).forall(_.isCompleted)) - assert(Block.on(result, 1 minute).resultOrException.get === 10) + assert(Block.sync(result, 1 minute) === 10) } "dataFlowAPIshouldbeSlick" in { @@ -699,8 +699,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(!checkType(rInt, manifest[Nothing])) assert(!checkType(rInt, manifest[Any])) - Block.on(rString, timeout.duration).resultOrException - Block.on(rInt, timeout.duration).resultOrException + Block.sync(rString, timeout.duration) + Block.sync(rInt, timeout.duration) } "futureFlowSimpleAssign" in { @@ -808,7 +808,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa nested foreach (_ ⇒ l2.open) l2.await } - assert(Block.on(complex, timeout.duration).isCompleted) + Block.on(complex, timeout.duration) must be('completed) } } } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 7693a85632..acaa7b5c18 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -19,7 +19,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val f = spawn { q.dequeue } - Block.on(f, 1 second).resultOrException must be === Some(null) + Block.sync(f, 1 second) must be(null) } "create a bounded mailbox with 10 capacity and with push timeout" in { @@ -115,8 +115,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val consumers = for (i ← (1 to 4).toList) yield createConsumer - val ps = producers.map(Block.on(_, within).resultOrException.get) - val cs = consumers.map(Block.on(_, within).resultOrException.get) + val ps = producers.map(Block.sync(_, within)) + val cs = consumers.map(Block.sync(_, within)) ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 6b724d4d74..407120f382 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -126,7 +126,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { try { (for (count ← 1 to 500) yield pool.?("Test", 20 seconds)) foreach { - Block.on(_, 20 seconds).resultOrException.get must be("Response") + Block.sync(_, 20 seconds) must be("Response") } } finally { pool.stop() diff --git a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala index 2c11150d72..8feb284be4 100644 --- a/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/ticket/Ticket703Spec.scala @@ -28,7 +28,7 @@ class Ticket703Spec extends AkkaSpec { } })) }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 5, 1000))) - Block.on(actorPool.?("Ping", 10000), 10 seconds).resultOrException.get must be === "Response" + Block.sync(actorPool.?("Ping", 10000), 10 seconds) must be === "Response" } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index fee1d3458a..e15547ad3f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -27,17 +27,22 @@ import akka.dispatch.Block.CanBlock object Block { sealed trait CanBlock - trait Blockable { - + trait Blockable[+T] { /** * Should throw java.util.concurrent.TimeoutException if times out */ def block(atMost: Duration)(implicit permit: CanBlock): this.type + + /** + * Throws exceptions if cannot produce a T within the specified time + */ + def sync(atMost: Duration)(implicit permit: CanBlock): T } private implicit val permit = new CanBlock {} - def on[T <: Blockable](block: T, atMost: Duration /* = Duration.Inf*/ ): T = block.block(atMost) + def on[T <: Blockable[_]](block: T, atMost: Duration /* = Duration.Inf*/ ): T = block.block(atMost) + def sync[T](block: Blockable[T], atMost: Duration): T = block.sync(atMost) } class FutureFactory(implicit dispatcher: MessageDispatcher) { @@ -361,7 +366,7 @@ object Future { } } -sealed trait Future[+T] extends japi.Future[T] with Block.Blockable { +sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] { implicit def dispatcher: MessageDispatcher @@ -379,6 +384,7 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable { * conform, or any exception the Future was completed with. Will return None * in case of a timeout. */ + @deprecated("Use Block.on") def as[A](implicit m: Manifest[A]): Option[A] = { try Block.on(this, Duration.Inf) catch { case _: TimeoutException ⇒ } value match { @@ -394,7 +400,7 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable { } @deprecated("Used Block.on(future, timeoutDuration)") - def get: T = Block.on(this, Duration.Inf).resultOrException.get + def get: T = Block.sync(this, Duration.Inf) /** * Tests whether this Future has been completed. @@ -471,6 +477,7 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable { * Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this. * This is semantically the same as: Future.firstCompletedOf(Seq(this, that)) */ + //FIXME implement as The result of any of the Futures, or if oth failed, the first failure def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize /** @@ -720,29 +727,30 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst import DefaultPromise.{ FState, Success, Failure, Pending } - def block(atMost: Duration)(implicit permit: CanBlock): this.type = if (value.isDefined) this else { + protected final def tryAwait(atMost: Duration): Boolean = { Future.blocking - val start = MILLISECONDS.toNanos(System.currentTimeMillis) @tailrec def awaitUnsafe(waitTimeNanos: Long): Boolean = { if (value.isEmpty && waitTimeNanos > 0) { val ms = NANOSECONDS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec + val start = System.nanoTime() try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } - awaitUnsafe(waitTimeNanos - (MILLISECONDS.toNanos(System.currentTimeMillis) - start)) - } else { + awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) + } else value.isDefined - } } - - val waitNanos = if (atMost.isFinite) atMost.toNanos else Long.MaxValue - - if (awaitUnsafe(waitNanos)) this - else throw new TimeoutException("Futures timed out after [" + NANOSECONDS.toMillis(waitNanos) + "] milliseconds") + awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) } + def block(atMost: Duration)(implicit permit: CanBlock): this.type = + if (value.isDefined || tryAwait(atMost)) this + else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + + def sync(atMost: Duration)(implicit permit: CanBlock): T = block(atMost).resultOrException.get + def value: Option[Either[Throwable, T]] = getState.value @inline @@ -797,8 +805,8 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst } private def notifyCompleted(func: Future[T] ⇒ Unit) { - // FIXME catching all and continue isn't good for OOME, ticket #1418 - try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? + // TODO FIXME catching all and continue isn't good for OOME, ticket #1418 + try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } } } @@ -816,4 +824,5 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dis } def block(atMost: Duration)(implicit permit: CanBlock): this.type = this + def sync(atMost: Duration)(implicit permit: CanBlock): T = resultOrException.get } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 63f59d2d6c..1f0b6d8587 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -75,7 +75,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { () } } - try { Block.on(envelopePromise, settings.ReadTimeout).resultOrException.orNull } catch { case _: TimeoutException ⇒ null } + try { Block.sync(envelopePromise, settings.ReadTimeout) } catch { case _: TimeoutException ⇒ null } } def numberOfMessages: Int = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 638901a02e..316fb9693c 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -168,7 +168,7 @@ class RemoteActorRefProvider( actors.replace(path.toString, newFuture, actor) actor case actor: InternalActorRef ⇒ actor - case future: Future[_] ⇒ Block.on(future, system.settings.ActorTimeout.duration).resultOrException.get.asInstanceOf[InternalActorRef] + case future: Future[_] ⇒ Block.sync(future, system.settings.ActorTimeout.duration).asInstanceOf[InternalActorRef] } } diff --git a/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala b/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala index affd6d4c35..d23768b276 100644 --- a/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala +++ b/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala @@ -63,9 +63,9 @@ class AgentSpec extends AkkaSpec { val r2 = agent.alterOff((s: String) ⇒ { Thread.sleep(2000); s + "c" })(5000) val r3 = agent.alter(_ + "d")(5000) - Block.on(r1, 5 seconds).resultOrException.get must be === "ab" - Block.on(r2, 5 seconds).resultOrException.get must be === "abc" - Block.on(r3, 5 seconds).resultOrException.get must be === "abcd" + Block.sync(r1, 5 seconds) must be === "ab" + Block.sync(r2, 5 seconds) must be === "abc" + Block.sync(r3, 5 seconds) must be === "abcd" agent() must be("abcd") @@ -141,7 +141,7 @@ class AgentSpec extends AkkaSpec { agent send (_ + "b") agent send (_ + "c") - Block.on(agent.future, timeout.duration).resultOrException.get must be("abc") + Block.sync(agent.future, timeout.duration) must be("abc") agent.close() }