From 67ead66b32afa28975272240040961e581dc5cc8 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Fri, 11 Mar 2011 10:46:36 -0700 Subject: [PATCH] Improve Future API when using UntypedActors, and add overloads for Java API --- .../src/main/scala/akka/dispatch/Future.scala | 45 +++++++++++++++++-- .../java/akka/dispatch/JavaFutureTests.java | 5 ++- .../test/scala/akka/dispatch/FutureSpec.scala | 29 ++++++++---- 3 files changed, 66 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 445fc451f2..c6ba1e1bd7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -7,7 +7,7 @@ package akka.dispatch import akka.AkkaException import akka.actor.{Actor, EventHandler} import akka.routing.Dispatcher -import akka.japi.Procedure +import akka.japi.{ Procedure, Function => JFunc } import java.util.concurrent.locks.ReentrantLock import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable} @@ -144,7 +144,7 @@ object Future { } } -sealed trait Future[T] { +sealed trait Future[+T] { /** * Blocks the current thread until the Future has been completed or the * timeout has expired. In the case of the timeout expiring a @@ -240,6 +240,37 @@ sealed trait Future[T] { } } + /** + * Creates a new Future by applying a PartialFunction to the successful + * result of this Future if a match is found, or else return a MatchError. + * If this Future is completed with an exception then the new Future will + * also contain this exception. + */ + final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { + val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + onComplete { ft => + val optv = ft.value + if (optv.isDefined) { + val v = optv.get + fa complete { + if (v.isLeft) v.asInstanceOf[Either[Throwable, A]] + else { + try { + val r = v.right.get + if (pf isDefinedAt r) Right(pf(r)) + else Left(new MatchError(r)) + } catch { + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + Left(e) + } + } + } + } + } + fa + } + /** * Creates a new Future by applying a function to the successful result of * this Future. If this Future is completed with an exception then the new @@ -338,7 +369,15 @@ sealed trait Future[T] { } /* Java API */ - final def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_)) + final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_)) + + final def map[A >: T, B](f: JFunc[A,B]): Future[B] = map(f(_)) + + final def flatMap[A >: T, B](f: JFunc[A,Future[B]]): Future[B] = flatMap(f(_)) + + final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_)) + + final def filter[A >: T](p: JFunc[A,Boolean]): Future[T] = filter(p(_)) } diff --git a/akka-actor/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor/src/test/java/akka/dispatch/JavaFutureTests.java index b4d01ce575..d35946ce60 100644 --- a/akka-actor/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor/src/test/java/akka/dispatch/JavaFutureTests.java @@ -3,7 +3,8 @@ package akka.dispatch; import org.junit.Test; import static org.junit.Assert.*; import java.util.concurrent.Callable; -import scala.runtime.AbstractFunction1; +import akka.japi.Function; +import akka.japi.Procedure; import scala.Some; import scala.Right; import static akka.dispatch.Futures.future; @@ -17,7 +18,7 @@ import static akka.dispatch.Futures.future; } }); - Future f2 = f1.map(new AbstractFunction1() { + Future f2 = f1.map(new Function() { public String apply(String s) { return s + " World"; } diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index b99cf68e40..3a9a3b258e 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -59,8 +59,21 @@ class FutureSpec extends JUnitSuite { @Test def shouldFutureCompose { val actor1 = actorOf[TestActor].start val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start - val future1 = actor1.!!![Any]("Hello") flatMap { case s: String => actor2.!!![Any](s) } - val future2 = actor1.!!![Any]("Hello") flatMap { case s: Int => actor2.!!![Any](s) } + val future1 = actor1 !!! "Hello" flatMap ((s: String) => actor2 !!! s) + val future2 = actor1 !!! "Hello" flatMap (actor2 !!! (_: String)) + val future3 = actor1 !!! "Hello" flatMap (actor2 !!! (_: Int)) + assert(Some(Right("WORLD")) === future1.await.value) + assert(Some(Right("WORLD")) === future2.await.value) + intercept[ClassCastException] { future3.await.resultOrException } + actor1.stop + actor2.stop + } + + @Test def shouldFutureComposePatternMatch { + val actor1 = actorOf[TestActor].start + val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start + val future1 = actor1 !!! "Hello" collect { case (s: String) => s } flatMap (actor2 !!! _) + val future2 = actor1 !!! "Hello" collect { case (n: Int) => n } flatMap (actor2 !!! _) assert(Some(Right("WORLD")) === future1.await.value) intercept[MatchError] { future2.await.resultOrException } actor1.stop @@ -103,15 +116,15 @@ class FutureSpec extends JUnitSuite { }).start val future1 = for { - Res(a: Int) <- (actor !!! Req("Hello")): Future[Any] - Res(b: String) <- (actor !!! Req(a)): Future[Any] - Res(c: String) <- (actor !!! Req(7)): Future[Any] + a <- actor !!! Req("Hello") collect { case Res(x: Int) => x } + b <- actor !!! Req(a) collect { case Res(x: String) => x } + c <- actor !!! Req(7) collect { case Res(x: String) => x } } yield b + "-" + c val future2 = for { - Res(a: Int) <- (actor !!! Req("Hello")): Future[Any] - Res(b: Int) <- (actor !!! Req(a)): Future[Any] - Res(c: String) <- (actor !!! Req(7)): Future[Any] + a <- actor !!! Req("Hello") collect { case Res(x: Int) => x } + b <- actor !!! Req(a) collect { case Res(x: Int) => x } + c <- actor !!! Req(7) collect { case Res(x: String) => x } } yield b + "-" + c assert(Some(Right("10-14")) === future1.await.value)