+doc: Add documentation for DetachedStages

also extend documentation with more diagrams
This commit is contained in:
Endre Sándor Varga 2015-07-12 10:28:49 +02:00
parent 6b4f52afb2
commit 0d8fd40b09
7 changed files with 2465 additions and 10 deletions

View file

@ -124,6 +124,55 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
}
}
"demonstrate DetachedStage" in {
//#detached
class Buffer2[T]() extends DetachedStage[T, T] {
private var buf = Vector.empty[T]
private var capacity = 2
private def isFull = capacity == 0
private def isEmpty = capacity == 2
private def dequeue(): T = {
capacity += 1
val next = buf.head
buf = buf.tail
next
}
private def enqueue(elem: T) = {
capacity -= 1
buf = buf :+ elem
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (isEmpty) {
if (ctx.isFinishing) ctx.finish() // No more elements will arrive
else ctx.holdDownstream() // waiting until new elements
} else {
val next = dequeue()
if (ctx.isHoldingUpstream) ctx.pushAndPull(next) // release upstream
else ctx.push(next)
}
}
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
enqueue(elem)
if (isFull) ctx.holdUpstream() // Queue is now full, wait until new empty slot
else {
if (ctx.isHoldingDownstream) ctx.pushAndPull(dequeue()) // Release downstream
else ctx.pull()
}
}
override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = {
if (!isEmpty) ctx.absorbTermination() // still need to flush from buffer
else ctx.finish() // already empty, finishing
}
}
//#detached
}
}
}