#2046 - Remove PromiseStream

This commit is contained in:
Viktor Klang 2012-05-07 19:49:55 +02:00
parent ca0d400a3a
commit 57ca80ab50
3 changed files with 0 additions and 420 deletions

View file

@ -1,145 +0,0 @@
package akka.dispatch
import Future.flow
import akka.util.cps._
import akka.util.Timeout
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
"A PromiseStream" must {
"work" in {
val a, b, c = Promise[Int]()
val q = PromiseStream[Int]()
flow { q << (1, 2, 3) }
flow {
a << q()
b << q
c << q()
}
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
}
"pend" in {
val a, b, c = Promise[Int]()
val q = PromiseStream[Int]()
flow {
a << q
b << q()
c << q
}
flow { q <<< List(1, 2, 3) }
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
}
"pend again" in {
val a, b, c, d = Promise[Int]()
val q1, q2 = PromiseStream[Int]()
val oneTwo = Future(List(1, 2))
flow {
a << q2
b << q2
q1 << 3 << 4
}
flow {
q2 <<< oneTwo
c << q1
d << q1
}
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
assert(Await.result(d, timeout.duration) === 4)
}
"enque" in {
val q = PromiseStream[Int]()
val a = q.dequeue()
val b = q.dequeue()
val c, d = Promise[Int]()
flow {
c << q
d << q
}
q ++= List(1, 2, 3, 4)
assert(Await.result(a, timeout.duration) === 1)
assert(Await.result(b, timeout.duration) === 2)
assert(Await.result(c, timeout.duration) === 3)
assert(Await.result(d, timeout.duration) === 4)
}
"map" in {
val qs = PromiseStream[String]()
val qi = qs.map(_.length)
val a, c = Promise[Int]()
val b = Promise[String]()
flow {
a << qi
b << qs
c << qi
}
flow {
qs << ("Hello", "World!", "Test")
}
assert(Await.result(a, timeout.duration) === 5)
assert(Await.result(b, timeout.duration) === "World!")
assert(Await.result(c, timeout.duration) === 4)
}
"map futures" in {
val q = PromiseStream[String]()
flow {
q << (Future("a"), Future("b"), Future("c"))
}
val a, b, c = q.dequeue
Await.result(a, timeout.duration) must be("a")
Await.result(b, timeout.duration) must be("b")
Await.result(c, timeout.duration) must be("c")
}
"not fail under concurrent stress" in {
implicit val timeout = Timeout(60 seconds)
val q = PromiseStream[Long](timeout.duration.toMillis)
flow {
var n = 0L
repeatC(50000) {
n += 1
q << n
}
}
val future = Future sequence {
List.fill(10) {
flow {
var total = 0L
repeatC(10000) {
val n = q()
total += n
}
total
}
}
} map (_.sum)
flow {
var n = 50000L
repeatC(50000) {
n += 1
q << n
}
}
assert(Await.result(future, timeout.duration) === (1L to 100000L).sum)
}
}
}

View file

@ -819,21 +819,6 @@ trait Promise[T] extends Future[T] {
}
fr
}
final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] Future[Any])
val fr = Promise[Any]()
val f = stream.dequeue(this)
f.onComplete { _
try {
fr completeWith cont(f)
} catch {
case NonFatal(e)
executor.reportFailure(new LogEventException(Debug("Future", getClass, e.getMessage), e))
fr failure e
}
}
fr
}
}
//Companion object to FState, just to provide a cheap, immutable default entry

View file

