!str #15236 Replace Transformer with Stage

* replace all existing Transformer with Stage (PushPullStage)
* use Flow[ByteString, ByteString] as encoder/decoder transformer in http
* use the IteratorInterpreter for strict if possible
* emit then become
* emit then finish
* termination emits
* FlowTransformerSpec
* rework types to work with Java API
* rename and move things
* add scaladoc
This commit is contained in:
Patrik Nordwall 2014-11-12 10:43:39 +01:00
parent 299854905b
commit a82f266367
61 changed files with 1478 additions and 1518 deletions

View file

@ -3,25 +3,25 @@
*/
package akka.stream.impl.fusing
import scala.collection.immutable
import akka.stream.OverflowStrategy
import akka.stream.impl.FixedSizeBuffer
import scala.collection.immutable
import akka.stream.stage._
/**
* INTERNAL API
*/
private[akka] final case class Map[In, Out](f: In Out) extends TransitivePullOp[In, Out] {
override def onPush(elem: In, ctxt: Context[Out]): Directive = ctxt.push(f(elem))
private[akka] final case class Map[In, Out](f: In Out) extends PushStage[In, Out] {
override def onPush(elem: In, ctx: Context[Out]): Directive = ctx.push(f(elem))
}
/**
* INTERNAL API
*/
private[akka] final case class Filter[T](p: T Boolean) extends TransitivePullOp[T, T] {
override def onPush(elem: T, ctxt: Context[T]): Directive =
if (p(elem)) ctxt.push(elem)
else ctxt.pull()
private[akka] final case class Filter[T](p: T Boolean) extends PushStage[T, T] {
override def onPush(elem: T, ctx: Context[T]): Directive =
if (p(elem)) ctx.push(elem)
else ctx.pull()
}
private[akka] final object Collect {
@ -31,103 +31,103 @@ private[akka] final object Collect {
final val NotApplied: Any Any = _ Collect.NotApplied
}
private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends TransitivePullOp[In, Out] {
private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends PushStage[In, Out] {
import Collect.NotApplied
override def onPush(elem: In, ctxt: Context[Out]): Directive =
override def onPush(elem: In, ctx: Context[Out]): Directive =
pf.applyOrElse(elem, NotApplied) match {
case NotApplied ctxt.pull()
case result: Out ctxt.push(result)
case NotApplied ctx.pull()
case result: Out ctx.push(result)
}
}
/**
* INTERNAL API
*/
private[akka] final case class MapConcat[In, Out](f: In immutable.Seq[Out]) extends DeterministicOp[In, Out] {
private[akka] final case class MapConcat[In, Out](f: In immutable.Seq[Out]) extends PushPullStage[In, Out] {
private var currentIterator: Iterator[Out] = Iterator.empty
override def onPush(elem: In, ctxt: Context[Out]): Directive = {
override def onPush(elem: In, ctx: Context[Out]): Directive = {
currentIterator = f(elem).iterator
if (currentIterator.isEmpty) ctxt.pull()
else ctxt.push(currentIterator.next())
if (currentIterator.isEmpty) ctx.pull()
else ctx.push(currentIterator.next())
}
override def onPull(ctxt: Context[Out]): Directive =
if (currentIterator.hasNext) ctxt.push(currentIterator.next())
else if (isFinishing) ctxt.finish()
else ctxt.pull()
override def onPull(ctx: Context[Out]): Directive =
if (currentIterator.hasNext) ctx.push(currentIterator.next())
else if (ctx.isFinishing) ctx.finish()
else ctx.pull()
override def onUpstreamFinish(ctxt: Context[Out]): TerminationDirective =
ctxt.absorbTermination()
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
ctx.absorbTermination()
}
/**
* INTERNAL API
*/
private[akka] final case class Take[T](count: Int) extends TransitivePullOp[T, T] {
private[akka] final case class Take[T](count: Int) extends PushStage[T, T] {
private var left: Int = count
override def onPush(elem: T, ctxt: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): Directive = {
left -= 1
if (left > 0) ctxt.push(elem)
else if (left == 0) ctxt.pushAndFinish(elem)
else ctxt.finish() //Handle negative take counts
if (left > 0) ctx.push(elem)
else if (left == 0) ctx.pushAndFinish(elem)
else ctx.finish() //Handle negative take counts
}
}
/**
* INTERNAL API
*/
private[akka] final case class Drop[T](count: Int) extends TransitivePullOp[T, T] {
private[akka] final case class Drop[T](count: Int) extends PushStage[T, T] {
private var left: Int = count
override def onPush(elem: T, ctxt: Context[T]): Directive =
override def onPush(elem: T, ctx: Context[T]): Directive =
if (left > 0) {
left -= 1
ctxt.pull()
} else ctxt.push(elem)
ctx.pull()
} else ctx.push(elem)
}
/**
* INTERNAL API
*/
private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) Out) extends DeterministicOp[In, Out] {
private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) Out) extends PushPullStage[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctxt: Context[Out]): Directive = {
override def onPush(elem: In, ctx: Context[Out]): Directive = {
val old = aggregator
aggregator = f(old, elem)
ctxt.push(old)
ctx.push(old)
}
override def onPull(ctxt: Context[Out]): Directive =
if (isFinishing) ctxt.pushAndFinish(aggregator)
else ctxt.pull()
override def onPull(ctx: Context[Out]): Directive =
if (ctx.isFinishing) ctx.pushAndFinish(aggregator)
else ctx.pull()
override def onUpstreamFinish(ctxt: Context[Out]): TerminationDirective = ctxt.absorbTermination()
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = ctx.absorbTermination()
}
/**
* INTERNAL API
*/
private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) Out) extends DeterministicOp[In, Out] {
private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) Out) extends PushPullStage[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctxt: Context[Out]): Directive = {
override def onPush(elem: In, ctx: Context[Out]): Directive = {
aggregator = f(aggregator, elem)
ctxt.pull()
ctx.pull()
}
override def onPull(ctxt: Context[Out]): Directive =
if (isFinishing) ctxt.pushAndFinish(aggregator)
else ctxt.pull()
override def onPull(ctx: Context[Out]): Directive =
if (ctx.isFinishing) ctx.pushAndFinish(aggregator)
else ctx.pull()
override def onUpstreamFinish(ctxt: Context[Out]): TerminationDirective = ctxt.absorbTermination()
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = ctx.absorbTermination()
}
/**
* INTERNAL API
*/
private[akka] final case class Grouped[T](n: Int) extends DeterministicOp[T, immutable.Seq[T]] {
private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immutable.Seq[T]] {
private val buf = {
val b = Vector.newBuilder[T]
b.sizeHint(n)
@ -135,83 +135,83 @@ private[akka] final case class Grouped[T](n: Int) extends DeterministicOp[T, imm
}
private var left = n
override def onPush(elem: T, ctxt: Context[immutable.Seq[T]]): Directive = {
override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): Directive = {
buf += elem
left -= 1
if (left == 0) {
val emit = buf.result()
buf.clear()
left = n
ctxt.push(emit)
} else ctxt.pull()
ctx.push(emit)
} else ctx.pull()
}
override def onPull(ctxt: Context[immutable.Seq[T]]): Directive =
if (isFinishing) {
override def onPull(ctx: Context[immutable.Seq[T]]): Directive =
if (ctx.isFinishing) {
val elem = buf.result()
buf.clear() //FIXME null out the reference to the `buf`?
left = n
ctxt.pushAndFinish(elem)
} else ctxt.pull()
ctx.pushAndFinish(elem)
} else ctx.pull()
override def onUpstreamFinish(ctxt: Context[immutable.Seq[T]]): TerminationDirective =
if (left == n) ctxt.finish()
else ctxt.absorbTermination()
override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective =
if (left == n) ctx.finish()
else ctx.absorbTermination()
}
/**
* INTERNAL API
*/
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedOp[T, T] {
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
import OverflowStrategy._
private val buffer = FixedSizeBuffer(size)
override def onPush(elem: T, ctxt: DetachedContext[T]): UpstreamDirective =
if (isHolding) ctxt.pushAndPull(elem)
else enqueueAction(ctxt, elem)
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
if (ctx.isHolding) ctx.pushAndPull(elem)
else enqueueAction(ctx, elem)
override def onPull(ctxt: DetachedContext[T]): DownstreamDirective = {
if (isFinishing) {
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (ctx.isFinishing) {
val elem = buffer.dequeue().asInstanceOf[T]
if (buffer.isEmpty) ctxt.pushAndFinish(elem)
else ctxt.push(elem)
} else if (isHolding) ctxt.pushAndPull(buffer.dequeue().asInstanceOf[T])
else if (buffer.isEmpty) ctxt.hold()
else ctxt.push(buffer.dequeue().asInstanceOf[T])
if (buffer.isEmpty) ctx.pushAndFinish(elem)
else ctx.push(elem)
} else if (ctx.isHolding) ctx.pushAndPull(buffer.dequeue().asInstanceOf[T])
else if (buffer.isEmpty) ctx.hold()
else ctx.push(buffer.dequeue().asInstanceOf[T])
}
override def onUpstreamFinish(ctxt: DetachedContext[T]): TerminationDirective =
if (buffer.isEmpty) ctxt.finish()
else ctxt.absorbTermination()
override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective =
if (buffer.isEmpty) ctx.finish()
else ctx.absorbTermination()
val enqueueAction: (DetachedContext[T], T) UpstreamDirective = {
overflowStrategy match {
case DropHead { (ctxt, elem)
case DropHead { (ctx, elem)
if (buffer.isFull) buffer.dropHead()
buffer.enqueue(elem)
ctxt.pull()
ctx.pull()
}
case DropTail { (ctxt, elem)
case DropTail { (ctx, elem)
if (buffer.isFull) buffer.dropTail()
buffer.enqueue(elem)
ctxt.pull()
ctx.pull()
}
case DropBuffer { (ctxt, elem)
case DropBuffer { (ctx, elem)
if (buffer.isFull) buffer.clear()
buffer.enqueue(elem)
ctxt.pull()
ctx.pull()
}
case Backpressure { (ctxt, elem)
case Backpressure { (ctx, elem)
buffer.enqueue(elem)
if (buffer.isFull) ctxt.hold()
else ctxt.pull()
if (buffer.isFull) ctx.hold()
else ctx.pull()
}
case Error { (ctxt, elem)
if (buffer.isFull) ctxt.fail(new Error.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
case Error { (ctx, elem)
if (buffer.isFull) ctx.fail(new Error.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
else {
buffer.enqueue(elem)
ctxt.pull()
ctx.pull()
}
}
}
@ -221,70 +221,70 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
/**
* INTERNAL API
*/
private[akka] final case class Completed[T]() extends DeterministicOp[T, T] {
override def onPush(elem: T, ctxt: Context[T]): Directive = ctxt.finish()
override def onPull(ctxt: Context[T]): Directive = ctxt.finish()
private[akka] final case class Completed[T]() extends PushPullStage[T, T] {
override def onPush(elem: T, ctx: Context[T]): Directive = ctx.finish()
override def onPull(ctx: Context[T]): Directive = ctx.finish()
}
/**
* INTERNAL API
*/
private[akka] final case class Conflate[In, Out](seed: In Out, aggregate: (Out, In) Out) extends DetachedOp[In, Out] {
private[akka] final case class Conflate[In, Out](seed: In Out, aggregate: (Out, In) Out) extends DetachedStage[In, Out] {
private var agg: Any = null
override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = {
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
agg = if (agg == null) seed(elem)
else aggregate(agg.asInstanceOf[Out], elem)
if (!isHolding) ctxt.pull()
if (!ctx.isHolding) ctx.pull()
else {
val result = agg.asInstanceOf[Out]
agg = null
ctxt.pushAndPull(result)
ctx.pushAndPull(result)
}
}
override def onPull(ctxt: DetachedContext[Out]): DownstreamDirective = {
if (isFinishing) {
if (agg == null) ctxt.finish()
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = {
if (ctx.isFinishing) {
if (agg == null) ctx.finish()
else {
val result = agg.asInstanceOf[Out]
agg = null
ctxt.pushAndFinish(result)
ctx.pushAndFinish(result)
}
} else if (agg == null) ctxt.hold()
} else if (agg == null) ctx.hold()
else {
val result = agg.asInstanceOf[Out]
agg = null
ctxt.push(result)
ctx.push(result)
}
}
override def onUpstreamFinish(ctxt: DetachedContext[Out]): TerminationDirective = ctxt.absorbTermination()
override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = ctx.absorbTermination()
}
/**
* INTERNAL API
*/
private[akka] final case class Expand[In, Out, Seed](seed: In Seed, extrapolate: Seed (Out, Seed)) extends DetachedOp[In, Out] {
private[akka] final case class Expand[In, Out, Seed](seed: In Seed, extrapolate: Seed (Out, Seed)) extends DetachedStage[In, Out] {
private var s: Any = null
override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = {
override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
s = seed(elem)
if (isHolding) {
if (ctx.isHolding) {
val (emit, newS) = extrapolate(s.asInstanceOf[Seed])
s = newS
ctxt.pushAndPull(emit)
} else ctxt.hold()
ctx.pushAndPull(emit)
} else ctx.hold()
}
override def onPull(ctxt: DetachedContext[Out]): DownstreamDirective = {
if (s == null) ctxt.hold()
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = {
if (s == null) ctx.hold()
else {
val (emit, newS) = extrapolate(s.asInstanceOf[Seed])
s = newS
if (isHolding) ctxt.pushAndPull(emit)
else ctxt.push(emit)
if (ctx.isHolding) ctx.pushAndPull(emit)
else ctx.push(emit)
}
}