This commit is contained in:
Viktor Klang 2012-01-25 22:36:03 +01:00
parent 25606e33b7
commit 5f83340d5e
4 changed files with 64 additions and 26 deletions

View file

@ -507,13 +507,20 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
* Creates a new Future[A] which is completed with this Future's result if * Creates a new Future[A] which is completed with this Future's result if
* that conforms to A's erased type or a ClassCastException otherwise. * that conforms to A's erased type or a ClassCastException otherwise.
*/ */
final def mapTo[A](implicit m: Manifest[A]): Future[A] = { final def mapTo[A](implicit m: Manifest[A]): Future[A] =
mapTo[A](m.erasure.asInstanceOf[Class[A]])
/**
* Creates a new Future[A] which is completed with this Future's result if
* that conforms to A's erased type or a ClassCastException otherwise.
*/
final def mapTo[A](clazz: Class[A]): Future[A] = {
val fa = Promise[A]() val fa = Promise[A]()
onComplete { onComplete {
case l: Left[_, _] fa complete l.asInstanceOf[Either[Throwable, A]] case l: Left[_, _] fa complete l.asInstanceOf[Either[Throwable, A]]
case Right(t) case Right(t)
fa complete (try { fa complete (try {
Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) Right(BoxedType(clazz).cast(t).asInstanceOf[A])
} catch { } catch {
case e: ClassCastException Left(e) case e: ClassCastException Left(e)
}) })

View file

@ -6,6 +6,47 @@ package akka.dispatch.japi
import akka.util.Timeout import akka.util.Timeout
import akka.japi.{ Procedure2, Procedure, Function JFunc, Option JOption } import akka.japi.{ Procedure2, Procedure, Function JFunc, Option JOption }
class Callback[-T] extends PartialFunction[T, Unit] {
override final def isDefinedAt(t: T): Boolean = true
override final def apply(t: T): Unit = on(t)
protected def on(result: T): Unit = ()
}
abstract class OnSuccess[-T] extends Callback[T] {
protected final override def on(result: T) = onSuccess(result)
def onSuccess(result: T): Unit
}
abstract class OnFailure extends Callback[Throwable] {
protected final override def on(failure: Throwable) = onFailure(failure)
def onFailure(failure: Throwable): Unit
}
abstract class OnComplete[-T] extends Callback[Either[Throwable, T]] {
protected final override def on(value: Either[Throwable, T]): Unit = value match {
case Left(t) onComplete(t, null.asInstanceOf[T])
case Right(r) onComplete(null, r)
}
def onComplete(failure: Throwable, success: T): Unit
}
abstract class Filter[-T] extends (T Boolean) {
override final def apply(t: T): Boolean = filter(t)
def filter(result: T): Boolean
}
abstract class Foreach[-T] extends (T Unit) {
override final def apply(t: T): Unit = each(t)
def each(result: T): Unit
}
abstract class Mapper[-T, +R] extends (T R)
/*
map => A => B
flatMap => A => F[B]
foreach
*/
/* Java API */ /* Java API */
trait Future[+T] { self: akka.dispatch.Future[T] trait Future[+T] { self: akka.dispatch.Future[T]
/** /**
@ -50,14 +91,5 @@ trait Future[+T] { self: akka.dispatch.Future[T] ⇒
*/ */
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] = private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] =
self.filter((a: Any) p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]] self.filter((a: Any) p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]]
/**
* Returns a new Future whose value will be of the specified type if it really is
* Or a failure with a ClassCastException if it wasn't.
*/
private[japi] final def mapTo[A](clazz: Class[A]): akka.dispatch.Future[A] = {
implicit val manifest: Manifest[A] = Manifest.classType(clazz)
self.mapTo[A]
}
} }

View file

