diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala deleted file mode 100644 index 7ac129e254..0000000000 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala +++ /dev/null @@ -1,145 +0,0 @@ -package akka.dispatch - -import Future.flow -import akka.util.cps._ -import akka.util.Timeout -import akka.util.duration._ -import akka.testkit.AkkaSpec -import akka.testkit.DefaultTimeout - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { - - "A PromiseStream" must { - - "work" in { - val a, b, c = Promise[Int]() - val q = PromiseStream[Int]() - flow { q << (1, 2, 3) } - flow { - a << q() - b << q - c << q() - } - assert(Await.result(a, timeout.duration) === 1) - assert(Await.result(b, timeout.duration) === 2) - assert(Await.result(c, timeout.duration) === 3) - } - - "pend" in { - val a, b, c = Promise[Int]() - val q = PromiseStream[Int]() - flow { - a << q - b << q() - c << q - } - flow { q <<< List(1, 2, 3) } - assert(Await.result(a, timeout.duration) === 1) - assert(Await.result(b, timeout.duration) === 2) - assert(Await.result(c, timeout.duration) === 3) - } - - "pend again" in { - val a, b, c, d = Promise[Int]() - val q1, q2 = PromiseStream[Int]() - val oneTwo = Future(List(1, 2)) - flow { - a << q2 - b << q2 - q1 << 3 << 4 - } - flow { - q2 <<< oneTwo - c << q1 - d << q1 - } - assert(Await.result(a, timeout.duration) === 1) - assert(Await.result(b, timeout.duration) === 2) - assert(Await.result(c, timeout.duration) === 3) - assert(Await.result(d, timeout.duration) === 4) - } - - "enque" in { - val q = PromiseStream[Int]() - val a = q.dequeue() - val b = q.dequeue() - val c, d = Promise[Int]() - flow { - c << q - d << q - } - q ++= List(1, 2, 3, 4) - - assert(Await.result(a, timeout.duration) === 1) - assert(Await.result(b, timeout.duration) === 2) - assert(Await.result(c, timeout.duration) === 3) - assert(Await.result(d, timeout.duration) === 4) - } - - "map" in { - val qs = PromiseStream[String]() - val qi = qs.map(_.length) - val a, c = Promise[Int]() - val b = Promise[String]() - flow { - a << qi - b << qs - c << qi - } - flow { - qs << ("Hello", "World!", "Test") - } - assert(Await.result(a, timeout.duration) === 5) - assert(Await.result(b, timeout.duration) === "World!") - assert(Await.result(c, timeout.duration) === 4) - } - - "map futures" in { - val q = PromiseStream[String]() - flow { - q << (Future("a"), Future("b"), Future("c")) - } - val a, b, c = q.dequeue - Await.result(a, timeout.duration) must be("a") - Await.result(b, timeout.duration) must be("b") - Await.result(c, timeout.duration) must be("c") - } - - "not fail under concurrent stress" in { - implicit val timeout = Timeout(60 seconds) - val q = PromiseStream[Long](timeout.duration.toMillis) - - flow { - var n = 0L - repeatC(50000) { - n += 1 - q << n - } - } - - val future = Future sequence { - List.fill(10) { - flow { - var total = 0L - repeatC(10000) { - val n = q() - total += n - } - total - } - } - } map (_.sum) - - flow { - var n = 50000L - repeatC(50000) { - n += 1 - q << n - } - } - - assert(Await.result(future, timeout.duration) === (1L to 100000L).sum) - } - } -} diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index eea0ecbeef..54ec2d08b4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -819,21 +819,6 @@ trait Promise[T] extends Future[T] { } fr } - - final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = Promise[Any]() - val f = stream.dequeue(this) - f.onComplete { _ ⇒ - try { - fr completeWith cont(f) - } catch { - case NonFatal(e) ⇒ - executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e)) - fr failure e - } - } - fr - } } //Companion object to FState, just to provide a cheap, immutable default entry diff --git a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala deleted file mode 100644 index 882219f84d..0000000000 --- a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.dispatch - -import java.util.concurrent.atomic.AtomicReference -import scala.util.continuations._ -import scala.annotation.tailrec -import akka.util.Timeout - -object PromiseStream { - def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): PromiseStream[A] = new PromiseStream[A] - def apply[A](timeout: Long)(implicit dispatcher: MessageDispatcher): PromiseStream[A] = new PromiseStream[A]()(dispatcher, Timeout(timeout)) - - private sealed trait State - private case object Normal extends State - private case object Pending extends State - private case object Busy extends State -} - -trait PromiseStreamOut[A] { - self ⇒ - - def dequeue(): Future[A] - - def dequeue(promise: Promise[A]): Future[A] - - def apply(): A @cps[Future[Any]] - - def apply(promise: Promise[A]): A @cps[Future[Any]] - - final def map[B](f: (A) ⇒ B)(implicit timeout: Timeout): PromiseStreamOut[B] = new PromiseStreamOut[B] { - - def dequeue(): Future[B] = self.dequeue().map(f) - - def dequeue(promise: Promise[B]): Future[B] = self.dequeue().flatMap(a ⇒ promise.complete(Right(f(a)))) - - def apply(): B @cps[Future[Any]] = this.dequeue().apply() - - def apply(promise: Promise[B]): B @cps[Future[Any]] = this.dequeue(promise).apply() - - } - -} - -trait PromiseStreamIn[A] { - - def enqueue(elem: A): Unit - - final def enqueue(elem1: A, elem2: A, elems: A*): Unit = - this += elem1 += elem2 ++= elems - - final def enqueue(elem: Future[A]): Unit = - elem foreach (enqueue(_)) - - final def enqueue(elem1: Future[A], elem2: Future[A], elems: Future[A]*) { - this += elem1 += elem2 - elems foreach (enqueue(_)) - } - - final def +=(elem: A): this.type = { - enqueue(elem) - this - } - - final def +=(elem1: A, elem2: A, elems: A*): this.type = { - enqueue(elem1, elem2, elems: _*) - this - } - - final def +=(elem: Future[A]): this.type = { - enqueue(elem) - this - } - - final def +=(elem1: Future[A], elem2: Future[A], elems: Future[A]*): this.type = { - enqueue(elem1, elem2, elems: _*) - this - } - - final def ++=(elem: Traversable[A]): this.type = { - elem foreach enqueue - this - } - - final def ++=(elem: Future[Traversable[A]]): this.type = { - elem foreach (this ++= _) - this - } - - def <<(elem: A): PromiseStreamIn[A] @cps[Future[Any]] - - def <<(elem1: A, elem2: A, elems: A*): PromiseStreamIn[A] @cps[Future[Any]] - - def <<(elem: Future[A]): PromiseStreamIn[A] @cps[Future[Any]] - - def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStreamIn[A] @cps[Future[Any]] - - def <<<(elems: Traversable[A]): PromiseStreamIn[A] @cps[Future[Any]] - - def <<<(elems: Future[Traversable[A]]): PromiseStreamIn[A] @cps[Future[Any]] - -} - -class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: Timeout) extends PromiseStreamOut[A] with PromiseStreamIn[A] { - import PromiseStream.{ State, Normal, Pending, Busy } - - private val _elemOut: AtomicReference[List[A]] = new AtomicReference(Nil) - private val _elemIn: AtomicReference[List[A]] = new AtomicReference(Nil) - private val _pendOut: AtomicReference[List[Promise[A]]] = new AtomicReference(null) - private val _pendIn: AtomicReference[List[Promise[A]]] = new AtomicReference(null) - private val _state: AtomicReference[State] = new AtomicReference(Normal) - - @tailrec - final def apply(): A @cps[Future[Any]] = - if (_state.get eq Normal) { - val eo = _elemOut.get - if (eo eq null) apply() - else { - if (eo.nonEmpty) { - if (_elemOut.compareAndSet(eo, eo.tail)) shift { cont: (A ⇒ Future[Any]) ⇒ cont(eo.head) } - else apply() - } else apply(Promise[A]) - } - } else apply(Promise[A]) - - final def apply(promise: Promise[A]): A @cps[Future[Any]] = - shift { cont: (A ⇒ Future[Any]) ⇒ dequeue(promise) flatMap cont } - - @tailrec - final def enqueue(elem: A): Unit = _state.get match { - case Normal ⇒ - val ei = _elemIn.get - if (ei eq null) enqueue(elem) - else if (!_elemIn.compareAndSet(ei, elem :: ei)) enqueue(elem) - - case Pending ⇒ - val po = _pendOut.get - if (po eq null) enqueue(elem) - else { - if (po.isEmpty) { - if (_state.compareAndSet(Pending, Busy)) { - var nextState: State = Pending - try { - val pi = _pendIn.get - if (pi ne null) { - if (pi.isEmpty) { - if (_pendIn.compareAndSet(Nil, null)) { - if (_pendOut.compareAndSet(Nil, null)) { - _elemIn.set(Nil) - _elemOut.set(List(elem)) - nextState = Normal - } else { - _pendIn.set(Nil) - } - } - } else { - if (_pendOut.get eq Nil) _pendOut.set(_pendIn.getAndSet(Nil).reverse) - } - } - } finally { - _state.set(nextState) - } - if (nextState eq Pending) enqueue(elem) - } else enqueue(elem) - } else { - if (_pendOut.compareAndSet(po, po.tail)) { - po.head success elem - if (!po.head.isCompleted) enqueue(elem) - } else enqueue(elem) - } - } - - case Busy ⇒ - enqueue(elem) - } - - @tailrec - final def dequeue(): Future[A] = - if (_state.get eq Normal) { - val eo = _elemOut.get - if (eo eq null) dequeue() - else { - if (eo.nonEmpty) { - if (_elemOut.compareAndSet(eo, eo.tail)) Promise.successful(eo.head) - else dequeue() - } else dequeue(Promise[A]) - } - } else dequeue(Promise[A]) - - @tailrec - final def dequeue(promise: Promise[A]): Future[A] = _state.get match { - case Pending ⇒ - val pi = _pendIn.get - if ((pi ne null) && _pendIn.compareAndSet(pi, promise :: pi)) promise else dequeue(promise) - - case Normal ⇒ - val eo = _elemOut.get - if (eo eq null) dequeue(promise) - else { - if (eo.isEmpty) { - if (_state.compareAndSet(Normal, Busy)) { - var nextState: State = Normal - try { - val ei = _elemIn.get - if (ei ne null) { - if (ei.isEmpty) { - if (_elemIn.compareAndSet(Nil, null)) { - if (_elemOut.compareAndSet(Nil, null)) { - _pendIn.set(Nil) - _pendOut.set(List(promise)) - nextState = Pending - } else { - _elemIn.set(Nil) - } - } - } else { - if (_elemOut.get eq Nil) _elemOut.set(_elemIn.getAndSet(Nil).reverse) - } - } - } finally { - _state.set(nextState) - } - if (nextState eq Normal) dequeue(promise) - else promise - } else dequeue(promise) - } else { - if (_elemOut.compareAndSet(eo, eo.tail)) { - promise success eo.head - } else dequeue(promise) - } - } - - case Busy ⇒ - dequeue(promise) - } - - final def <<(elem: A): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this += elem) } - - final def <<(elem1: A, elem2: A, elems: A*): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this += (elem1, elem2, elems: _*)) } - - final def <<(elem: Future[A]): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ elem map (a ⇒ cont(this += a)) } - - final def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ - val seq = Future.sequence(elem1 +: elem2 +: elems) - seq map (a ⇒ cont(this ++= a)) - } - - final def <<<(elems: Traversable[A]): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ cont(this ++= elems) } - - final def <<<(elems: Future[Traversable[A]]): PromiseStream[A] @cps[Future[Any]] = - shift { cont: (PromiseStream[A] ⇒ Future[Any]) ⇒ elems map (as ⇒ cont(this ++= as)) } - -}