This commit is contained in:
Viktor Klang 2012-01-26 14:15:25 +01:00
parent 87cb83f0d7
commit b310407334
5 changed files with 80 additions and 132 deletions

View file

@ -45,7 +45,7 @@ public class JavaFutureTests {
}
}, system.dispatcher());
Future<String> f2 = f1.map(new Function<String, String>() {
Future<String> f2 = f1.map(new Mapper<String, String>() {
public String apply(String s) {
return s + " World";
}
@ -59,8 +59,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.onSuccess(new Procedure<String>() {
public void apply(String result) {
f.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {
if (result.equals("foo"))
latch.countDown();
}
@ -76,8 +76,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.onFailure(new Procedure<Throwable>() {
public void apply(Throwable t) {
f.onFailure(new OnFailure() {
public void onFailure(Throwable t) {
if (t instanceof NullPointerException)
latch.countDown();
}
@ -94,8 +94,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.onComplete(new Procedure2<Throwable,String>() {
public void apply(Throwable t, String r) {
f.onComplete(new OnComplete<String>() {
public void onComplete(Throwable t, String r) {
latch.countDown();
}
});
@ -110,8 +110,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.foreach(new Procedure<String>() {
public void apply(String future) {
f.foreach(new Foreach<String>() {
public void each(String future) {
latch.countDown();
}
});
@ -127,7 +127,7 @@ public class JavaFutureTests {
Promise<String> cf = Futures.promise(system.dispatcher());
cf.success("1000");
Future<String> f = cf;
Future<Integer> r = f.flatMap(new Function<String, Future<Integer>>() {
Future<Integer> r = f.flatMap(new Mapper<String, Future<Integer>>() {
public Future<Integer> apply(String r) {
latch.countDown();
Promise<Integer> cf = Futures.promise(system.dispatcher());
@ -146,8 +146,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
Future<String> r = f.filter(new Function<String, Boolean>() {
public Boolean apply(String r) {
Future<String> r = f.filter(new Filter<String>() {
public boolean filter(String r) {
latch.countDown();
return r.equals("foo");
}

View file

@ -343,7 +343,7 @@ object Future {
}
}
sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
sealed trait Future[+T] extends Await.Awaitable[T] {
implicit def executor: ExecutionContext
@ -828,3 +828,65 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe
case Right(r) r
}
}
object japi {
@deprecated("Do not use this directly, use subclasses of this", "2.0")
class CallbackBridge[-T] extends PartialFunction[T, Unit] {
override final def isDefinedAt(t: T): Boolean = true
override final def apply(t: T): Unit = internal(t)
protected def internal(result: T): Unit = ()
}
@deprecated("Do not use this directly, use 'Recover'", "2.0")
class RecoverBridge[+T] extends PartialFunction[Throwable, T] {
override final def isDefinedAt(t: Throwable): Boolean = true
override final def apply(t: Throwable): T = internal(t)
protected def internal(result: Throwable): T = null.asInstanceOf[T]
}
@deprecated("Do not use this directly, use subclasses of this", "2.0")
class BooleanFunctionBridge[-T] extends scala.Function1[T, Boolean] {
override final def apply(t: T): Boolean = internal(t)
protected def internal(result: T): Boolean = false
}
@deprecated("Do not use this directly, use subclasses of this", "2.0")
class UnitFunctionBridge[-T] extends (T Unit) {
override final def apply(t: T): Unit = internal(t)
protected def internal(result: T): Unit = ()
}
}
abstract class OnSuccess[-T] extends japi.CallbackBridge[T] {
protected final override def internal(result: T) = onSuccess(result)
def onSuccess(result: T): Unit
}
abstract class OnFailure extends japi.CallbackBridge[Throwable] {
protected final override def internal(failure: Throwable) = onFailure(failure)
def onFailure(failure: Throwable): Unit
}
abstract class OnComplete[-T] extends japi.CallbackBridge[Either[Throwable, T]] {
protected final override def internal(value: Either[Throwable, T]): Unit = value match {
case Left(t) onComplete(t, null.asInstanceOf[T])
case Right(r) onComplete(null, r)
}
def onComplete(failure: Throwable, success: T): Unit
}
abstract class Recover[+T] extends japi.RecoverBridge[T] {
protected final override def internal(result: Throwable): T = recover(result)
def recover(failure: Throwable): T
}
abstract class Filter[-T] extends japi.BooleanFunctionBridge[T] {
override final def internal(t: T): Boolean = filter(t)
def filter(result: T): Boolean
}
abstract class Foreach[-T] extends japi.UnitFunctionBridge[T] {
override final def internal(t: T): Unit = each(t)
def each(result: T): Unit
}
abstract class Mapper[-T, +R] extends scala.runtime.AbstractFunction1[T, R]

View file

@ -1,108 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch.japi
import akka.util.Timeout
import akka.japi.{ Procedure2, Procedure, Function JFunc, Option JOption }
@deprecated("Do not use this directly, use subclasses of this", "2.0")
class Callback[-T] extends PartialFunction[T, Unit] {
override final def isDefinedAt(t: T): Boolean = true
override final def apply(t: T): Unit = on(t)
protected def on(result: T): Unit = ()
}
abstract class OnSuccess[-T] extends Callback[T] {
protected final override def on(result: T) = onSuccess(result)
def onSuccess(result: T): Unit
}
abstract class OnFailure extends Callback[Throwable] {
protected final override def on(failure: Throwable) = onFailure(failure)
def onFailure(failure: Throwable): Unit
}
abstract class OnComplete[-T] extends Callback[Either[Throwable, T]] {
protected final override def on(value: Either[Throwable, T]): Unit = value match {
case Left(t) onComplete(t, null.asInstanceOf[T])
case Right(r) onComplete(null, r)
}
def onComplete(failure: Throwable, success: T): Unit
}
@deprecated("Do not use this directly, use 'Recover'", "2.0")
class RecoverBridge[+T] extends PartialFunction[Throwable, T] {
override final def isDefinedAt(t: Throwable): Boolean = true
override final def apply(t: Throwable): T = on(t)
protected def on(result: Throwable): T = null.asInstanceOf[T]
}
abstract class Recover[+T] extends RecoverBridge[T] {
protected final override def on(result: Throwable): T = recover(result)
def recover(failure: Throwable): T
}
abstract class Filter[-T] extends (T Boolean) {
override final def apply(t: T): Boolean = filter(t)
def filter(result: T): Boolean
}
abstract class Foreach[-T] extends (T Unit) {
override final def apply(t: T): Unit = each(t)
def each(result: T): Unit
}
abstract class Mapper[-T, +R] extends (T R)
/*
map => A => B
flatMap => A => F[B]
foreach
*/
/* Java API */
trait Future[+T] { self: akka.dispatch.Future[T]
/**
* Asynchronously called when this Future gets a successful result
*/
private[japi] final def onSuccess[A >: T](proc: Procedure[A]): this.type = self.onSuccess({ case r proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit])
/**
* Asynchronously called when this Future gets a failed result
*/
private[japi] final def onFailure(proc: Procedure[Throwable]): this.type = self.onFailure({ case t: Throwable proc(t) }: PartialFunction[Throwable, Unit])
/**
* Asynchronously called when this future is completed with either a failed or a successful result
* In case of a success, the first parameter (Throwable) will be null
* In case of a failure, the second parameter (T) will be null
* For no reason will both be null or neither be null
*/
private[japi] final def onComplete[A >: T](proc: Procedure2[Throwable, A]): this.type = self.onComplete(_.fold(t proc(t, null.asInstanceOf[T]), r proc(null, r)))
/**
* Asynchronously applies the provided function to the (if any) successful result of this Future
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_))
/**
* Asynchronously applies the provided function to the (if any) successful result of this Future and flattens it.
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_))
/**
* Asynchronously applies the provided Procedure to the (if any) successful result of this Future
* Provided Procedure will not be called in case of no-result or in case of failed result
*/
private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_))
/**
* Returns a new Future whose successful result will be the successful result of this Future if that result conforms to the provided predicate
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] =
self.filter((a: Any) p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]]
}

View file

@ -12,6 +12,7 @@ import akka.actor.Props;
//#import-future
import akka.dispatch.Future;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Await;
import akka.util.Duration;
import akka.util.Timeout;
@ -236,7 +237,7 @@ public class UntypedActorDocTestBase {
final Future<Iterable<Object>> aggregate = Futures.sequence(futures, system.dispatcher());
final Future<Result> transformed = aggregate.map(new akka.japi.Function<Iterable<Object>, Result>() {
final Future<Result> transformed = aggregate.map(new Mapper<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator();
final String s = (String) it.next();

View file

@ -4,16 +4,10 @@
package akka.docs.future;
//#imports1
import akka.dispatch.Promise;
import akka.dispatch.*;
import akka.japi.Procedure;
import akka.japi.Procedure2;
import akka.util.Timeout;
import akka.dispatch.Await;
import akka.dispatch.Future;
import akka.dispatch.japi.Mapper;
import akka.dispatch.japi.OnSuccess;
import akka.dispatch.japi.OnFailure;
import akka.dispatch.japi.Filter;
//#imports1
@ -61,7 +55,6 @@ import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import static org.junit.Assert.*;
@ -355,8 +348,8 @@ public class FutureDocTestBase {
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onComplete
future.onComplete(new Procedure2<Throwable, String>() {
public void apply(Throwable failure, String result) {
future.onComplete(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {