From f03ecb6ae911104215fbbdbaf3a482a2706071e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 23 Feb 2010 10:05:47 +0100 Subject: [PATCH] Added "def !!!: Future" to Actor + Futures.* with util methods --- akka-core/src/main/scala/actor/Actor.scala | 16 +-- .../src/main/scala/dispatch/Future.scala | 46 +++++++- akka-core/src/test/scala/FutureTest.scala | 111 ++++++++++++++++++ 3 files changed, 160 insertions(+), 13 deletions(-) create mode 100644 akka-core/src/test/scala/FutureTest.scala diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index a953292756..732d3fabcd 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -90,7 +90,7 @@ object Actor extends Logging { */ def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() { start - def receive = body + def receive: PartialFunction[Any, Unit] = body } /** @@ -108,8 +108,8 @@ object Actor extends Logging { * * */ - def actor[A](body: => Unit) = { - def handler[A](body: => Unit) = new { + def actor(body: => Unit) = { + def handler(body: => Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { start body @@ -536,19 +536,13 @@ trait Actor extends TransactionManagement { */ def !![T](message: Any): Option[T] = !![T](message, timeout) - - /* - //FIXME 2.8 def !!!(message: Any)(implicit sender: AnyRef = None): FutureResult = { - def !!!(message: Any)(implicit sender: AnyRef): FutureResult = { + def !!!(message: Any): FutureResult = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { - val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) - else None - postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, from) + postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) } else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } - */ /** * This method is evil and has been removed. Use '!!' with a timeout instead. diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala index a18ac14372..c1e61695b8 100644 --- a/akka-core/src/main/scala/dispatch/Future.scala +++ b/akka-core/src/main/scala/dispatch/Future.scala @@ -8,10 +8,52 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit + +import java.util.concurrent.{SynchronousQueue, TimeUnit} class FutureTimeoutException(message: String) extends RuntimeException(message) +object Futures { + def awaitAll(futures: List[FutureResult]): Unit = futures.foreach(_.await) + + def awaitOne(futures: List[FutureResult]): FutureResult = { + var future: Option[FutureResult] = None + do { + future = futures.find(_.isCompleted) + } while (future.isEmpty) + future.get + } + + /* + def awaitEither(f1: FutureResult, f2: FutureResult): Option[Any] = { + import Actor.Sender.Self + import Actor.{spawn, actor} + + case class Result(res: Option[Any]) + val handOff = new SynchronousQueue[Option[Any]] + spawn { + try { + println("f1 await") + f1.await + println("f1 offer") + handOff.offer(f1.result) + } catch {case _ => {}} + } + spawn { + try { + println("f2 await") + f2.await + println("f2 offer") + println("f2 offer: " + f2.result) + handOff.offer(f2.result) + } catch {case _ => {}} + } + Thread.sleep(100) + handOff.take + } +*/ +} + sealed trait FutureResult { def await def awaitBlocking @@ -46,7 +88,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes var start = currentTimeInNanos try { wait = _signal.awaitNanos(wait) - if (wait <= 0) throw new FutureTimeoutException("Future timed out after [" + timeout + "] milliseconds") + if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds") } catch { case e: InterruptedException => wait = wait - (currentTimeInNanos - start) diff --git a/akka-core/src/test/scala/FutureTest.scala b/akka-core/src/test/scala/FutureTest.scala new file mode 100644 index 0000000000..d073a92557 --- /dev/null +++ b/akka-core/src/test/scala/FutureTest.scala @@ -0,0 +1,111 @@ +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.dispatch.Futures + +class FutureTest extends JUnitSuite { + class TestActor extends Actor { + def receive = { + case "Hello" => + reply("World") + case "NoReply" => {} + case "Failure" => + throw new RuntimeException("expected") + } + } + + @Test def shouldActorReplyResultThroughExplicitFuture = { + val actor = new TestActor + actor.start + val future = actor !!! "Hello" + future.await + assert(future.result.isDefined) + assert("World" === future.result.get) + actor.stop + } + + @Test def shouldActorReplyExceptionThroughExplicitFuture = { + val actor = new TestActor + actor.start + val future = actor !!! "Failure" + future.await + assert(future.exception.isDefined) + assert("expected" === future.exception.get._2.getMessage) + actor.stop + } + + /* + @Test def shouldFutureAwaitEitherLeft = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "Hello" + val future2 = actor2 !!! "NoReply" + val result = Futures.awaitEither(future1, future2) + assert(result.isDefined) + assert("World" === result.get) + actor1.stop + actor2.stop + } + + @Test def shouldFutureAwaitEitherRight = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "NoReply" + val future2 = actor2 !!! "Hello" + val result = Futures.awaitEither(future1, future2) + assert(result.isDefined) + assert("World" === result.get) + actor1.stop + actor2.stop + } + */ + @Test def shouldFutureAwaitOneLeft = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "NoReply" + val future2 = actor2 !!! "Hello" + val result = Futures.awaitOne(List(future1, future2)) + assert(result.result.isDefined) + assert("World" === result.result.get) + actor1.stop + actor2.stop + } + + @Test def shouldFutureAwaitOneRight = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "Hello" + val future2 = actor2 !!! "NoReply" + val result = Futures.awaitOne(List(future1, future2)) + assert(result.result.isDefined) + assert("World" === result.result.get) + actor1.stop + actor2.stop + } + + @Test def shouldFutureAwaitAll = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "Hello" + val future2 = actor2 !!! "Hello" + Futures.awaitAll(List(future1, future2)) + assert(future1.result.isDefined) + assert("World" === future1.result.get) + assert(future2.result.isDefined) + assert("World" === future2.result.get) + actor1.stop + actor2.stop + } + +}