Make better use of implicit Timeouts, fixes problem with using KeptPromise with aggregate methods (like traverse and sequence)

This commit is contained in:
Derek Williams 2011-06-25 14:20:54 -06:00
parent 2abb768e85
commit 1d710cc0a4
2 changed files with 31 additions and 18 deletions

View file

@ -404,7 +404,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
latch.open
assert(f2.get === 10)
val f3 = Future({ Thread.sleep(100); 5 }, 10)
val f3 = Future({ Thread.sleep(10); 5 }, 10)
intercept[FutureTimeoutException] {
f3.get
}
@ -431,7 +431,7 @@ class FutureSpec extends WordSpec with MustMatchers with Checkers with BeforeAnd
val x = Future("Hello")
val y = x map (_.length)
val r = flow(x() + " " + y.map(_ / 0).map(_.toString)(), 100)
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100)
intercept[java.lang.ArithmeticException](r.get)
}

View file

@ -55,13 +55,13 @@ object Futures {
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Timeout, dispatcher: MessageDispatcher): Future[T] =
Future(body.call, timeout)(dispatcher)
Future(body.call)(dispatcher, timeout)
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], timeout: Long, dispatcher: MessageDispatcher): Future[T] =
Future(body.call, timeout)(dispatcher)
Future(body.call)(dispatcher, timeout)
/**
* Returns a Future to the result of the first future in the list that is completed
@ -111,7 +111,8 @@ object Futures {
case e: Exception
EventHandler.error(e, this, e.getMessage)
result completeWithException e
} finally {
}
finally {
results.clear
}
}
@ -232,9 +233,15 @@ object Future {
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: T, timeout: Timeout = Timeout.default)(implicit dispatcher: MessageDispatcher): Future[T] =
def apply[T](body: T)(implicit dispatcher: MessageDispatcher, timeout: Timeout = implicitly): Future[T] =
dispatcher.dispatchFuture(() body, timeout)
def apply[T](body: T, timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
apply(body)(dispatcher, timeout)
def apply[T](body: T, timeout: Long)(implicit dispatcher: MessageDispatcher): Future[T] =
apply(body)(dispatcher, timeout)
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
@ -242,8 +249,11 @@ object Future {
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout = Timeout.default)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout): Future[M[A]] =
in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
sequence(in)(cbf, timeout)
/**
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B].
@ -253,12 +263,15 @@ object Future {
* val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout = Timeout.default)(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a)
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout): Future[M[B]] =
in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr; b fb) yield (r += b)
}.map(_.result)
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
traverse(in)(fn)(cbf, timeout)
/**
* Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
* Continuations plugin.
@ -275,7 +288,7 @@ object Future {
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
def flow[A](body: A @cps[Future[Any]], timeout: Timeout = Timeout.default): Future[A] = {
def flow[A](body: A @cps[Future[Any]])(implicit timeout: Timeout): Future[A] = {
val future = Promise[A](timeout)
(reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete {
_.exception match {
@ -450,7 +463,7 @@ sealed trait Future[+T] {
* } yield b + "-" + c
* </pre>
*/
final def collect[A](pf: PartialFunction[Any, A]): Future[A] = value match {
final def collect[A](pf: PartialFunction[Any, A])(implicit timeout: Timeout): Future[A] = value match {
case Some(Right(r))
new KeptPromise[A](try {
if (pf isDefinedAt r)
@ -496,7 +509,7 @@ sealed trait Future[+T] {
* Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
* </pre>
*/
final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = value match {
final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = value match {
case Some(Left(e))
try {
if (pf isDefinedAt e)
@ -543,7 +556,7 @@ sealed trait Future[+T] {
* } yield b + "-" + c
* </pre>
*/
final def map[A](f: T A): Future[A] = value match {
final def map[A](f: T A)(implicit timeout: Timeout): Future[A] = value match {
case Some(Right(r))
new KeptPromise[A](try {
Right(f(r))
@ -578,7 +591,7 @@ sealed trait Future[+T] {
* Creates a new Future[A] which is completed with this Future's result if
* that conforms to A's erased type or a ClassCastException otherwise.
*/
final def mapTo[A](implicit m: Manifest[A]): Future[A] = value match {
final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = value match {
case Some(Right(t))
new KeptPromise(try {
Right(BoxedType(m.erasure).cast(t).asInstanceOf[A])
@ -617,7 +630,7 @@ sealed trait Future[+T] {
* } yield b + "-" + c
* </pre>
*/
final def flatMap[A](f: T Future[A]): Future[A] = value match {
final def flatMap[A](f: T Future[A])(implicit timeout: Timeout): Future[A] = value match {
case Some(Right(r))
try {
f(r)
@ -655,14 +668,14 @@ sealed trait Future[+T] {
final def withFilter(p: T Boolean) = new FutureWithFilter[T](this, p)
final class FutureWithFilter[+A](self: Future[A], p: A Boolean) {
final class FutureWithFilter[+A](self: Future[A], p: A Boolean)(implicit timeout: Timeout) {
def foreach(f: A Unit): Unit = self filter p foreach f
def map[B](f: A B): Future[B] = self filter p map f
def flatMap[B](f: A Future[B]): Future[B] = self filter p flatMap f
def withFilter(q: A Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x p(x) && q(x))
}
final def filter(p: T Boolean): Future[T] = value match {
final def filter(p: T Boolean)(implicit timeout: Timeout): Future[T] = value match {
case Some(Right(r))
try {
if (p(r))