Merge pull request #270 from jboner/wip-fix-futures-√

Wip fix futures √
This commit is contained in:
viktorklang 2012-01-31 07:10:59 -08:00
commit a0a4a85e49
12 changed files with 485 additions and 126 deletions

View file

@ -14,6 +14,7 @@ import java.util.LinkedList;
import java.lang.Iterable; import java.lang.Iterable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static akka.japi.Util.manifest;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
@ -45,7 +46,7 @@ public class JavaFutureTests {
} }
}, system.dispatcher()); }, system.dispatcher());
Future<String> f2 = f1.map(new Function<String, String>() { Future<String> f2 = f1.map(new Mapper<String, String>() {
public String apply(String s) { public String apply(String s) {
return s + " World"; return s + " World";
} }
@ -59,8 +60,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher()); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
f.onSuccess(new Procedure<String>() { f.onSuccess(new OnSuccess<String>() {
public void apply(String result) { public void onSuccess(String result) {
if (result.equals("foo")) if (result.equals("foo"))
latch.countDown(); latch.countDown();
} }
@ -76,8 +77,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher()); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
f.onFailure(new Procedure<Throwable>() { f.onFailure(new OnFailure() {
public void apply(Throwable t) { public void onFailure(Throwable t) {
if (t instanceof NullPointerException) if (t instanceof NullPointerException)
latch.countDown(); latch.countDown();
} }
@ -94,8 +95,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher()); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
f.onComplete(new Procedure2<Throwable,String>() { f.onComplete(new OnComplete<String>() {
public void apply(Throwable t, String r) { public void onComplete(Throwable t, String r) {
latch.countDown(); latch.countDown();
} }
}); });
@ -110,8 +111,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher()); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
f.foreach(new Procedure<String>() { f.foreach(new Foreach<String>() {
public void apply(String future) { public void each(String future) {
latch.countDown(); latch.countDown();
} }
}); });
@ -127,7 +128,7 @@ public class JavaFutureTests {
Promise<String> cf = Futures.promise(system.dispatcher()); Promise<String> cf = Futures.promise(system.dispatcher());
cf.success("1000"); cf.success("1000");
Future<String> f = cf; Future<String> f = cf;
Future<Integer> r = f.flatMap(new Function<String, Future<Integer>>() { Future<Integer> r = f.flatMap(new Mapper<String, Future<Integer>>() {
public Future<Integer> apply(String r) { public Future<Integer> apply(String r) {
latch.countDown(); latch.countDown();
Promise<Integer> cf = Futures.promise(system.dispatcher()); Promise<Integer> cf = Futures.promise(system.dispatcher());
@ -146,8 +147,8 @@ public class JavaFutureTests {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher()); Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf; Future<String> f = cf;
Future<String> r = f.filter(new Function<String, Boolean>() { Future<String> r = f.filter(new Filter<String>() {
public Boolean apply(String r) { public boolean filter(String r) {
latch.countDown(); latch.countDown();
return r.equals("foo"); return r.equals("foo");
} }
@ -267,15 +268,55 @@ public class JavaFutureTests {
} }
}, system.dispatcher()); }, system.dispatcher());
assertEquals(expect, Await.result(f, timeout)); assertEquals(expect, Await.result(f, timeout).get());
} }
@Test @Test
public void BlockMustBeCallable() { public void blockMustBeCallable() {
Promise<String> p = Futures.promise(system.dispatcher()); Promise<String> p = Futures.promise(system.dispatcher());
Duration d = Duration.create(1, TimeUnit.SECONDS); Duration d = Duration.create(1, TimeUnit.SECONDS);
p.success("foo"); p.success("foo");
Await.ready(p, d); Await.ready(p, d);
assertEquals(Await.result(p, d), "foo"); assertEquals(Await.result(p, d), "foo");
} }
@Test
public void mapToMustBeCallable() {
Promise<Object> p = Futures.promise(system.dispatcher());
Future<String> f = p.future().mapTo(manifest(String.class));
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.success("foo");
Await.ready(p, d);
assertEquals(Await.result(p, d), "foo");
}
@Test
public void recoverToMustBeCallable() {
final IllegalStateException fail = new IllegalStateException("OHNOES");
Promise<Object> p = Futures.promise(system.dispatcher());
Future<Object> f = p.future().recover(new Recover<Object>() {
public Object recover(Throwable t) throws Throwable {
if (t == fail) return "foo";
else throw t;
}
});
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.failure(fail);
assertEquals(Await.result(f, d), "foo");
}
@Test
public void tryRecoverToMustBeCallable() {
final IllegalStateException fail = new IllegalStateException("OHNOES");
Promise<Object> p = Futures.promise(system.dispatcher());
Future<Object> f = p.future().tryRecover(new Recover<Future<Object>>() {
public Future<Object> recover(Throwable t) throws Throwable {
if (t == fail) return Futures.<Object>successful("foo", system.dispatcher()).future();
else throw t;
}
});
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.failure(fail);
assertEquals(Await.result(f, d), "foo");
}
} }

View file

@ -13,10 +13,10 @@ import akka.testkit.AkkaSpec
import org.scalatest.junit.JUnitSuite import org.scalatest.junit.JUnitSuite
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch import akka.testkit.TestLatch
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
import scala.runtime.NonLocalReturnControl import scala.runtime.NonLocalReturnControl
import akka.pattern.ask import akka.pattern.ask
import java.lang.{ IllegalStateException, ArithmeticException } import java.lang.{ IllegalStateException, ArithmeticException }
import java.util.concurrent._
object FutureSpec { object FutureSpec {
class TestActor extends Actor { class TestActor extends Actor {
@ -39,7 +39,6 @@ object FutureSpec {
} }
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class JavaFutureSpec extends JavaFutureTests with JUnitSuite class JavaFutureSpec extends JavaFutureTests with JUnitSuite
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -303,6 +302,32 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
} }
} }
"tryRecover from exceptions" in {
val o = new IllegalStateException("original")
val r = new IllegalStateException("recovered")
intercept[IllegalStateException] {
Await.result(Promise.failed[String](o) tryRecover { case _ if false == true Promise.successful("yay!") }, timeout.duration)
} must be(o)
Await.result(Promise.failed[String](o) tryRecover { case _ Promise.successful("yay!") }, timeout.duration) must equal("yay!")
intercept[IllegalStateException] {
Await.result(Promise.failed[String](o) tryRecover { case _ Promise.failed[String](r) }, timeout.duration)
} must be(r)
}
"andThen like a boss" in {
val q = new LinkedBlockingQueue[Int]
for (i 1 to 1000) {
Await.result(Future { q.add(1); 3 } andThen { case _ q.add(2) } andThen { case Right(0) q.add(Int.MaxValue) } andThen { case _ q.add(3); }, timeout.duration) must be(3)
q.poll() must be(1)
q.poll() must be(2)
q.poll() must be(3)
q.clear()
}
}
"firstCompletedOf" in { "firstCompletedOf" in {
val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ Promise.successful[Int](5) val futures = Vector.fill[Future[Int]](10)(Promise[Int]()) :+ Promise.successful[Int](5)
Await.result(Future.firstCompletedOf(futures), timeout.duration) must be(5) Await.result(Future.firstCompletedOf(futures), timeout.duration) must be(5)
@ -856,7 +881,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"be completed" in { f((future, _) future must be('completed)) } "be completed" in { f((future, _) future must be('completed)) }
"contain a value" in { f((future, result) future.value must be(Some(Right(result)))) } "contain a value" in { f((future, result) future.value must be(Some(Right(result)))) }
"return result with 'get'" in { f((future, result) Await.result(future, timeout.duration) must be(result)) } "return result with 'get'" in { f((future, result) Await.result(future, timeout.duration) must be(result)) }
"return result with 'Await.sync'" in { f((future, result) Await.result(future, timeout.duration) must be(result)) } "return result with 'Await.result'" in { f((future, result) Await.result(future, timeout.duration) must be(result)) }
"not timeout" in { f((future, _) Await.ready(future, 0 millis)) } "not timeout" in { f((future, _) Await.ready(future, 0 millis)) }
"filter result" in { "filter result" in {
f { (future, result) f { (future, result)
@ -907,7 +932,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}) })
} }
"throw exception with 'get'" in { f((future, message) (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } "throw exception with 'get'" in { f((future, message) (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
"throw exception with 'Await.sync'" in { f((future, message) (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) } "throw exception with 'Await.result'" in { f((future, message) (evaluating { Await.result(future, timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with filter" in { "retain exception with filter" in {
f { (future, message) f { (future, message)
(evaluating { Await.result(future filter (_ true), timeout.duration) } must produce[E]).getMessage must be(message) (evaluating { Await.result(future filter (_ true), timeout.duration) } must produce[E]).getMessage must be(message)

View file

@ -340,9 +340,9 @@ object Future {
} }
} }
sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { sealed trait Future[+T] extends Await.Awaitable[T] {
implicit def executor: ExecutionContext protected implicit def executor: ExecutionContext
protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match { protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) Right(t.value.asInstanceOf[X]) case Left(t: scala.runtime.NonLocalReturnControl[_]) Right(t.value.asInstanceOf[X])
@ -362,7 +362,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
case Right(r) that onSuccess { case r2 p success ((r, r2)) } case Right(r) that onSuccess { case r2 p success ((r, r2)) }
} }
that onFailure { case f p failure f } that onFailure { case f p failure f }
p p.future
} }
/** /**
@ -435,7 +435,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
case Left(t) p success t case Left(t) p success t
case Right(r) p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r) case Right(r) p failure new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + r)
} }
p p.future
} }
/** /**
@ -448,7 +448,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
case r @ Right(_) p complete r case r @ Right(_) p complete r
case _ p completeWith that case _ p completeWith that
} }
p p.future
} }
/** /**
@ -463,12 +463,59 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
* </pre> * </pre>
*/ */
final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = {
val future = Promise[A]() val p = Promise[A]()
onComplete { onComplete {
case Left(e) if pf isDefinedAt e future.complete(try { Right(pf(e)) } catch { case x: Exception Left(x) }) case Left(e) if pf isDefinedAt e p.complete(try { Right(pf(e)) } catch { case x: Exception Left(x) })
case otherwise future complete otherwise case otherwise p complete otherwise
} }
future p.future
}
/**
* Returns a new Future that will, in case this future fails,
* be completed with the resulting Future of the given PartialFunction,
* if the given PartialFunction matches the failure of the original Future.
*
* If the PartialFunction throws, that Throwable will be propagated to the returned Future.
*
* Example:
*
* {{{
* val f = Future { Int.MaxValue }
* Future (6 / 0) tryRecover { case e: ArithmeticException => f } // result: Int.MaxValue
* }}}
*/
def tryRecover[U >: T](pf: PartialFunction[Throwable, Future[U]]): Future[U] = {
val p = Promise[U]()
onComplete {
case Left(t) if pf isDefinedAt t
try { p completeWith pf(t) } catch { case t: Throwable p complete resolve(Left(t)) }
case otherwise p complete otherwise
}
p.future
}
/**
* Returns a new Future that will contain the completed result of this Future,
* and which will invoke the supplied PartialFunction when completed.
*
* This allows for establishing order of side-effects.
*
* {{{
* Future { 5 } andThen {
* case something => assert(something is awesome)
* } andThen {
* case Left(t) => handleProblem(t)
* case Right(v) => dealWithSuccess(v)
* }
* }}}
*/
def andThen[U](pf: PartialFunction[Either[Throwable, T], U]): Future[T] = {
val p = Promise[T]()
onComplete { case r try if (pf isDefinedAt r) pf(r) finally p complete r }
p.future
} }
/** /**
@ -503,6 +550,10 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
/** /**
* Creates a new Future[A] which is completed with this Future's result if * Creates a new Future[A] which is completed with this Future's result if
* that conforms to A's erased type or a ClassCastException otherwise. * that conforms to A's erased type or a ClassCastException otherwise.
*
* When used from Java, to create the Manifest, use:
* import static akka.japi.Util.manifest;
* future.mapTo(manifest(MyClass.class));
*/ */
final def mapTo[A](implicit m: Manifest[A]): Future[A] = { final def mapTo[A](implicit m: Manifest[A]): Future[A] = {
val fa = Promise[A]() val fa = Promise[A]()
@ -515,7 +566,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
case e: ClassCastException Left(e) case e: ClassCastException Left(e)
}) })
} }
fa fa.future
} }
/** /**
@ -546,7 +597,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
logError("Future.flatMap", e) logError("Future.flatMap", e)
} }
} }
p p.future
} }
/** /**
@ -586,7 +637,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
Left(e) Left(e)
}) })
} }
p p.future
} }
protected def logError(msg: String, problem: Throwable): Unit = { protected def logError(msg: String, problem: Throwable): Unit = {
@ -818,3 +869,158 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe
case Right(r) r case Right(r) r
} }
} }
/**
* This class contains bridge classes between Scala and Java.
* Internal use only.
*/
object japi {
@deprecated("Do not use this directly, use subclasses of this", "2.0")
class CallbackBridge[-T] extends PartialFunction[T, Unit] {
override final def isDefinedAt(t: T): Boolean = true
override final def apply(t: T): Unit = internal(t)
protected def internal(result: T): Unit = ()
}
@deprecated("Do not use this directly, use 'Recover'", "2.0")
class RecoverBridge[+T] extends PartialFunction[Throwable, T] {
override final def isDefinedAt(t: Throwable): Boolean = true
override final def apply(t: Throwable): T = internal(t)
protected def internal(result: Throwable): T = null.asInstanceOf[T]
}
@deprecated("Do not use this directly, use subclasses of this", "2.0")
class BooleanFunctionBridge[-T] extends scala.Function1[T, Boolean] {
override final def apply(t: T): Boolean = internal(t)
protected def internal(result: T): Boolean = false
}
@deprecated("Do not use this directly, use subclasses of this", "2.0")
class UnitFunctionBridge[-T] extends (T Unit) {
override final def apply(t: T): Unit = internal(t)
protected def internal(result: T): Unit = ()
}
}
/**
* Callback for when a Future is completed successfully
* SAM (Single Abstract Method) class
*
* Java API
*/
abstract class OnSuccess[-T] extends japi.CallbackBridge[T] {
protected final override def internal(result: T) = onSuccess(result)
/**
* This method will be invoked once when/if a Future that this callback is registered on
* becomes successfully completed
*/
def onSuccess(result: T): Unit
}
/**
* Callback for when a Future is completed with a failure
* SAM (Single Abstract Method) class
*
* Java API
*/
abstract class OnFailure extends japi.CallbackBridge[Throwable] {
protected final override def internal(failure: Throwable) = onFailure(failure)
/**
* This method will be invoked once when/if a Future that this callback is registered on
* becomes completed with a failure
*/
def onFailure(failure: Throwable): Unit
}
/**
* Callback for when a Future is completed with either failure or a success
* SAM (Single Abstract Method) class
*
* Java API
*/
abstract class OnComplete[-T] extends japi.CallbackBridge[Either[Throwable, T]] {
protected final override def internal(value: Either[Throwable, T]): Unit = value match {
case Left(t) onComplete(t, null.asInstanceOf[T])
case Right(r) onComplete(null, r)
}
/**
* This method will be invoked once when/if a Future that this callback is registered on
* becomes completed with a failure or a success.
* In the case of success then "failure" will be null, and in the case of failure the "success" will be null.
*/
def onComplete(failure: Throwable, success: T): Unit
}
/**
* Callback for the Future.recover operation that conditionally turns failures into successes.
*
* SAM (Single Abstract Method) class
*
* Java API
*/
abstract class Recover[+T] extends japi.RecoverBridge[T] {
protected final override def internal(result: Throwable): T = recover(result)
/**
* This method will be invoked once when/if the Future this recover callback is registered on
* becomes completed with a failure.
*
* @returns a successful value for the passed in failure
* @throws the passed in failure to propagate it.
*
* Java API
*/
@throws(classOf[Throwable])
def recover(failure: Throwable): T
}
/**
* Callback for the Future.filter operation that creates a new Future which will
* conditionally contain the success of another Future.
*
* SAM (Single Abstract Method) class
* Java API
*/
abstract class Filter[-T] extends japi.BooleanFunctionBridge[T] {
override final def internal(t: T): Boolean = filter(t)
/**
* This method will be invoked once when/if a Future that this callback is registered on
* becomes completed with a success.
*
* @returns true if the successful value should be propagated to the new Future or not
*/
def filter(result: T): Boolean
}
/**
* Callback for the Future.foreach operation that will be invoked if the Future that this callback
* is registered on becomes completed with a success. This method is essentially the same operation
* as onSuccess.
*
* SAM (Single Abstract Method) class
* Java API
*/
abstract class Foreach[-T] extends japi.UnitFunctionBridge[T] {
override final def internal(t: T): Unit = each(t)
/**
* This method will be invoked once when/if a Future that this callback is registered on
* becomes successfully completed
*/
def each(result: T): Unit
}
/**
* Callback for the Future.map and Future.flatMap operations that will be invoked
* if the Future that this callback is registered on becomes completed with a success.
* This callback is the equivalent of an akka.japi.Function
*
* SAM (Single Abstract Method) class
*
* Java API
*/
abstract class Mapper[-T, +R] extends scala.runtime.AbstractFunction1[T, R]

View file

@ -1,62 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch.japi
import akka.japi.{ Procedure2, Procedure, Function JFunc }
/* Java API */
trait Future[+T] { self: akka.dispatch.Future[T]
/**
* Asynchronously called when this Future gets a successful result
*/
private[japi] final def onSuccess[A >: T](proc: Procedure[A]): this.type = self.onSuccess({ case r proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit])
/**
* Asynchronously called when this Future gets a failed result
*/
private[japi] final def onFailure(proc: Procedure[Throwable]): this.type = self.onFailure({ case t: Throwable proc(t) }: PartialFunction[Throwable, Unit])
/**
* Asynchronously called when this future is completed with either a failed or a successful result
* In case of a success, the first parameter (Throwable) will be null
* In case of a failure, the second parameter (T) will be null
* For no reason will both be null or neither be null
*/
private[japi] final def onComplete[A >: T](proc: Procedure2[Throwable, A]): this.type = self.onComplete(_.fold(t proc(t, null.asInstanceOf[T]), r proc(null, r)))
/**
* Asynchronously applies the provided function to the (if any) successful result of this Future
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_))
/**
* Asynchronously applies the provided function to the (if any) successful result of this Future and flattens it.
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_))
/**
* Asynchronously applies the provided Procedure to the (if any) successful result of this Future
* Provided Procedure will not be called in case of no-result or in case of failed result
*/
private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_))
/**
* Returns a new Future whose successful result will be the successful result of this Future if that result conforms to the provided predicate
* Any failure of this Future will be propagated to the Future returned by this method.
*/
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] =
self.filter((a: Any) p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]]
/**
* Returns a new Future whose value will be of the specified type if it really is
* Or a failure with a ClassCastException if it wasn't.
*/
private[japi] final def mapTo[A](clazz: Class[A]): akka.dispatch.Future[A] = {
implicit val manifest: Manifest[A] = Manifest.classType(clazz)
self.mapTo[A]
}
}

View file

@ -119,3 +119,13 @@ object Option {
implicit def java2ScalaOption[A](o: Option[A]): scala.Option[A] = o.asScala implicit def java2ScalaOption[A](o: Option[A]): scala.Option[A] = o.asScala
implicit def scala2JavaOption[A](o: scala.Option[A]): Option[A] = if (o.isDefined) some(o.get) else none implicit def scala2JavaOption[A](o: scala.Option[A]): Option[A] = if (o.isDefined) some(o.get) else none
} }
/**
* This class hold common utilities for Java
*/
object Util {
/**
* Given a Class returns a Scala Manifest of that Class
*/
def manifest[T](clazz: Class[T]): Manifest[T] = Manifest.classType(clazz)
}

View file

@ -3,9 +3,8 @@
*/ */
package akka.util package akka.util
import java.{ lang jl }
object BoxedType { object BoxedType {
import java.{ lang jl }
private val toBoxed = Map[Class[_], Class[_]]( private val toBoxed = Map[Class[_], Class[_]](
classOf[Boolean] -> classOf[jl.Boolean], classOf[Boolean] -> classOf[jl.Boolean],
@ -18,8 +17,5 @@ object BoxedType {
classOf[Double] -> classOf[jl.Double], classOf[Double] -> classOf[jl.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) classOf[Unit] -> classOf[scala.runtime.BoxedUnit])
def apply(c: Class[_]): Class[_] = { final def apply(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
if (c.isPrimitive) toBoxed(c) else c
}
} }

View file

@ -12,6 +12,7 @@ import akka.actor.Props;
//#import-future //#import-future
import akka.dispatch.Future; import akka.dispatch.Future;
import akka.dispatch.Futures; import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Await; import akka.dispatch.Await;
import akka.util.Duration; import akka.util.Duration;
import akka.util.Timeout; import akka.util.Timeout;
@ -236,8 +237,8 @@ public class UntypedActorDocTestBase {
futures.add(ask(actorB, "reqeest", t)); // using timeout from above futures.add(ask(actorB, "reqeest", t)); // using timeout from above
final Future<Iterable<Object>> aggregate = Futures.sequence(futures, system.dispatcher()); final Future<Iterable<Object>> aggregate = Futures.sequence(futures, system.dispatcher());
final Future<Result> transformed = aggregate.map(new akka.japi.Function<Iterable<Object>, Result>() { final Future<Result> transformed = aggregate.map(new Mapper<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) { public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator(); final Iterator<Object> it = coll.iterator();
final String s = (String) it.next(); final String s = (String) it.next();

View file

@ -11,6 +11,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import akka.actor.*; import akka.actor.*;
import akka.dispatch.Mapper;
import akka.japi.Function; import akka.japi.Function;
import akka.util.Duration; import akka.util.Duration;
import akka.util.Timeout; import akka.util.Timeout;
@ -19,6 +20,8 @@ import akka.event.LoggingAdapter;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import static akka.japi.Util.manifest;
import static akka.actor.SupervisorStrategy.*; import static akka.actor.SupervisorStrategy.*;
import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipeTo; import static akka.pattern.Patterns.pipeTo;
@ -142,10 +145,12 @@ public class FaultHandlingDocSample {
counterService.tell(new Increment(1), getSelf()); counterService.tell(new Increment(1), getSelf());
// Send current progress to the initial sender // Send current progress to the initial sender
pipeTo(ask(counterService, GetCurrentCount, askTimeout).map(new Function<CurrentCount, Progress>() { pipeTo(ask(counterService, GetCurrentCount, askTimeout)
public Progress apply(CurrentCount c) { .mapTo(manifest(CurrentCount.class))
return new Progress(100.0 * c.count / totalCount); .map(new Mapper<CurrentCount, Progress>() {
} public Progress apply(CurrentCount c) {
return new Progress(100.0 * c.count / totalCount);
}
}), progressListener); }), progressListener);
} else { } else {
unhandled(msg); unhandled(msg);

View file

@ -4,12 +4,10 @@
package akka.docs.future; package akka.docs.future;
//#imports1 //#imports1
import akka.dispatch.Promise; import akka.dispatch.*;
import akka.japi.Procedure; import akka.japi.Procedure;
import akka.japi.Procedure2; import akka.japi.Procedure2;
import akka.util.Timeout; import akka.util.Timeout;
import akka.dispatch.Await;
import akka.dispatch.Future;
//#imports1 //#imports1
@ -57,7 +55,6 @@ import akka.actor.ActorSystem;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.dispatch.Futures;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -110,7 +107,7 @@ public class FutureDocTestBase {
} }
}, system.dispatcher()); }, system.dispatcher());
Future<Integer> f2 = f1.map(new Function<String, Integer>() { Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) { public Integer apply(String s) {
return s.length(); return s.length();
} }
@ -131,7 +128,7 @@ public class FutureDocTestBase {
} }
}, system.dispatcher()); }, system.dispatcher());
Future<Integer> f2 = f1.map(new Function<String, Integer>() { Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) { public Integer apply(String s) {
return s.length(); return s.length();
} }
@ -153,7 +150,7 @@ public class FutureDocTestBase {
Thread.sleep(100); Thread.sleep(100);
Future<Integer> f2 = f1.map(new Function<String, Integer>() { Future<Integer> f2 = f1.map(new Mapper<String, Integer>() {
public Integer apply(String s) { public Integer apply(String s) {
return s.length(); return s.length();
} }
@ -173,7 +170,7 @@ public class FutureDocTestBase {
} }
}, system.dispatcher()); }, system.dispatcher());
Future<Integer> f2 = f1.flatMap(new Function<String, Future<Integer>>() { Future<Integer> f2 = f1.flatMap(new Mapper<String, Future<Integer>>() {
public Future<Integer> apply(final String s) { public Future<Integer> apply(final String s) {
return future(new Callable<Integer>() { return future(new Callable<Integer>() {
public Integer call() { public Integer call() {
@ -204,7 +201,7 @@ public class FutureDocTestBase {
// Find the sum of the odd numbers // Find the sum of the odd numbers
Future<Long> futureSum = futureListOfInts.map( Future<Long> futureSum = futureListOfInts.map(
new Function<Iterable<Integer>, Long>() { new Mapper<Iterable<Integer>, Long>() {
public Long apply(Iterable<Integer> ints) { public Long apply(Iterable<Integer> ints) {
long sum = 0; long sum = 0;
for (Integer i : ints) for (Integer i : ints)
@ -306,24 +303,87 @@ public class FutureDocTestBase {
//#filter //#filter
Future<Integer> future1 = Futures.successful(4, system.dispatcher()); Future<Integer> future1 = Futures.successful(4, system.dispatcher());
Future<Integer> successfulFilter = Future<Integer> successfulFilter =
future1.filter(new Function<Integer, Boolean>() { future1.filter(new Filter<Integer>() {
public Boolean apply(Integer i) { return i % 2 == 0; } public boolean filter(Integer i) { return i % 2 == 0; }
}); });
Future<Integer> failedFilter = Future<Integer> failedFilter =
future1.filter(new Function<Integer, Boolean>() { future1.filter(new Filter<Integer>() {
public Boolean apply(Integer i) { return i % 2 != 0; } public boolean filter(Integer i) { return i % 2 != 0; }
}); });
//When filter fails, the returned Future will be failed with a scala.MatchError //When filter fails, the returned Future will be failed with a scala.MatchError
//#filter //#filter
} }
public void sendToTheInternetz(String s) {
}
public void sendToIssueTracker(Throwable t) {
}
@Test public void useAndThen() {
//#and-then
Future<String> future1 = Futures.successful("value", system.dispatcher()).
andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) sendToIssueTracker(failure);
}
}).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (result != null) sendToTheInternetz(result);
}
});
//#and-then
}
@Test public void useRecover() {
//#recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, system.dispatcher()).recover(new Recover<Integer>() {
public Integer recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) return 0;
else throw problem;
}
});
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
//#recover
}
@Test public void useTryRecover() {
//#try-recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, system.dispatcher()).tryRecover(new Recover<Future<Integer>>() {
public Future<Integer> recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) {
return future(new Callable<Integer>() {
public Integer call() {
return 0;
}
}, system.dispatcher());
}
else throw problem;
}
});
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
//#try-recover
}
@Test public void useOnSuccessOnFailureAndOnComplete() { @Test public void useOnSuccessOnFailureAndOnComplete() {
{ {
Future<String> future = Futures.successful("foo", system.dispatcher()); Future<String> future = Futures.successful("foo", system.dispatcher());
//#onSuccess //#onSuccess
future.onSuccess(new Procedure<String>() { future.onSuccess(new OnSuccess<String>() {
public void apply(String result) { public void onSuccess(String result) {
if ("bar" == result) { if ("bar" == result) {
//Do something if it resulted in "bar" //Do something if it resulted in "bar"
} else { } else {
@ -337,8 +397,8 @@ public class FutureDocTestBase {
Future<String> future = Future<String> future =
Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher());
//#onFailure //#onFailure
future.onFailure( new Procedure<Throwable>() { future.onFailure( new OnFailure() {
public void apply(Throwable failure) { public void onFailure(Throwable failure) {
if (failure instanceof IllegalStateException) { if (failure instanceof IllegalStateException) {
//Do something if it was this particular failure //Do something if it was this particular failure
} else { } else {
@ -351,8 +411,8 @@ public class FutureDocTestBase {
{ {
Future<String> future = Futures.successful("foo", system.dispatcher()); Future<String> future = Futures.successful("foo", system.dispatcher());
//#onComplete //#onComplete
future.onComplete(new Procedure2<Throwable, String>() { future.onComplete(new OnComplete<String>() {
public void apply(Throwable failure, String result) { public void onComplete(Throwable failure, String result) {
if (failure != null) { if (failure != null) {
//We got a failure, handle it here //We got a failure, handle it here
} else { } else {
@ -370,7 +430,7 @@ public class FutureDocTestBase {
Future<String> future1 = Futures.successful("foo", system.dispatcher()); Future<String> future1 = Futures.successful("foo", system.dispatcher());
Future<String> future2 = Futures.successful("bar", system.dispatcher()); Future<String> future2 = Futures.successful("bar", system.dispatcher());
Future<String> future3 = Future<String> future3 =
future1.zip(future2).map(new Function<scala.Tuple2<String,String>, String>() { future1.zip(future2).map(new Mapper<scala.Tuple2<String,String>, String>() {
public String apply(scala.Tuple2<String,String> zipped) { public String apply(scala.Tuple2<String,String> zipped) {
return zipped._1() + " " + zipped._2(); return zipped._1() + " " + zipped._2();
} }

View file

@ -67,7 +67,7 @@ These allow you to create 'pipelines' or 'streams' that the result will travel t
Future is a Monad Future is a Monad
^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^
The first method for working with ``Future`` functionally is ``map``. This method takes a ``Function`` which performs The first method for working with ``Future`` functionally is ``map``. This method takes a ``Mapper`` which performs
some operation on the result of the ``Future``, and returning a new result. some operation on the result of the ``Future``, and returning a new result.
The return value of the ``map`` method is another ``Future`` that will contain the new result: The return value of the ``map`` method is another ``Future`` that will contain the new result:
@ -176,6 +176,18 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which
.. includecode:: code/akka/docs/future/FutureDocTestBase.java .. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: onComplete :include: onComplete
Ordering
--------
Since callbacks are executed in any order and potentially in parallel,
it can be tricky at the times when you need sequential ordering of operations.
But there's a solution! And it's name is ``andThen``, and it creates a new Future with
the specified callback, a Future that will have the same result as the Future it's called on,
which allows for ordering like in the following sample:
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: and-then
Auxiliary methods Auxiliary methods
----------------- -----------------
@ -197,4 +209,22 @@ Exceptions
Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently.
It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught It doesn't matter if an ``UntypedActor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught
the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``,
calling ``Await.result`` will cause it to be thrown again so it can be handled properly. calling ``Await.result`` will cause it to be thrown again so it can be handled properly.
It is also possible to handle an ``Exception`` by returning a different result.
This is done with the ``recover`` method. For example:
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: recover
In this example, if the actor replied with a ``akka.actor.Status.Failure`` containing the ``ArithmeticException``,
our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks,
so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way
it will behave as if we hadn't used the ``recover`` method.
You can also use the ``tryRecover`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``,
and is use like this:
.. includecode:: code/akka/docs/future/FutureDocTestBase.java
:include: try-recover

View file

@ -13,6 +13,7 @@ import akka.dispatch.Future
import akka.dispatch.Await import akka.dispatch.Await
import akka.util.duration._ import akka.util.duration._
import akka.dispatch.Promise import akka.dispatch.Promise
import java.lang.IllegalStateException
object FutureDocSpec { object FutureDocSpec {
@ -266,6 +267,19 @@ class FutureDocSpec extends AkkaSpec {
Await.result(future, 1 second) must be(0) Await.result(future, 1 second) must be(0)
} }
"demonstrate usage of tryRecover" in {
implicit val timeout = system.settings.ActorTimeout
val actor = system.actorOf(Props[MyActor])
val msg1 = -1
//#try-recover
val future = akka.pattern.ask(actor, msg1) tryRecover {
case e: ArithmeticException Promise.successful(0)
case foo: IllegalArgumentException Promise.failed[Int](new IllegalStateException("All br0ken!"))
}
//#try-recover
Await.result(future, 1 second) must be(0)
}
"demonstrate usage of zip" in { "demonstrate usage of zip" in {
val future1 = Future { "foo" } val future1 = Future { "foo" }
val future2 = Future { "bar" } val future2 = Future { "bar" }
@ -275,6 +289,21 @@ class FutureDocSpec extends AkkaSpec {
Await.result(future3, 1 second) must be("foo bar") Await.result(future3, 1 second) must be("foo bar")
} }
"demonstrate usage of andThen" in {
def loadPage(s: String) = s
val url = "foo bar"
def log(cause: Throwable) = ()
def watchSomeTV = ()
//#and-then
val result = Future { loadPage(url) } andThen {
case Left(exception) log(exception)
} andThen {
case _ watchSomeTV
}
//#and-then
Await.result(result, 1 second) must be("foo bar")
}
"demonstrate usage of or" in { "demonstrate usage of or" in {
val future1 = Future { "foo" } val future1 = Future { "foo" }
val future2 = Future { "bar" } val future2 = Future { "bar" }

View file

@ -198,6 +198,18 @@ For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which
.. includecode:: code/akka/docs/future/FutureDocSpec.scala .. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: onComplete :include: onComplete
Ordering
--------
Since callbacks are executed in any order and potentially in parallel,
it can be tricky at the times when you need sequential ordering of operations.
But there's a solution! And it's name is ``andThen``, and it creates a new Future with
the specified callback, a Future that will have the same result as the Future it's called on,
which allows for ordering like in the following sample:
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: and-then
Auxiliary methods Auxiliary methods
----------------- -----------------
@ -232,3 +244,9 @@ our ``Future`` would have a result of 0. The ``recover`` method works very simil
so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way
it will behave as if we hadn't used the ``recover`` method. it will behave as if we hadn't used the ``recover`` method.
You can also use the ``tryRecover`` method, which has the same relationship to ``recover`` as ``flatMap` has to ``map``,
and is use like this:
.. includecode:: code/akka/docs/future/FutureDocSpec.scala
:include: try-recover