@ -1,260 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dispatch
import java.util.concurrent.atomic.AtomicReference
import scala.util.continuations._
import scala.annotation.tailrec
import akka.util.Timeout
object PromiseStream {
def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): PromiseStream[A] = new PromiseStream[A]
def apply[A](timeout: Long)(implicit dispatcher: MessageDispatcher): PromiseStream[A] = new PromiseStream[A]()(dispatcher, Timeout(timeout))
private sealed trait State
private case object Normal extends State
private case object Pending extends State
private case object Busy extends State
}
trait PromiseStreamOut[A] {
self
def dequeue(): Future[A]
def dequeue(promise: Promise[A]): Future[A]
def apply(): A @cps[Future[Any]]
def apply(promise: Promise[A]): A @cps[Future[Any]]
final def map[B](f: (A) B)(implicit timeout: Timeout): PromiseStreamOut[B] = new PromiseStreamOut[B] {
def dequeue(): Future[B] = self.dequeue().map(f)
def dequeue(promise: Promise[B]): Future[B] = self.dequeue().flatMap(a promise.complete(Right(f(a))))
def apply(): B @cps[Future[Any]] = this.dequeue().apply()
def apply(promise: Promise[B]): B @cps[Future[Any]] = this.dequeue(promise).apply()
}
}
trait PromiseStreamIn[A] {
def enqueue(elem: A): Unit
final def enqueue(elem1: A, elem2: A, elems: A*): Unit =
this += elem1 += elem2 ++= elems
final def enqueue(elem: Future[A]): Unit =
elem foreach (enqueue(_))
final def enqueue(elem1: Future[A], elem2: Future[A], elems: Future[A]*) {
this += elem1 += elem2
elems foreach (enqueue(_))
}
final def +=(elem: A): this.type = {
enqueue(elem)
this
}
final def +=(elem1: A, elem2: A, elems: A*): this.type = {
enqueue(elem1, elem2, elems: _*)
this
}
final def +=(elem: Future[A]): this.type = {
enqueue(elem)
this
}
final def +=(elem1: Future[A], elem2: Future[A], elems: Future[A]*): this.type = {
enqueue(elem1, elem2, elems: _*)
this
}
final def ++=(elem: Traversable[A]): this.type = {
elem foreach enqueue
this
}
final def ++=(elem: Future[Traversable[A]]): this.type = {
elem foreach (this ++= _)
this
}
def <<(elem: A): PromiseStreamIn[A] @cps[Future[Any]]
def <<(elem1: A, elem2: A, elems: A*): PromiseStreamIn[A] @cps[Future[Any]]
def <<(elem: Future[A]): PromiseStreamIn[A] @cps[Future[Any]]
def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStreamIn[A] @cps[Future[Any]]
def <<<(elems: Traversable[A]): PromiseStreamIn[A] @cps[Future[Any]]
def <<<(elems: Future[Traversable[A]]): PromiseStreamIn[A] @cps[Future[Any]]
}
class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: Timeout) extends PromiseStreamOut[A] with PromiseStreamIn[A] {
import PromiseStream.{ State, Normal, Pending, Busy }
private val _elemOut: AtomicReference[List[A]] = new AtomicReference(Nil)
private val _elemIn: AtomicReference[List[A]] = new AtomicReference(Nil)
private val _pendOut: AtomicReference[List[Promise[A]]] = new AtomicReference(null)
private val _pendIn: AtomicReference[List[Promise[A]]] = new AtomicReference(null)
private val _state: AtomicReference[State] = new AtomicReference(Normal)
@tailrec
final def apply(): A @cps[Future[Any]] =
if (_state.get eq Normal) {
val eo = _elemOut.get
if (eo eq null) apply()
else {
if (eo.nonEmpty) {
if (_elemOut.compareAndSet(eo, eo.tail)) shift { cont: (A Future[Any]) cont(eo.head) }
else apply()
} else apply(Promise[A])
}
} else apply(Promise[A])
final def apply(promise: Promise[A]): A @cps[Future[Any]] =
shift { cont: (A Future[Any]) dequeue(promise) flatMap cont }
@tailrec
final def enqueue(elem: A): Unit = _state.get match {
case Normal
val ei = _elemIn.get
if (ei eq null) enqueue(elem)
else if (!_elemIn.compareAndSet(ei, elem :: ei)) enqueue(elem)
case Pending
val po = _pendOut.get
if (po eq null) enqueue(elem)
else {
if (po.isEmpty) {
if (_state.compareAndSet(Pending, Busy)) {
var nextState: State = Pending
try {
val pi = _pendIn.get
if (pi ne null) {
if (pi.isEmpty) {
if (_pendIn.compareAndSet(Nil, null)) {
if (_pendOut.compareAndSet(Nil, null)) {
_elemIn.set(Nil)
_elemOut.set(List(elem))
nextState = Normal
} else {
_pendIn.set(Nil)
}
}
} else {
if (_pendOut.get eq Nil) _pendOut.set(_pendIn.getAndSet(Nil).reverse)
}
}
} finally {
_state.set(nextState)
}
if (nextState eq Pending) enqueue(elem)
} else enqueue(elem)
} else {
if (_pendOut.compareAndSet(po, po.tail)) {
po.head success elem
if (!po.head.isCompleted) enqueue(elem)
} else enqueue(elem)
}
}
case Busy
enqueue(elem)
}
@tailrec
final def dequeue(): Future[A] =
if (_state.get eq Normal) {
val eo = _elemOut.get
if (eo eq null) dequeue()
else {
if (eo.nonEmpty) {
if (_elemOut.compareAndSet(eo, eo.tail)) Promise.successful(eo.head)
else dequeue()
} else dequeue(Promise[A])
}
} else dequeue(Promise[A])
@tailrec
final def dequeue(promise: Promise[A]): Future[A] = _state.get match {
case Pending
val pi = _pendIn.get
if ((pi ne null) && _pendIn.compareAndSet(pi, promise :: pi)) promise else dequeue(promise)
case Normal
val eo = _elemOut.get
if (eo eq null) dequeue(promise)
else {
if (eo.isEmpty) {
if (_state.compareAndSet(Normal, Busy)) {
var nextState: State = Normal
try {
val ei = _elemIn.get
if (ei ne null) {
if (ei.isEmpty) {
if (_elemIn.compareAndSet(Nil, null)) {
if (_elemOut.compareAndSet(Nil, null)) {
_pendIn.set(Nil)
_pendOut.set(List(promise))
nextState = Pending
} else {
_elemIn.set(Nil)
}
}
} else {
if (_elemOut.get eq Nil) _elemOut.set(_elemIn.getAndSet(Nil).reverse)
}
}
} finally {
_state.set(nextState)
}
if (nextState eq Normal) dequeue(promise)
else promise
} else dequeue(promise)
} else {
if (_elemOut.compareAndSet(eo, eo.tail)) {
promise success eo.head
} else dequeue(promise)
}
}
case Busy
dequeue(promise)
}
final def <<(elem: A): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] Future[Any]) cont(this += elem) }
final def <<(elem1: A, elem2: A, elems: A*): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] Future[Any]) cont(this += (elem1, elem2, elems: _*)) }
final def <<(elem: Future[A]): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] Future[Any]) elem map (a cont(this += a)) }
final def <<(elem1: Future[A], elem2: Future[A], elems: Future[A]*): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] Future[Any])
val seq = Future.sequence(elem1 +: elem2 +: elems)
seq map (a cont(this ++= a))
}
final def <<<(elems: Traversable[A]): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] Future[Any]) cont(this ++= elems) }
final def <<<(elems: Future[Traversable[A]]): PromiseStream[A] @cps[Future[Any]] =
shift { cont: (PromiseStream[A] Future[Any]) elems map (as cont(this ++= as)) }
}