@ -3,9 +3,8 @@
*/ */
package akka.util package akka.util
import java.{ lang jl }
object BoxedType { object BoxedType {
import java.{ lang jl }
private val toBoxed = Map[Class[_], Class[_]]( private val toBoxed = Map[Class[_], Class[_]](
classOf[Boolean] -> classOf[jl.Boolean], classOf[Boolean] -> classOf[jl.Boolean],
@ -18,8 +17,5 @@ object BoxedType {
classOf[Double] -> classOf[jl.Double], classOf[Double] -> classOf[jl.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) classOf[Unit] -> classOf[scala.runtime.BoxedUnit])
def apply(c: Class[_]): Class[_] = { final def apply(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
if (c.isPrimitive) toBoxed(c) else c
}
} }

View file

@ -10,6 +10,9 @@ import akka.japi.Procedure2;
import akka.util.Timeout; import akka.util.Timeout;
import akka.dispatch.Await; import akka.dispatch.Await;
import akka.dispatch.Future; import akka.dispatch.Future;
import akka.dispatch.japi.Mapper;
import akka.dispatch.japi.OnSuccess;
import akka.dispatch.japi.OnFailure;
//#imports1 //#imports1
@ -110,7 +113,7 @@ public class FutureDocTestBase {
} }
}, system.dispatcher()); }, system.dispatcher());
Future<Integer> f2 = f1.map(new Function<String, Integer>() { Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) { public Integer apply(String s) {
return s.length(); return s.length();
} }
@ -131,7 +134,7 @@ public class FutureDocTestBase {
} }
}, system.dispatcher()); }, system.dispatcher());
Future<Integer> f2 = f1.map(new Function<String, Integer>() { Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) { public Integer apply(String s) {
return s.length(); return s.length();
} }
@ -153,7 +156,7 @@ public class FutureDocTestBase {
Thread.sleep(100); Thread.sleep(100);
Future<Integer> f2 = f1.map(new Function<String, Integer>() { Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) { public Integer apply(String s) {
return s.length(); return s.length();
} }
@ -173,7 +176,7 @@ public class FutureDocTestBase {
} }
}, system.dispatcher()); }, system.dispatcher());
Future<Integer> f2 = f1.flatMap(new Function<String, Future<Integer>>() { Future<Integer> f2 = f1.flatMap(new Mapper<String, Future<Integer>>() {
public Future<Integer> apply(final String s) { public Future<Integer> apply(final String s) {
return future(new Callable<Integer>() { return future(new Callable<Integer>() {
public Integer call() { public Integer call() {
@ -322,8 +325,8 @@ public class FutureDocTestBase {
{ {
Future<String> future = Futures.successful("foo", system.dispatcher()); Future<String> future = Futures.successful("foo", system.dispatcher());
//#onSuccess //#onSuccess
future.onSuccess(new Procedure<String>() { future.onSuccess(new OnSuccess<String>() {
public void apply(String result) { public void onSuccess(String result) {
if ("bar" == result) { if ("bar" == result) {
//Do something if it resulted in "bar" //Do something if it resulted in "bar"
} else { } else {
@ -337,8 +340,8 @@ public class FutureDocTestBase {
Future<String> future = Future<String> future =
Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher());
//#onFailure //#onFailure
future.onFailure( new Procedure<Throwable>() { future.onFailure( new OnFailure() {
public void apply(Throwable failure) { public void onFailure(Throwable failure) {
if (failure instanceof IllegalStateException) { if (failure instanceof IllegalStateException) {
//Do something if it was this particular failure //Do something if it was this particular failure
} else { } else {
@ -370,7 +373,7 @@ public class FutureDocTestBase {
Future<String> future1 = Futures.successful("foo", system.dispatcher()); Future<String> future1 = Futures.successful("foo", system.dispatcher());
Future<String> future2 = Futures.successful("bar", system.dispatcher()); Future<String> future2 = Futures.successful("bar", system.dispatcher());
Future<String> future3 = Future<String> future3 =
future1.zip(future2).map(new Function<scala.Tuple2<String,String>, String>() { future1.zip(future2).map(new Mapper<scala.Tuple2<String,String>, String>() {
public String apply(scala.Tuple2<String,String> zipped) { public String apply(scala.Tuple2<String,String> zipped) {
return zipped._1() + " " + zipped._2(); return zipped._1() + " " + zipped._2();
} }