Improve Future API when using UntypedActors, and add overloads for Java API

This commit is contained in:
Derek Williams 2011-03-11 10:46:36 -07:00
parent 89f2bf3cb9
commit 67ead66b32
3 changed files with 66 additions and 13 deletions

View file

@ -7,7 +7,7 @@ package akka.dispatch
import akka.AkkaException
import akka.actor.{Actor, EventHandler}
import akka.routing.Dispatcher
import akka.japi.Procedure
import akka.japi.{ Procedure, Function => JFunc }
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable}
@ -144,7 +144,7 @@ object Future {
}
}
sealed trait Future[T] {
sealed trait Future[+T] {
/**
* Blocks the current thread until the Future has been completed or the
* timeout has expired. In the case of the timeout expiring a
@ -240,6 +240,37 @@ sealed trait Future[T] {
}
}
/**
* Creates a new Future by applying a PartialFunction to the successful
* result of this Future if a match is found, or else return a MatchError.
* If this Future is completed with an exception then the new Future will
* also contain this exception.
*/
final def collect[A](pf: PartialFunction[Any, A]): Future[A] = {
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
onComplete { ft =>
val optv = ft.value
if (optv.isDefined) {
val v = optv.get
fa complete {
if (v.isLeft) v.asInstanceOf[Either[Throwable, A]]
else {
try {
val r = v.right.get
if (pf isDefinedAt r) Right(pf(r))
else Left(new MatchError(r))
} catch {
case e: Exception =>
EventHandler notifyListeners EventHandler.Error(e, this)
Left(e)
}
}
}
}
}
fa
}
/**
* Creates a new Future by applying a function to the successful result of
* this Future. If this Future is completed with an exception then the new
@ -338,7 +369,15 @@ sealed trait Future[T] {
}
/* Java API */
final def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_))
final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_))
final def map[A >: T, B](f: JFunc[A,B]): Future[B] = map(f(_))
final def flatMap[A >: T, B](f: JFunc[A,Future[B]]): Future[B] = flatMap(f(_))
final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_))
final def filter[A >: T](p: JFunc[A,Boolean]): Future[T] = filter(p(_))
}

View file

@ -3,7 +3,8 @@ package akka.dispatch;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.concurrent.Callable;
import scala.runtime.AbstractFunction1;
import akka.japi.Function;
import akka.japi.Procedure;
import scala.Some;
import scala.Right;
import static akka.dispatch.Futures.future;
@ -17,7 +18,7 @@ import static akka.dispatch.Futures.future;
}
});
Future f2 = f1.map(new AbstractFunction1<String, String>() {
Future f2 = f1.map(new Function<String, String>() {
public String apply(String s) {
return s + " World";
}

View file

@ -59,8 +59,21 @@ class FutureSpec extends JUnitSuite {
@Test def shouldFutureCompose {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start
val future1 = actor1.!!![Any]("Hello") flatMap { case s: String => actor2.!!![Any](s) }
val future2 = actor1.!!![Any]("Hello") flatMap { case s: Int => actor2.!!![Any](s) }
val future1 = actor1 !!! "Hello" flatMap ((s: String) => actor2 !!! s)
val future2 = actor1 !!! "Hello" flatMap (actor2 !!! (_: String))
val future3 = actor1 !!! "Hello" flatMap (actor2 !!! (_: Int))
assert(Some(Right("WORLD")) === future1.await.value)
assert(Some(Right("WORLD")) === future2.await.value)
intercept[ClassCastException] { future3.await.resultOrException }
actor1.stop
actor2.stop
}
@Test def shouldFutureComposePatternMatch {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start
val future1 = actor1 !!! "Hello" collect { case (s: String) => s } flatMap (actor2 !!! _)
val future2 = actor1 !!! "Hello" collect { case (n: Int) => n } flatMap (actor2 !!! _)
assert(Some(Right("WORLD")) === future1.await.value)
intercept[MatchError] { future2.await.resultOrException }
actor1.stop
@ -103,15 +116,15 @@ class FutureSpec extends JUnitSuite {
}).start
val future1 = for {
Res(a: Int) <- (actor !!! Req("Hello")): Future[Any]
Res(b: String) <- (actor !!! Req(a)): Future[Any]
Res(c: String) <- (actor !!! Req(7)): Future[Any]
a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
b <- actor !!! Req(a) collect { case Res(x: String) => x }
c <- actor !!! Req(7) collect { case Res(x: String) => x }
} yield b + "-" + c
val future2 = for {
Res(a: Int) <- (actor !!! Req("Hello")): Future[Any]
Res(b: Int) <- (actor !!! Req(a)): Future[Any]
Res(c: String) <- (actor !!! Req(7)): Future[Any]
a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
b <- actor !!! Req(a) collect { case Res(x: Int) => x }
c <- actor !!! Req(7) collect { case Res(x: String) => x }
} yield b + "-" + c
assert(Some(Right("10-14")) === future1.await.value)