From bf2a8cdec6b72b55577ddadabd2ad7bd405094e5 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 18 Jun 2011 20:13:13 -0600 Subject: [PATCH] Better solution to ticket #853, better performance for DefaultPromise if already completed, especially if performing chained callbacks --- .../src/main/scala/akka/dispatch/Future.scala | 388 +++++++++--------- 1 file changed, 185 insertions(+), 203 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index b73fe4bde2..d50d6e855f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -428,7 +428,40 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - def collect[A](pf: PartialFunction[Any, A]): Future[A] + final def collect[A](pf: PartialFunction[Any, A]): Future[A] = value match { + case Some(Right(r)) ⇒ + new KeptPromise[A](try { + if (pf isDefinedAt r) + Right(pf(r)) + else + Left(new MatchError(r)) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + Left(e) + }) + case Some(_) ⇒ + this.asInstanceOf[Future[A]] + case None ⇒ + val future = new DefaultPromise[A](timeoutInNanos, NANOS) + onComplete { self ⇒ + future complete { + self.value.get match { + case Right(r) ⇒ + try { + if (pf isDefinedAt r) Right(pf(r)) + else Left(new MatchError(r)) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + Left(e) + } + case v ⇒ v.asInstanceOf[Either[Throwable, A]] + } + } + } + future + } /** * Creates a new Future that will handle any matching Throwable that this @@ -441,7 +474,39 @@ sealed trait Future[+T] { * Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3 * */ - def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] + final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = value match { + case Some(Left(e)) ⇒ + try { + if (pf isDefinedAt e) + new KeptPromise(Right(pf(e))) + else + this.asInstanceOf[Future[A]] + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + new KeptPromise(Left(e)) + } + case Some(_) ⇒ + this.asInstanceOf[Future[A]] + case None ⇒ + val future = new DefaultPromise[A](timeoutInNanos, NANOS) + onComplete { self ⇒ + future complete { + self.value.get match { + case Left(e) ⇒ + try { + if (pf isDefinedAt e) Right(pf(e)) + else Left(e) + } catch { + case x: Exception ⇒ + Left(x) + } + case v ⇒ v + } + } + } + future + } /** * Creates a new Future by applying a function to the successful result of @@ -456,13 +521,65 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - def map[A](f: T ⇒ A): Future[A] + final def map[A](f: T ⇒ A): Future[A] = value match { + case Some(Right(r)) ⇒ + new KeptPromise[A](try { + Right(f(r)) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + Left(e) + }) + case Some(_) ⇒ + this.asInstanceOf[Future[A]] + case None ⇒ + val future = new DefaultPromise[A](timeoutInNanos, NANOS) + onComplete { self ⇒ + future complete { + self.value.get match { + case Right(r) ⇒ + try { + Right(f(r)) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + Left(e) + } + case v ⇒ v.asInstanceOf[Either[Throwable, A]] + } + } + } + future + } /** * Creates a new Future[A] which is completed with this Future's result if * that conforms to A's erased type or a ClassCastException otherwise. */ - def mapTo[A](implicit m: Manifest[A]): Future[A] + final def mapTo[A](implicit m: Manifest[A]): Future[A] = value match { + case Some(Right(t)) ⇒ + new KeptPromise(try { + Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) + } catch { + case e: ClassCastException ⇒ Left(e) + }) + case Some(_) ⇒ + this.asInstanceOf[Future[A]] + case None ⇒ + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) + onComplete { ft ⇒ + fa complete (ft.value.get match { + case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]] + case Right(t) ⇒ + try { + Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) + } catch { + case e: ClassCastException ⇒ Left(e) + } + }) + } + fa + } /** * Creates a new Future by applying a function to the successful result of @@ -478,7 +595,34 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - def flatMap[A](f: T ⇒ Future[A]): Future[A] + final def flatMap[A](f: T ⇒ Future[A]): Future[A] = value match { + case Some(Right(r)) ⇒ + try { + f(r) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + new KeptPromise(Left(e)) + } + case Some(_) ⇒ + this.asInstanceOf[Future[A]] + case None ⇒ + val future = new DefaultPromise[A](timeoutInNanos, NANOS) + onComplete { + _.value.get match { + case Right(r) ⇒ + try { + future completeWith f(r) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + future complete Left(e) + } + case v ⇒ future complete v.asInstanceOf[Either[Throwable, A]] + } + } + future + } final def foreach(f: T ⇒ Unit): Unit = onComplete { _.result match { @@ -496,7 +640,42 @@ sealed trait Future[+T] { def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) } - def filter(p: T ⇒ Boolean): Future[T] + final def filter(p: T ⇒ Boolean): Future[T] = value match { + case Some(Right(r)) ⇒ + try { + if (p(r)) + this + else + new KeptPromise(Left(new MatchError(r))) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + new KeptPromise(Left(e)) + } + case Some(_) ⇒ + this + case None ⇒ + val future = new DefaultPromise[T](timeoutInNanos, NANOS) + onComplete { self ⇒ + future complete { + self.value.get match { + case Right(r) ⇒ + try { + if (p(r)) + Right(r) + else + Left(new MatchError(r)) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + Left(e) + } + case v ⇒ v + } + } + } + future + } /** * Returns the current result, throws the exception if one has been raised, else returns None @@ -764,124 +943,6 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { } } - final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { - val future = new DefaultPromise[A](timeoutInNanos, NANOS) - onComplete { self ⇒ - future complete { - self.value.get match { - case Right(r) ⇒ - try { - if (pf isDefinedAt r) Right(pf(r)) - else Left(new MatchError(r)) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - Left(e) - } - case v ⇒ v.asInstanceOf[Either[Throwable, A]] - } - } - } - future - } - - final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val future = new DefaultPromise[A](timeoutInNanos, NANOS) - onComplete { self ⇒ - future complete { - self.value.get match { - case Left(e) ⇒ - try { - if (pf isDefinedAt e) Right(pf(e)) - else Left(e) - } catch { - case x: Exception ⇒ - Left(x) - } - case v ⇒ v - } - } - } - future - } - - final def map[A](f: T ⇒ A): Future[A] = { - val future = new DefaultPromise[A](timeoutInNanos, NANOS) - onComplete { self ⇒ - future complete { - self.value.get match { - case Right(r) ⇒ - try { - Right(f(r)) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - Left(e) - } - case v ⇒ v.asInstanceOf[Either[Throwable, A]] - } - } - } - future - } - - final def mapTo[A](implicit m: Manifest[A]): Future[A] = { - val fa = new DefaultPromise[A](timeoutInNanos, NANOS) - onComplete { ft ⇒ - fa complete (ft.value.get match { - case l: Left[_, _] ⇒ l.asInstanceOf[Either[Throwable, A]] - case Right(t) ⇒ - try { - Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) - } catch { - case e: ClassCastException ⇒ Left(e) - } - }) - } - fa - } - - final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { - val future = new DefaultPromise[A](timeoutInNanos, NANOS) - onComplete { - _.value.get match { - case Right(r) ⇒ - try { - future completeWith f(r) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - future complete Left(e) - } - case v ⇒ future complete v.asInstanceOf[Either[Throwable, A]] - } - } - future - } - - final def filter(p: T ⇒ Boolean): Future[T] = { - val future = new DefaultPromise[T](timeoutInNanos, NANOS) - onComplete { self ⇒ - future complete { - self.value.get match { - case Right(r) ⇒ - try { - if (p(r)) - Right(r) - else - Left(new MatchError(r)) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - Left(e) - } - case v ⇒ v - } - } - } - future - } - @inline private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) @inline @@ -932,83 +993,4 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise final def onTimeout(func: Future[T] ⇒ Unit): this.type = this - final def collect[A](pf: PartialFunction[Any, A]): Future[A] = value.get match { - case Right(r) ⇒ - new KeptPromise(try { - if (pf isDefinedAt r) - Right(pf(r)) - else - Left(new MatchError(r)) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - Left(e) - }) - case _ ⇒ this.asInstanceOf[KeptPromise[A]] - } - - final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = value.get match { - case Left(e) ⇒ - try { - if (pf isDefinedAt e) - new KeptPromise(Right(pf(e))) - else - this.asInstanceOf[KeptPromise[A]] - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - new KeptPromise(Left(e)) - } - case _ ⇒ this.asInstanceOf[KeptPromise[A]] - } - - final def map[A](f: T ⇒ A): Future[A] = value.get match { - case Right(r) ⇒ - new KeptPromise(try { - Right(f(r)) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - Left(e) - }) - case _ ⇒ this.asInstanceOf[KeptPromise[A]] - } - - final def mapTo[A](implicit m: Manifest[A]): Future[A] = value.get match { - case Right(t) ⇒ - new KeptPromise(try { - Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) - } catch { - case e: ClassCastException ⇒ Left(e) - }) - case _ ⇒ this.asInstanceOf[KeptPromise[A]] - } - - final def flatMap[A](f: T ⇒ Future[A]): Future[A] = value.get match { - case Right(r) ⇒ - try { - f(r) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - new KeptPromise(Left(e)) - } - case _ ⇒ this.asInstanceOf[KeptPromise[A]] - } - - final def filter(p: T ⇒ Boolean): Future[T] = value.get match { - case Right(r) ⇒ - try { - if (p(r)) - this - else - new KeptPromise(Left(new MatchError(r))) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - new KeptPromise(Left(e)) - } - case _ ⇒ this - } - }