!str #16416,#16994 BidiFlow DSL and Flow.join Mat

- add BidiFlow, with atop and join combinators
- add Flow.join(BidiFlow)
- correct Flow.join’s default materialized value selection to Keep.left
This commit is contained in:
Roland Kuhn 2015-03-04 15:22:33 +01:00
parent 1b52ae333f
commit 157629f8af
25 changed files with 1328 additions and 18 deletions

View file

@ -0,0 +1,176 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl._
import akka.stream._
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.stage._
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalactic.ConversionCheckedTripleEquals
object BidiFlowDocSpec {
//#codec
trait Message
case class Ping(id: Int) extends Message
case class Pong(id: Int) extends Message
//#codec-impl
def toBytes(msg: Message): ByteString = {
//#implementation-details-elided
implicit val order = ByteOrder.LITTLE_ENDIAN
msg match {
case Ping(id) => ByteString.newBuilder.putByte(1).putInt(id).result()
case Pong(id) => ByteString.newBuilder.putByte(2).putInt(id).result()
}
//#implementation-details-elided
}
def fromBytes(bytes: ByteString): Message = {
//#implementation-details-elided
implicit val order = ByteOrder.LITTLE_ENDIAN
val it = bytes.iterator
it.getByte match {
case 1 => Ping(it.getInt)
case 2 => Pong(it.getInt)
case other => throw new RuntimeException(s"parse error: expected 1|2 got $other")
}
//#implementation-details-elided
}
//#codec-impl
val codecVerbose = BidiFlow() { b =>
// construct and add the top flow, going outbound
val outbound = b.add(Flow[Message].map(toBytes))
// construct and add the bottom flow, going inbound
val inbound = b.add(Flow[ByteString].map(fromBytes))
// fuse them together into a BidiShape
BidiShape(outbound, inbound)
}
// this is the same as the above
val codec = BidiFlow(toBytes _, fromBytes _)
//#codec
//#framing
val framing = BidiFlow() { b =>
implicit val order = ByteOrder.LITTLE_ENDIAN
def addLengthHeader(bytes: ByteString) = {
val len = bytes.length
ByteString.newBuilder.putInt(len).append(bytes).result()
}
class FrameParser extends PushPullStage[ByteString, ByteString] {
// this holds the received but not yet parsed bytes
var stash = ByteString.empty
// this holds the current message length or -1 if at a boundary
var needed = -1
override def onPush(bytes: ByteString, ctx: Context[ByteString]) = {
stash ++= bytes
run(ctx)
}
override def onPull(ctx: Context[ByteString]) = run(ctx)
override def onUpstreamFinish(ctx: Context[ByteString]) =
if (stash.isEmpty) ctx.finish()
else ctx.absorbTermination() // we still have bytes to emit
private def run(ctx: Context[ByteString]): Directive =
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.length < 4) pullOrFinish(ctx)
else {
needed = stash.iterator.getInt
stash = stash.drop(4)
run(ctx) // cycle back to possibly already emit the next chunk
}
} else if (stash.length < needed) {
// we are in the middle of a message, need more bytes
pullOrFinish(ctx)
} else {
// we have enough to emit at least one message, so do it
val emit = stash.take(needed)
stash = stash.drop(needed)
needed = -1
ctx.push(emit)
}
/*
* After having called absorbTermination() we cannot pull any more, so if we need
* more data we will just have to give up.
*/
private def pullOrFinish(ctx: Context[ByteString]) =
if (ctx.isFinishing) ctx.finish()
else ctx.pull()
}
val outbound = b.add(Flow[ByteString].map(addLengthHeader))
val inbound = b.add(Flow[ByteString].transform(() => new FrameParser))
BidiShape(outbound, inbound)
}
//#framing
val chopUp = BidiFlow() { b =>
val f = Flow[ByteString].mapConcat(_.map(ByteString(_)))
BidiShape(b.add(f), b.add(f))
}
val accumulate = BidiFlow() { b =>
val f = Flow[ByteString].grouped(1000).map(_.fold(ByteString.empty)(_ ++ _))
BidiShape(b.add(f), b.add(f))
}
}
class BidiFlowDocSpec extends AkkaSpec with ConversionCheckedTripleEquals {
import BidiFlowDocSpec._
implicit val mat = ActorFlowMaterializer()
"A BidiFlow" must {
"compose" in {
//#compose
/* construct protocol stack
* +------------------------------------+
* | stack |
* | |
* | +-------+ +---------+ |
* ~> O~~o | ~> | o~~O ~>
* Message | | codec | ByteString | framing | | ByteString
* <~ O~~o | <~ | o~~O <~
* | +-------+ +---------+ |
* +------------------------------------+
*/
val stack = codec.atop(framing)
// test it by plugging it into its own inverse and closing the right end
val pingpong = Flow[Message].collect { case Ping(id) => Pong(id) }
val flow = stack.atop(stack.reversed).join(pingpong)
val result = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head)
Await.result(result, 1.second) should ===((0 to 9).map(Pong))
//#compose
}
"work when chopped up" in {
val stack = codec.atop(framing)
val flow = stack.atop(chopUp).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) })
val f = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head)
Await.result(f, 1.second) should ===((0 to 9).map(Pong))
}
"work when accumulated" in {
val stack = codec.atop(framing)
val flow = stack.atop(accumulate).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) })
val f = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head)
Await.result(f, 1.second) should ===((0 to 9).map(Pong))
}
}
}

View file

@ -33,7 +33,7 @@ Processing Stage
Defining and running streams
----------------------------
Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions:
Linear processing pipelines can be expressed in Akka Streams using the following core abstractions:
Source
A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are

View file

@ -196,6 +196,58 @@ using ``add()`` twice.
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-use
.. _bidi-flow-scala:
Bidirectional Flows
-------------------
A graph topology that is often useful is that of two flows going in opposite
directions. Take for example a codec stage that serializes outgoing messages
and deserializes incoming octet streams. Another such stage could add a framing
protocol that attaches a length header to outgoing data and parses incoming
frames back into the original octet stream chunks. These two stages are meant
to be composed, applying one atop the other as part of a protocol stack. For
this purpose exists the special type :class:`BidiFlow` which is a graph that
has exactly two open inlets and two open outlets. The corresponding shape is
called :ref:`BidiShape` and is defined like this:
.. includecode:: ../../../akka-stream/src/main/scala/akka/stream/Shape.scala
:include: bidi-shape
:exclude: implementation-details-elided
A bidirectional flow is defined just like a unidirectional :ref:`Flow` as
demonstrated for the codec mentioned above:
.. includecode:: code/docs/stream/BidiFlowDocSpec.scala
:include: codec
:exclude: implementation-details-elided
The first version resembles the partial graph constructor, while for the simple
case of a functional 1:1 transformation there is a concise convenience method
as shown on the last line. The implementation of the two functions is not
difficult either:
.. includecode:: code/docs/stream/BidiFlowDocSpec.scala#codec-impl
In this way you could easily integrate any other serialization library that
turns an object into a sequence of bytes.
The other stage that we talked about is a little more involved since reversing
a framing protocol means that any received chunk of bytes may correspond to
zero or more messages. This is best implemented using a :class:`PushPullStage`
(see also :ref:`stream-using-push-pull-stage-scala`).
.. includecode:: code/docs/stream/BidiFlowDocSpec.scala#framing
With these implementations we can build a protocol stack and test it:
.. includecode:: code/docs/stream/BidiFlowDocSpec.scala#compose
This example demonstrates how :class:`BidiFlow` subgraphs can be hooked
together and also turned around with the ``.reversed`` method. The test
simulates both parties of a network communication protocol without actually
having to open a network connection—the flows can just be connected directly.
.. _graph-cycles-scala:
Graph cycles, liveness and deadlocks