diff --git a/akka-docs-dev/rst/java/stream-flows-and-basics.rst b/akka-docs-dev/rst/java/stream-flows-and-basics.rst index d8ee8c370f..a66eefb625 100644 --- a/akka-docs-dev/rst/java/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/java/stream-flows-and-basics.rst @@ -33,7 +33,7 @@ Processing Stage Defining and running streams ---------------------------- -Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions: +Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: Source A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst index 9c3ffe6f37..5fbb7716e1 100644 --- a/akka-docs-dev/rst/java/stream-graphs.rst +++ b/akka-docs-dev/rst/java/stream-graphs.rst @@ -142,6 +142,58 @@ For defining a ``Flow`` we need to expose both an undefined source and sink: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#flow-from-partial-flow-graph +.. _bidi-flow-java: + +Bidirectional Flows +------------------- + +A graph topology that is often useful is that of two flows going in opposite +directions. Take for example a codec stage that serializes outgoing messages +and deserializes incoming octet streams. Another such stage could add a framing +protocol that attaches a length header to outgoing data and parses incoming +frames back into the original octet stream chunks. These two stages are meant +to be composed, applying one atop the other as part of a protocol stack. For +this purpose exists the special type :class:`BidiFlow` which is a graph that +has exactly two open inlets and two open outlets. The corresponding shape is +called :ref:`BidiShape` and is defined like this: + +.. includecode:: ../../../akka-stream/src/main/scala/akka/stream/Shape.scala + :include: bidi-shape + :exclude: implementation-details-elided + +A bidirectional flow is defined just like a unidirectional :ref:`Flow` as +demonstrated for the codec mentioned above: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/BidiFlowDocTest.java + :include: codec + :exclude: implementation-details-elided + +The first version resembles the partial graph constructor, while for the simple +case of a functional 1:1 transformation there is a concise convenience method +as shown on the last line. The implementation of the two functions is not +difficult either: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/BidiFlowDocTest.java#codec-impl + +In this way you could easily integrate any other serialization library that +turns an object into a sequence of bytes. + +The other stage that we talked about is a little more involved since reversing +a framing protocol means that any received chunk of bytes may correspond to +zero or more messages. This is best implemented using a :class:`PushPullStage` +(see also :ref:`stream-using-push-pull-stage-java`). + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/BidiFlowDocTest.java#framing + +With these implementations we can build a protocol stack and test it: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/BidiFlowDocTest.java#compose + +This example demonstrates how :class:`BidiFlow` subgraphs can be hooked +together and also turned around with the ``.reversed()`` method. The test +simulates both parties of a network communication protocol without actually +having to open a network connection—the flows can just be connected directly. + .. _graph-cycles-java: Graph cycles, liveness and deadlocks diff --git a/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala new file mode 100644 index 0000000000..05e963c1a4 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/BidiFlowDocSpec.scala @@ -0,0 +1,176 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package docs.stream + +import akka.stream.testkit.AkkaSpec +import akka.stream.scaladsl._ +import akka.stream._ +import akka.util.ByteString +import java.nio.ByteOrder +import akka.stream.stage._ +import scala.annotation.tailrec +import scala.concurrent.duration._ +import scala.concurrent.Await +import org.scalactic.ConversionCheckedTripleEquals + +object BidiFlowDocSpec { + //#codec + trait Message + case class Ping(id: Int) extends Message + case class Pong(id: Int) extends Message + + //#codec-impl + def toBytes(msg: Message): ByteString = { + //#implementation-details-elided + implicit val order = ByteOrder.LITTLE_ENDIAN + msg match { + case Ping(id) => ByteString.newBuilder.putByte(1).putInt(id).result() + case Pong(id) => ByteString.newBuilder.putByte(2).putInt(id).result() + } + //#implementation-details-elided + } + + def fromBytes(bytes: ByteString): Message = { + //#implementation-details-elided + implicit val order = ByteOrder.LITTLE_ENDIAN + val it = bytes.iterator + it.getByte match { + case 1 => Ping(it.getInt) + case 2 => Pong(it.getInt) + case other => throw new RuntimeException(s"parse error: expected 1|2 got $other") + } + //#implementation-details-elided + } + //#codec-impl + + val codecVerbose = BidiFlow() { b => + // construct and add the top flow, going outbound + val outbound = b.add(Flow[Message].map(toBytes)) + // construct and add the bottom flow, going inbound + val inbound = b.add(Flow[ByteString].map(fromBytes)) + // fuse them together into a BidiShape + BidiShape(outbound, inbound) + } + + // this is the same as the above + val codec = BidiFlow(toBytes _, fromBytes _) + //#codec + + //#framing + val framing = BidiFlow() { b => + implicit val order = ByteOrder.LITTLE_ENDIAN + + def addLengthHeader(bytes: ByteString) = { + val len = bytes.length + ByteString.newBuilder.putInt(len).append(bytes).result() + } + + class FrameParser extends PushPullStage[ByteString, ByteString] { + // this holds the received but not yet parsed bytes + var stash = ByteString.empty + // this holds the current message length or -1 if at a boundary + var needed = -1 + + override def onPush(bytes: ByteString, ctx: Context[ByteString]) = { + stash ++= bytes + run(ctx) + } + override def onPull(ctx: Context[ByteString]) = run(ctx) + override def onUpstreamFinish(ctx: Context[ByteString]) = + if (stash.isEmpty) ctx.finish() + else ctx.absorbTermination() // we still have bytes to emit + + private def run(ctx: Context[ByteString]): Directive = + if (needed == -1) { + // are we at a boundary? then figure out next length + if (stash.length < 4) pullOrFinish(ctx) + else { + needed = stash.iterator.getInt + stash = stash.drop(4) + run(ctx) // cycle back to possibly already emit the next chunk + } + } else if (stash.length < needed) { + // we are in the middle of a message, need more bytes + pullOrFinish(ctx) + } else { + // we have enough to emit at least one message, so do it + val emit = stash.take(needed) + stash = stash.drop(needed) + needed = -1 + ctx.push(emit) + } + + /* + * After having called absorbTermination() we cannot pull any more, so if we need + * more data we will just have to give up. + */ + private def pullOrFinish(ctx: Context[ByteString]) = + if (ctx.isFinishing) ctx.finish() + else ctx.pull() + } + + val outbound = b.add(Flow[ByteString].map(addLengthHeader)) + val inbound = b.add(Flow[ByteString].transform(() => new FrameParser)) + BidiShape(outbound, inbound) + } + //#framing + + val chopUp = BidiFlow() { b => + val f = Flow[ByteString].mapConcat(_.map(ByteString(_))) + BidiShape(b.add(f), b.add(f)) + } + + val accumulate = BidiFlow() { b => + val f = Flow[ByteString].grouped(1000).map(_.fold(ByteString.empty)(_ ++ _)) + BidiShape(b.add(f), b.add(f)) + } +} + +class BidiFlowDocSpec extends AkkaSpec with ConversionCheckedTripleEquals { + import BidiFlowDocSpec._ + + implicit val mat = ActorFlowMaterializer() + + "A BidiFlow" must { + + "compose" in { + //#compose + /* construct protocol stack + * +------------------------------------+ + * | stack | + * | | + * | +-------+ +---------+ | + * ~> O~~o | ~> | o~~O ~> + * Message | | codec | ByteString | framing | | ByteString + * <~ O~~o | <~ | o~~O <~ + * | +-------+ +---------+ | + * +------------------------------------+ + */ + val stack = codec.atop(framing) + + // test it by plugging it into its own inverse and closing the right end + val pingpong = Flow[Message].collect { case Ping(id) => Pong(id) } + val flow = stack.atop(stack.reversed).join(pingpong) + val result = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head) + Await.result(result, 1.second) should ===((0 to 9).map(Pong)) + //#compose + } + + "work when chopped up" in { + val stack = codec.atop(framing) + val flow = stack.atop(chopUp).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) }) + val f = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head) + Await.result(f, 1.second) should ===((0 to 9).map(Pong)) + } + + "work when accumulated" in { + val stack = codec.atop(framing) + val flow = stack.atop(accumulate).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) }) + val f = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head) + Await.result(f, 1.second) should ===((0 to 9).map(Pong)) + } + + } + +} diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index 176a16dace..eec1a26ea7 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -33,7 +33,7 @@ Processing Stage Defining and running streams ---------------------------- -Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions: +Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: Source A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index b711721ad9..9c8d10bd89 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -196,6 +196,58 @@ using ``add()`` twice. .. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-use +.. _bidi-flow-scala: + +Bidirectional Flows +------------------- + +A graph topology that is often useful is that of two flows going in opposite +directions. Take for example a codec stage that serializes outgoing messages +and deserializes incoming octet streams. Another such stage could add a framing +protocol that attaches a length header to outgoing data and parses incoming +frames back into the original octet stream chunks. These two stages are meant +to be composed, applying one atop the other as part of a protocol stack. For +this purpose exists the special type :class:`BidiFlow` which is a graph that +has exactly two open inlets and two open outlets. The corresponding shape is +called :ref:`BidiShape` and is defined like this: + +.. includecode:: ../../../akka-stream/src/main/scala/akka/stream/Shape.scala + :include: bidi-shape + :exclude: implementation-details-elided + +A bidirectional flow is defined just like a unidirectional :ref:`Flow` as +demonstrated for the codec mentioned above: + +.. includecode:: code/docs/stream/BidiFlowDocSpec.scala + :include: codec + :exclude: implementation-details-elided + +The first version resembles the partial graph constructor, while for the simple +case of a functional 1:1 transformation there is a concise convenience method +as shown on the last line. The implementation of the two functions is not +difficult either: + +.. includecode:: code/docs/stream/BidiFlowDocSpec.scala#codec-impl + +In this way you could easily integrate any other serialization library that +turns an object into a sequence of bytes. + +The other stage that we talked about is a little more involved since reversing +a framing protocol means that any received chunk of bytes may correspond to +zero or more messages. This is best implemented using a :class:`PushPullStage` +(see also :ref:`stream-using-push-pull-stage-scala`). + +.. includecode:: code/docs/stream/BidiFlowDocSpec.scala#framing + +With these implementations we can build a protocol stack and test it: + +.. includecode:: code/docs/stream/BidiFlowDocSpec.scala#compose + +This example demonstrates how :class:`BidiFlow` subgraphs can be hooked +together and also turned around with the ``.reversed`` method. The test +simulates both parties of a network communication protocol without actually +having to open a network connection—the flows can just be connected directly. + .. _graph-cycles-scala: Graph cycles, liveness and deadlocks diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index cdd6770272..0ec0533df4 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -201,7 +201,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { * and the respective materialization result returned. */ def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: FlowMaterializer): Mat = - flow.join(handler).mapMaterialized(_._2).run() + flow.joinMat(handler)(Keep.right).run() /** * Handles the connection with the given handler function. diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java new file mode 100644 index 0000000000..a762b46fb8 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -0,0 +1,270 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.javadsl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.junit.ClassRule; +import org.junit.Test; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import scala.runtime.BoxedUnit; +import akka.japi.Pair; +import akka.stream.*; +import akka.stream.testkit.AkkaSpec; +import akka.stream.javadsl.FlowGraph.Builder; +import akka.stream.javadsl.japi.*; +import akka.util.ByteString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; + +public class BidiFlowTest extends StreamTest { + public BidiFlowTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource( + "FlowTest", AkkaSpec.testConf()); + + private final BidiFlow bidi = BidiFlow + .factory() + .create( + new Function>() { + @Override + public BidiShape apply(Builder b) + throws Exception { + final FlowShape top = b.graph(Flow + . empty().map(new Function() { + @Override + public Long apply(Integer arg) { + return (long) ((int) arg) + 2; + } + })); + final FlowShape bottom = b.graph(Flow + . empty().map(new Function() { + @Override + public String apply(ByteString arg) { + return arg.decodeString("UTF-8"); + } + })); + return new BidiShape(top + .inlet(), top.outlet(), bottom.inlet(), bottom.outlet()); + } + }); + + private final BidiFlow inverse = BidiFlow + .factory() + .create( + new Function>() { + @Override + public BidiShape apply(Builder b) + throws Exception { + final FlowShape top = b.graph(Flow. empty() + .map(new Function() { + @Override + public Integer apply(Long arg) { + return (int) ((long) arg) + 2; + } + })); + final FlowShape bottom = b.graph(Flow + . empty().map(new Function() { + @Override + public ByteString apply(String arg) { + return ByteString.fromString(arg); + } + })); + return new BidiShape(top + .inlet(), top.outlet(), bottom.inlet(), bottom.outlet()); + } + }); + + private final BidiFlow> bidiMat = BidiFlow + .factory() + .create( + Sink. head(), + new Function2, BidiShape>() { + @Override + public BidiShape apply(Builder b, SinkShape sink) + throws Exception { + b.from(Source.single(42)).to(sink); + final FlowShape top = b.graph(Flow + . empty().map(new Function() { + @Override + public Long apply(Integer arg) { + return (long) ((int) arg) + 2; + } + })); + final FlowShape bottom = b.graph(Flow + . empty().map(new Function() { + @Override + public String apply(ByteString arg) { + return arg.decodeString("UTF-8"); + } + })); + return new BidiShape(top + .inlet(), top.outlet(), bottom.inlet(), bottom.outlet()); + } + }); + + private final String str = "Hello World"; + private final ByteString bytes = ByteString.fromString(str); + private final List list = new ArrayList(); + { + list.add(1); + list.add(2); + list.add(3); + } + private final FiniteDuration oneSec = Duration.create(1, TimeUnit.SECONDS); + + @Test + public void mustWorkInIsolation() throws Exception { + final Pair, Future> p = FlowGraph + .factory() + .closed(Sink. head(), Sink. head(), + Keep., Future> both(), + new Procedure3, SinkShape>() { + @Override + public void apply(Builder b, SinkShape st, + SinkShape sb) throws Exception { + final BidiShape s = b + .graph(bidi); + b.from(Source.single(1)).to(s.in1()); + b.from(s.out1()).to(st); + b.from(Source.single(bytes)).to(s.in2()); + b.from(s.out2()).to(sb); + } + }).run(materializer); + + final Long rt = Await.result(p.first(), oneSec); + final String rb = Await.result(p.second(), oneSec); + + assertEquals((Long) 3L, rt); + assertEquals(str, rb); + } + + @Test + public void mustWorkAsAFlowThatIsOpenOnTheLeft() throws Exception { + final Flow f = bidi.join(Flow. empty().map( + new Function() { + @Override public ByteString apply(Long arg) { + return ByteString.fromString("Hello " + arg); + } + })); + final Future> result = Source.from(list).via(f).grouped(10).runWith(Sink.> head(), materializer); + assertEquals(Arrays.asList("Hello 3", "Hello 4", "Hello 5"), Await.result(result, oneSec)); + } + + @Test + public void mustWorkAsAFlowThatIsOpenOnTheRight() throws Exception { + final Flow f = Flow. empty().map( + new Function() { + @Override public Integer apply(String arg) { + return Integer.valueOf(arg); + } + }).join(bidi); + final List inputs = Arrays.asList(ByteString.fromString("1"), ByteString.fromString("2")); + final Future> result = Source.from(inputs).via(f).grouped(10).runWith(Sink.> head(), materializer); + assertEquals(Arrays.asList(3L, 4L), Await.result(result, oneSec)); + } + + @Test + public void mustWorkWhenAtopItsInverse() throws Exception { + final Flow f = bidi.atop(inverse).join(Flow. empty().map( + new Function() { + @Override public String apply(Integer arg) { + return arg.toString(); + } + })); + final Future> result = Source.from(list).via(f).grouped(10).runWith(Sink.> head(), materializer); + assertEquals(Arrays.asList("5", "6", "7"), Await.result(result, oneSec)); + } + + @Test + public void mustWorkWhenReversed() throws Exception { + final Flow f = Flow. empty().map( + new Function() { + @Override public String apply(Integer arg) { + return arg.toString(); + } + }).join(inverse.reversed()).join(bidi.reversed()); + final Future> result = Source.from(list).via(f).grouped(10).runWith(Sink.> head(), materializer); + assertEquals(Arrays.asList("5", "6", "7"), Await.result(result, oneSec)); + } + + @Test + public void mustMaterializeToItsValue() throws Exception { + final Future f = FlowGraph.factory().closed(bidiMat, new Procedure2>() { + @Override + public void apply(Builder b, + BidiShape shape) throws Exception { + final FlowShape left = b.graph(Flow. empty().map( + new Function() { + @Override public Integer apply(String arg) { + return Integer.valueOf(arg); + } + })); + final FlowShape right = b.graph(Flow. empty().map( + new Function() { + @Override public ByteString apply(Long arg) { + return ByteString.fromString("Hello " + arg); + } + })); + b.from(shape.out2()).via(left).to(shape.in1()) + .from(shape.out1()).via(right).to(shape.in2()); + } + }).run(materializer); + assertEquals((Integer) 42, Await.result(f, oneSec)); + } + + @Test + public void mustCombineMaterializationValues() throws Exception { + final Flow> left = Flow.factory().create( + Sink. head(), new Function2, Pair, Outlet>>() { + @Override + public Pair, Outlet> apply(Builder b, + SinkShape sink) throws Exception { + final UniformFanOutShape bcast = b.graph(Broadcast. create(2)); + final UniformFanInShape merge = b.graph(Merge. create(2)); + final FlowShape flow = b.graph(Flow. empty().map( + new Function() { + @Override + public Integer apply(String arg) { + return Integer.valueOf(arg); + } + })); + b.from(bcast).to(sink) + .from(Source.single(1)).via(bcast).to(merge) + .from(flow).to(merge); + return new Pair, Outlet>(flow.inlet(), merge.out()); + } + }); + final Flow>> right = Flow.factory().create( + Sink.> head(), new Function2>, Pair, Outlet>>() { + @Override + public Pair, Outlet> apply(Builder b, + SinkShape> sink) throws Exception { + final FlowShape> flow = b.graph(Flow. empty().grouped(10)); + b.from(flow).to(sink); + return new Pair, Outlet>(flow.inlet(), b.source(Source.single(ByteString.fromString("10")))); + } + }); + final Pair, Future>, Future>> result = + left.join(bidiMat, Keep., Future> both()).join(right, Keep., Future>, Future>> both()).run(materializer); + final Future l = result.first().first(); + final Future m = result.first().second(); + final Future> r = result.second(); + assertEquals((Integer) 1, Await.result(l, oneSec)); + assertEquals((Integer) 42, Await.result(m, oneSec)); + final Long[] rr = Await.result(r, oneSec).toArray(new Long[0]); + Arrays.sort(rr); + assertArrayEquals(new Long[] { 3L, 12L }, rr); + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala new file mode 100644 index 0000000000..bd028eef1f --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.AkkaSpec +import org.scalactic.ConversionCheckedTripleEquals +import akka.util.ByteString +import akka.stream.BidiShape +import akka.stream.ActorFlowMaterializer +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.collection.immutable + +class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { + import OperationAttributes._ + import FlowGraph.Implicits._ + + implicit val mat = ActorFlowMaterializer() + + val bidi = BidiFlow() { b ⇒ + val top = b.add(Flow[Int].map(x ⇒ x.toLong + 2).withAttributes(name("top"))) + val bottom = b.add(Flow[ByteString].map(_.decodeString("UTF-8")).withAttributes(name("bottom"))) + BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + } + + val inverse = BidiFlow() { b ⇒ + val top = b.add(Flow[Long].map(x ⇒ x.toInt + 2).withAttributes(name("top"))) + val bottom = b.add(Flow[String].map(ByteString(_)).withAttributes(name("bottom"))) + BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + } + + val bidiMat = BidiFlow(Sink.head[Int]) { implicit b ⇒ + s ⇒ + Source.single(42) ~> s + + val top = b.add(Flow[Int].map(x ⇒ x.toLong + 2)) + val bottom = b.add(Flow[ByteString].map(_.decodeString("UTF-8"))) + BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + } + + val str = "Hello World" + val bytes = ByteString(str) + + "A BidiFlow" must { + + "work top/bottom in isolation" in { + val (top, bottom) = FlowGraph.closed(Sink.head[Long], Sink.head[String])(Keep.both) { implicit b ⇒ + (st, sb) ⇒ + val s = b.add(bidi) + + Source.single(1) ~> s.in1; s.out1 ~> st + sb <~ s.out2; s.in2 <~ Source.single(bytes) + }.run() + + Await.result(top, 1.second) should ===(3) + Await.result(bottom, 1.second) should ===(str) + } + + "work as a Flow that is open on the left" in { + val f = bidi.join(Flow[Long].map(x ⇒ ByteString(s"Hello $x"))) + val result = Source(List(1, 2, 3)).via(f).grouped(10).runWith(Sink.head) + Await.result(result, 1.second) should ===(Seq("Hello 3", "Hello 4", "Hello 5")) + } + + "work as a Flow that is open on the right" in { + val f = Flow[String].map(Integer.valueOf(_).toInt).join(bidi) + val result = Source(List(ByteString("1"), ByteString("2"))).via(f).grouped(10).runWith(Sink.head) + Await.result(result, 1.second) should ===(Seq(3L, 4L)) + } + + "work when atop its inverse" in { + val f = bidi.atop(inverse).join(Flow[Int].map(_.toString)) + val result = Source(List(1, 2, 3)).via(f).grouped(10).runWith(Sink.head) + Await.result(result, 1.second) should ===(Seq("5", "6", "7")) + } + + "work when reversed" in { + // just reversed from the case above; observe that Flow inverts itself automatically by being on the left side + val f = Flow[Int].map(_.toString).join(inverse.reversed).join(bidi.reversed) + val result = Source(List(1, 2, 3)).via(f).grouped(10).runWith(Sink.head) + Await.result(result, 1.second) should ===(Seq("5", "6", "7")) + } + + "materialize to its value" in { + val f = FlowGraph.closed(bidiMat) { implicit b ⇒ + bidi ⇒ + Flow[String].map(Integer.valueOf(_).toInt) <~> bidi <~> Flow[Long].map(x ⇒ ByteString(s"Hello $x")) + }.run() + Await.result(f, 1.second) should ===(42) + } + + "combine materialization values" in { + val left = Flow(Sink.head[Int]) { implicit b ⇒ + sink ⇒ + val bcast = b.add(Broadcast[Int](2)) + val merge = b.add(Merge[Int](2)) + val flow = b.add(Flow[String].map(Integer.valueOf(_).toInt)) + bcast ~> sink + Source.single(1) ~> bcast ~> merge + flow ~> merge + (flow.inlet, merge.out) + } + val right = Flow(Sink.head[immutable.Seq[Long]]) { implicit b ⇒ + sink ⇒ + val flow = b.add(Flow[Long].grouped(10)) + flow ~> sink + (flow.inlet, b.add(Source.single(ByteString("10")))) + } + val ((l, m), r) = left.joinMat(bidiMat)(Keep.both).joinMat(right)(Keep.both).run() + Await.result(l, 1.second) should ===(1) + Await.result(m, 1.second) should ===(42) + Await.result(r, 1.second).toSet should ===(Set(3L, 12L)) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index a26bb30c52..2356445d51 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -166,4 +166,11 @@ class SourceSpec extends AkkaSpec { } } + "Iterator Source" must { + "properly iterate" in { + val result = Await.result(Source(() ⇒ Iterator.iterate(false)(!_)).grouped(10).runWith(Sink.head), 1.second) + result should ===(Seq(false, true, false, true, false, true, false, true, false, true)) + } + } + } diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template new file mode 100644 index 0000000000..0099ddb96a --- /dev/null +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.javadsl + +import akka.stream.scaladsl +import akka.stream.{ Inlet, Outlet, Shape, Graph, BidiShape } +import akka.stream.scaladsl.JavaConverters._ +import akka.japi.Pair + +trait BidiFlowCreate { + + import language.implicitConversions + private implicit def p[A, B](pair: Pair[A, B]): (A, B) = pair.first -> pair.second + + def create[I1, O1, I2, O2](block: japi.Function[FlowGraph.Builder, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] = + new BidiFlow(scaladsl.BidiFlow() { b ⇒ block.apply(b.asJava) }) + + def create[I1, O1, I2, O2, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] = + new BidiFlow(scaladsl.BidiFlow(g1) { b ⇒ s => block.apply(b.asJava, s) }) + + [2..21#def create[I##1, O##1, I##2, O##2, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], + block: japi.Function2[FlowGraph.Builder, [#S1#], BidiShape[I##1, O##1, I##2, O##2]]): BidiFlow[I##1, O##1, I##2, O##2, M] = + new BidiFlow(scaladsl.BidiFlow([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# + + ] + +} diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template index 088914b155..3b7b555f05 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template @@ -19,7 +19,7 @@ trait FlowCreate { def create[I, O, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = new Flow(scaladsl.Flow(g1) { b ⇒ s => block.apply(b.asJava, s) }) - [3..21#def create[I, O, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], + [2..21#def create[I, O, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], block: japi.Function2[FlowGraph.Builder, [#S1#], Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = new Flow(scaladsl.Flow([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template index b699ab7a79..d29b28ced1 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template @@ -23,7 +23,7 @@ trait SinkCreate { def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Inlet[T]]): Sink[T, M] = new Sink(scaladsl.Sink(g1) { b ⇒ s => block.apply(b.asJava, s) }) - [3..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], + [2..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], block: japi.Function2[FlowGraph.Builder, [#S1#], Inlet[T]]): Sink[T, M] = new Sink(scaladsl.Sink([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template index 484ac9f89d..b000d1b868 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template @@ -15,7 +15,7 @@ trait SourceCreate { def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Outlet[T]]): Source[T, M] = new Source(scaladsl.Source(g1) { b ⇒ s => block.apply(b.asJava, s) }) - [3..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], + [2..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], block: japi.Function2[FlowGraph.Builder, [#S1#], Outlet[T]]): Source[T, M] = new Source(scaladsl.Source([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template new file mode 100644 index 0000000000..9cfbb815ea --- /dev/null +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2014-2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.{ Shape, Inlet, Outlet, Graph, BidiShape } + +trait BidiFlowApply { + + def apply[I1, O1, I2, O2]()(block: FlowGraph.Builder ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = { + val builder = new FlowGraph.Builder + val shape = block(builder) + builder.buildBidiFlow(shape) + } + + def apply[I1, O1, I2, O2, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { + val builder = new FlowGraph.Builder + val p = builder.add(g1, Keep.right) + val shape = buildBlock(builder)(p) + builder.buildBidiFlow(shape) + } + + [2..#def apply[I##1, O##1, I##2, O##2, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)( + buildBlock: FlowGraph.Builder => ([#g1.Shape#]) ⇒ BidiShape[I##1, O##1, I##2, O##2]): BidiFlow[I##1, O##1, I##2, O##2, Mat] = { + val builder = new FlowGraph.Builder + val curried = combineMat.curried + val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) + [2..#val p1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# + ] + val shape = buildBlock(builder)([#p1#]) + builder.buildBidiFlow(shape) + }# + + ] + +} diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index ed62bc9ca4..2337087f40 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -180,19 +180,32 @@ final case class SinkShape[-T](inlet: Inlet[T]) extends Shape { } } +//#bidi-shape /** * A bidirectional flow of elements that consequently has two inputs and two * outputs, arranged like this: * * {{{ - * In1 => Out1 - * Out2 <= In2 + * +------+ + * In1 ~>| |~> Out1 + * | bidi | + * Out2 <~| |<~ In2 + * +------+ * }}} */ -final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], out1: Outlet[Out1], in2: Inlet[In2], out2: Outlet[Out2]) extends Shape { +final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], + out1: Outlet[Out1], + in2: Inlet[In2], + out2: Outlet[Out2]) extends Shape { + //#implementation-details-elided override val inlets: immutable.Seq[Inlet[_]] = List(in1, in2) override val outlets: immutable.Seq[Outlet[_]] = List(out1, out2) + /** + * Java API for creating from a pair of unidirectional flows. + */ + def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + override def deepCopy(): BidiShape[In1, Out1, In2, Out2] = BidiShape(new Inlet(in1.toString), new Outlet(out1.toString), new Inlet(in2.toString), new Outlet(out2.toString)) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { @@ -200,4 +213,11 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1], out1: Outl require(outlets.size == 2, s"proposed outlets [${outlets.mkString(", ")}] do not fit BidiShape") BidiShape(inlets(0), outlets(0), inlets(1), outlets(1)) } + //#implementation-details-elided +} +//#bidi-shape + +object BidiShape { + def apply[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] = + BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 769d482e52..53d568d779 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -42,6 +42,7 @@ private[akka] object StreamLayout { def isSink: Boolean = (inPorts.size == 1) && outPorts.isEmpty def isSource: Boolean = (outPorts.size == 1) && inPorts.isEmpty def isFlow: Boolean = (inPorts.size == 1) && (outPorts.size == 1) + def isBidiFlow: Boolean = (inPorts.size == 2) && (outPorts.size == 2) def growConnect(that: Module, from: OutPort, to: InPort): Module = growConnect(that, from, to, Keep.left) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala new file mode 100644 index 0000000000..3199ff2dba --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.javadsl + +import akka.stream.scaladsl +import akka.stream.Graph +import akka.stream.BidiShape + +object BidiFlow { + + val factory: BidiFlowCreate = new BidiFlowCreate {} + + /** + * A graph with the shape of a flow logically is a flow, this method makes + * it so also in type. + */ + def wrap[I1, O1, I2, O2, M](g: Graph[BidiShape[I1, O1, I2, O2], M]): BidiFlow[I1, O1, I2, O2, M] = new BidiFlow(scaladsl.BidiFlow.wrap(g)) + + /** + * Create a BidiFlow where the top and bottom flows are just one simple mapping + * stage each, expressed by the two functions. + */ + def fromFunctions[I1, O1, I2, O2](top: japi.Function[I1, O1], bottom: japi.Function[I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = + new BidiFlow(scaladsl.BidiFlow(top.apply _, bottom.apply _)) + +} + +class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { + private[stream] override def module = delegate.module + override def shape = delegate.shape + + def asScala: scaladsl.BidiFlow[I1, O1, I2, O2, Mat] = delegate + + /** + * Add the given BidiFlow as the next step in a bidirectional transformation + * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom + * layer, the closest to the metal. + * {{{ + * +----------------------------+ + * | Resulting BidiFlow | + * | | + * | +------+ +------+ | + * I1 ~~> | | ~O1~> | | ~~> OO1 + * | | this | | bidi | | + * O2 <~~ | | <~I2~ | | <~~ II2 + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[BidiFlow]] will be the materialized + * value of the current flow (ignoring the other BidiFlow’s value), use + * [[BidiFlow#atopMat atopMat]] if a different strategy is needed. + */ + def atop[OO1, II2, Mat2](bidi: BidiFlow[O1, OO1, II2, I2, Mat2]): BidiFlow[I1, OO1, II2, O2, Mat] = + new BidiFlow(delegate.atop(bidi.asScala)) + + /** + * Add the given BidiFlow as the next step in a bidirectional transformation + * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom + * layer, the closest to the metal. + * {{{ + * +----------------------------+ + * | Resulting BidiFlow | + * | | + * | +------+ +------+ | + * I1 ~~> | | ~O1~> | | ~~> OO1 + * | | this | | bidi | | + * O2 <~~ | | <~I2~ | | <~~ II2 + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting BidiFlow. + */ + def atop[OO1, II2, Mat2, M](bidi: BidiFlow[O1, OO1, II2, I2, Mat2], combine: japi.Function2[Mat, Mat2, M]): BidiFlow[I1, OO1, II2, O2, M] = + new BidiFlow(delegate.atopMat(bidi.asScala)(combinerToScala(combine))) + + /** + * Add the given Flow as the final step in a bidirectional transformation + * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom + * layer, the closest to the metal. + * {{{ + * +---------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * I1 ~~> | | ~O1~> | | | + * | | this | | flow | | + * O2 <~~ | | <~I2~ | | | + * | +------+ +------+ | + * +---------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[BidiFlow#joinMat joinMat]] if a different strategy is needed. + */ + def join[Mat2](flow: Flow[O1, I2, Mat2]): Flow[I1, O2, Mat] = + new Flow(delegate.join(flow.asScala)) + + /** + * Add the given Flow as the final step in a bidirectional transformation + * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom + * layer, the closest to the metal. + * {{{ + * +---------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * I1 ~~> | | ~O1~> | | | + * | | this | | flow | | + * O2 <~~ | | <~I2~ | | | + * | +------+ +------+ | + * +---------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting [[Flow]]. + */ + def join[Mat2, M](flow: Flow[O1, I2, Mat2], combine: japi.Function2[Mat, Mat2, M]): Flow[I1, O2, M] = + new Flow(delegate.joinMat(flow.asScala)(combinerToScala(combine))) + + /** + * Turn this BidiFlow around by 180 degrees, logically flipping it upside down in a protocol stack. + */ + def reversed: BidiFlow[I2, O2, I1, O1, Mat] = new BidiFlow(delegate.reversed) +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 0ce9c1bec0..67bec8b44c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -33,6 +33,12 @@ object Flow { /** Create a `Flow` which can process elements of type `T`. */ def of[T](clazz: Class[T]): javadsl.Flow[T, T, Unit] = create[T]() + + /** + * A graph with the shape of a flow logically is a flow, this method makes + * it so also in type. + */ + def wrap[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] = new Flow(scaladsl.Flow.wrap(g)) } /** Create a `Flow` which can process elements of type `T`. */ @@ -79,8 +85,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] */ - def join[M](flow: javadsl.Flow[Out, In, M]): javadsl.RunnableFlow[Mat @uncheckedVariance Pair M] = - new RunnableFlowAdapter(delegate.join(flow.asScala).mapMaterialized(p ⇒ new Pair(p._1, p._2))) + def join[M](flow: javadsl.Flow[Out, In, M]): javadsl.RunnableFlow[Mat] = + new RunnableFlowAdapter(delegate.join(flow.asScala)) /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] @@ -88,6 +94,45 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def join[M, M2](flow: javadsl.Flow[Out, In, M], combine: japi.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = new RunnableFlowAdapter(delegate.joinMat(flow.asScala)(combinerToScala(combine))) + /** + * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack: + * {{{ + * +---------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | ~Out~> | | ~~> O2 + * | | flow | | bidi | | + * | | | <~In~ | | <~~ I2 + * | +------+ +------+ | + * +---------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the [[BidiFlow]]’s value), use + * [[Flow#joinMat[I2* joinMat]] if a different strategy is needed. + */ + def join[I2, O2, Mat2](bidi: BidiFlow[Out, O2, I2, In, Mat2]): Flow[I2, O2, Mat] = + new Flow(delegate.join(bidi.asScala)) + + /** + * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack: + * {{{ + * +---------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | ~Out~> | | ~~> O2 + * | | flow | | bidi | | + * | | | <~In~ | | <~~ I2 + * | +------+ +------+ | + * +---------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * [[BidiFlow]] into the materialized value of the resulting [[Flow]]. + */ + def join[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2], combine: japi.Function2[Mat, Mat2, M]): Flow[I2, O2, M] = + new Flow(delegate.joinMat(bidi.asScala)(combinerToScala(combine))) + /** * Connect the `KeyedSource` to this `Flow` and then connect it to the `KeyedSink` and run it. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 2cb7f48df9..6010845443 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -98,6 +98,11 @@ object Sink { def head[In](): Sink[In, Future[In]] = new Sink(scaladsl.Sink.head[In]) + /** + * A graph with the shape of a sink logically is a sink, this method makes + * it so also in type. + */ + def wrap[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] = new Sink(scaladsl.Sink.wrap(g)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index b269315d4a..5875727e63 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -18,6 +18,7 @@ import scala.language.higherKinds import scala.language.implicitConversions import akka.stream.stage.Stage import akka.stream.impl.StreamLayout +import scala.annotation.varargs /** Java API */ object Source { @@ -134,6 +135,12 @@ object Source { def single[T](element: T): Source[T, Unit] = new Source(scaladsl.Source.single(element)) + /** + * Create a `Source` with the given elements. + */ + def elements[T](elems: T*): Source[T, Unit] = + new Source(scaladsl.Source(() ⇒ elems.iterator)) + /** * Create a `Source` that will continually emit the given element. */ @@ -159,6 +166,12 @@ object Source { */ def concat[T, M1, M2](first: Source[T, M1], second: Source[T, M2]): Source[T, (M1, M2)] = new Source(scaladsl.Source.concat(first.asScala, second.asScala)) + + /** + * A graph with the shape of a source logically is a source, this method makes + * it so also in type. + */ + def wrap[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = new Source(scaladsl.Source.wrap(g)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala new file mode 100644 index 0000000000..56e7ab6156 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -0,0 +1,142 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.Graph +import akka.stream.BidiShape +import akka.stream.impl.StreamLayout.Module +import akka.stream.FlowShape + +final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val module: Module) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { + override val shape = module.shape.asInstanceOf[BidiShape[I1, O1, I2, O2]] + + /** + * Add the given BidiFlow as the next step in a bidirectional transformation + * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom + * layer, the closest to the metal. + * {{{ + * +----------------------------+ + * | Resulting BidiFlow | + * | | + * | +------+ +------+ | + * I1 ~~> | | ~O1~> | | ~~> OO1 + * | | this | | bidi | | + * O2 <~~ | | <~I2~ | | <~~ II2 + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[BidiFlow]] will be the materialized + * value of the current flow (ignoring the other BidiFlow’s value), use + * [[BidiFlow#atopMat atopMat]] if a different strategy is needed. + */ + def atop[OO1, II2, Mat2](bidi: BidiFlow[O1, OO1, II2, I2, Mat2]): BidiFlow[I1, OO1, II2, O2, Mat] = atopMat(bidi)(Keep.left) + + /** + * Add the given BidiFlow as the next step in a bidirectional transformation + * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom + * layer, the closest to the metal. + * {{{ + * +----------------------------+ + * | Resulting BidiFlow | + * | | + * | +------+ +------+ | + * I1 ~~> | | ~O1~> | | ~~> OO1 + * | | this | | bidi | | + * O2 <~~ | | <~I2~ | | <~~ II2 + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting BidiFlow. + */ + def atopMat[OO1, II2, Mat2, M](bidi: BidiFlow[O1, OO1, II2, I2, Mat2])(combine: (Mat, Mat2) ⇒ M): BidiFlow[I1, OO1, II2, O2, M] = { + val copy = bidi.module.carbonCopy + val ins = copy.shape.inlets + val outs = copy.shape.outlets + new BidiFlow(module + .grow(copy, combine) + .connect(shape.out1, ins(0)) + .connect(outs(1), shape.in2) + .replaceShape(BidiShape(shape.in1, outs(0), ins(1), shape.out2))) + } + + /** + * Add the given Flow as the final step in a bidirectional transformation + * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom + * layer, the closest to the metal. + * {{{ + * +---------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * I1 ~~> | | ~O1~> | | | + * | | this | | flow | | + * O2 <~~ | | <~I2~ | | | + * | +------+ +------+ | + * +---------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[BidiFlow#joinMat joinMat]] if a different strategy is needed. + */ + def join[Mat2](flow: Flow[O1, I2, Mat2]): Flow[I1, O2, Mat] = joinMat(flow)(Keep.left) + + /** + * Add the given Flow as the final step in a bidirectional transformation + * pipeline. By convention protocol stacks are growing to the left: the right most is the bottom + * layer, the closest to the metal. + * {{{ + * +---------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * I1 ~~> | | ~O1~> | | | + * | | this | | flow | | + * O2 <~~ | | <~I2~ | | | + * | +------+ +------+ | + * +---------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting [[Flow]]. + */ + def joinMat[Mat2, M](flow: Flow[O1, I2, Mat2])(combine: (Mat, Mat2) ⇒ M): Flow[I1, O2, M] = { + val copy = flow.module.carbonCopy + val in = copy.shape.inlets.head + val out = copy.shape.outlets.head + new Flow(module + .grow(copy, combine) + .connect(shape.out1, in) + .connect(out, shape.in2) + .replaceShape(FlowShape(shape.in1, shape.out2))) + } + + /** + * Turn this BidiFlow around by 180 degrees, logically flipping it upside down in a protocol stack. + */ + def reversed: BidiFlow[I2, O2, I1, O1, Mat] = { + val ins = shape.inlets + val outs = shape.outlets + new BidiFlow(module.replaceShape(shape.copyFromPorts(ins.reverse, outs.reverse))) + } +} + +object BidiFlow extends BidiFlowApply { + + /** + * A graph with the shape of a flow logically is a flow, this method makes + * it so also in type. + */ + def wrap[I1, O1, I2, O2, Mat](graph: Graph[BidiShape[I1, O1, I2, O2], Mat]): BidiFlow[I1, O1, I2, O2, Mat] = new BidiFlow(graph.module) + + /** + * Create a BidiFlow where the top and bottom flows are just one simple mapping + * stage each, expressed by the two functions. + */ + def apply[I1, O1, I2, O2](outbound: I1 ⇒ O1, inbound: I2 ⇒ O2): BidiFlow[I1, O1, I2, O2, Unit] = + BidiFlow() { b ⇒ + val top = b.add(Flow[I1].map(outbound)) + val bottom = b.add(Flow[I2].map(inbound)) + BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c5d7811b83..c11b587595 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -31,11 +31,38 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** * Transform this [[Flow]] by appending the given processing steps. + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[Flow#viaMat viaMat]] if a different strategy is needed. */ def via[T, Mat2](flow: Flow[Out, T, Mat2]): Flow[In, T, Mat] = viaMat(flow)(Keep.left) /** * Transform this [[Flow]] by appending the given processing steps. + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting Flow. */ def viaMat[T, Mat2, Mat3](flow: Flow[Out, T, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = { if (this.isIdentity) flow.asInstanceOf[Flow[In, T, Mat3]] @@ -50,6 +77,20 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. + * {{{ + * +----------------------------+ + * | Resulting Sink | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | flow | ~Out~> | sink | | + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[Sink]] will be the materialized + * value of the current flow (ignoring the given Sink’s value), use + * [[Flow#toMat[Mat2* toMat]] if a different strategy is needed. */ def to[Mat2](sink: Sink[Out, Mat2]): Sink[In, Mat] = { toMat(sink)(Keep.left) @@ -57,6 +98,19 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. + * {{{ + * +----------------------------+ + * | Resulting Sink | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | flow | ~Out~> | sink | | + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * Sink into the materialized value of the resulting Sink. */ def toMat[Mat2, Mat3](sink: Sink[Out, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Sink[In, Mat3] = { if (isIdentity) sink.asInstanceOf[Sink[In, Mat3]] @@ -75,8 +129,32 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) def mapMaterialized[Mat2](f: Mat ⇒ Mat2): Repr[Out, Mat2] = new Flow(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) + /** + * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]. + * {{{ + * +------+ +-------+ + * | | ~Out~> | | + * | this | | other | + * | | <~In~ | | + * +------+ +-------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[Flow#joinMat[Mat2* joinMat]] if a different strategy is needed. + */ + def join[Mat2](flow: Flow[Out, In, Mat2]): RunnableFlow[Mat] = joinMat(flow)(Keep.left) + /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] + * {{{ + * +------+ +-------+ + * | | ~Out~> | | + * | this | | other | + * | | <~In~ | | + * +------+ +-------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * Flow into the materialized value of the resulting Flow. */ def joinMat[Mat2, Mat3](flow: Flow[Out, In, Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableFlow[Mat3] = { val flowCopy = flow.module.carbonCopy @@ -88,12 +166,61 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) } /** - * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] + * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack: + * {{{ + * +---------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | ~Out~> | | ~~> O2 + * | | flow | | bidi | | + * | | | <~In~ | | <~~ I2 + * | +------+ +------+ | + * +---------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the [[BidiFlow]]’s value), use + * [[Flow#joinMat[I2* joinMat]] if a different strategy is needed. */ - def join[Mat2](flow: Flow[Out, In, Mat2]): RunnableFlow[(Mat, Mat2)] = { - joinMat(flow)(Keep.both) + def join[I2, O2, Mat2](bidi: BidiFlow[Out, O2, I2, In, Mat2]): Flow[I2, O2, Mat] = joinMat(bidi)(Keep.left) + + /** + * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack: + * {{{ + * +---------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | ~Out~> | | ~~> O2 + * | | flow | | bidi | | + * | | | <~In~ | | <~~ I2 + * | +------+ +------+ | + * +---------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * [[BidiFlow]] into the materialized value of the resulting [[Flow]]. + */ + def joinMat[I2, O2, Mat2, M](bidi: BidiFlow[Out, O2, I2, In, Mat2])(combine: (Mat, Mat2) ⇒ M): Flow[I2, O2, M] = { + val copy = bidi.module.carbonCopy + val ins = copy.shape.inlets + val outs = copy.shape.outlets + new Flow(module + .grow(copy, combine) + .connect(shape.outlet, ins(0)) + .connect(outs(1), shape.inlet) + .replaceShape(FlowShape(ins(1), outs(0)))) } + /** + * Concatenate the given [[Source]] to this [[Flow]], meaning that once this + * Flow’s input is exhausted and all result elements have been generated, + * the Source’s elements will be produced. Note that the Source is materialized + * together with this Flow and just kept from producing elements by asserting + * back-pressure until its time comes. + * + * The resulting Flow’s materialized value is a Tuple2 containing both materialized + * values (of this Flow and that Source). + */ def concat[Out2 >: Out, Mat2](source: Source[Out2, Mat2]): Flow[In, Out2, (Mat, Mat2)] = { this.viaMat(Flow(source) { implicit builder ⇒ s ⇒ @@ -122,9 +249,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) else new Flow[In, U, Mat2](module.growConnect(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort))) } + /** + * Change the attributes of this [[Flow]] to the given ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = { - require(this.module ne EmptyModule, "Cannot set the attributes of empty flow") - new Flow(module.withAttributes(attr).wrap()) + if (this.module eq EmptyModule) this + else new Flow(module.withAttributes(attr).wrap()) } /** @@ -136,6 +268,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) source.via(this).toMat(sink)(Keep.both).run() } + // FIXME remove (in favor of .via) def section[O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) ⇒ Mat3)(section: Flow[O2, O2, Unit] ⇒ Flow[O2, O, Mat2]): Flow[In, O, Mat3] = { val subFlow = section(Flow[O2]).module.carbonCopy.withAttributes(attributes).wrap() if (this.isIdentity) new Flow(subFlow).asInstanceOf[Flow[In, O, Mat3]] @@ -148,6 +281,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** * Applies given [[OperationAttributes]] to a given section. */ + // FIXME remove (in favor of .via) def section[O, O2 >: Out, Mat2](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] ⇒ Flow[O2, O, Mat2]): Flow[In, O, Mat2] = { this.section[O, O2, Mat2, Mat2](attributes, Keep.right)(section) } @@ -165,7 +299,7 @@ object Flow extends FlowApply { def apply[T]: Flow[T, T, Unit] = new Flow[Any, Any, Any](Stages.Identity()).asInstanceOf[Flow[T, T, Unit]] /** - * A graph with the shape of a source logically is a source, this method makes + * A graph with the shape of a flow logically is a flow, this method makes * it so also in type. */ def wrap[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] = new Flow(g.module) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index fa456f4d9b..128a9acf66 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -253,6 +253,17 @@ object FlowGraph extends GraphApply { new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet))) } + private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { + if (!moduleInProgress.isBidiFlow) + throw new IllegalArgumentException( + s"Cannot build BidiFlow with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})") + if (moduleInProgress.outPorts.toSet != shape.outlets.toSet) + throw new IllegalArgumentException(s"provided Outlets [${shape.outlets.mkString(",")}] does not equal the module’s open Outlets [${moduleInProgress.outPorts.mkString(",")}]") + if (moduleInProgress.inPorts.toSet != shape.inlets.toSet) + throw new IllegalArgumentException(s"provided Inlets [${shape.inlets.mkString(",")}] does not equal the module’s open Inlets [${moduleInProgress.inPorts.mkString(",")}]") + new BidiFlow(moduleInProgress.replaceShape(shape)) + } + private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = { if (moduleInProgress.isRunnable) throw new IllegalArgumentException("Cannot build the Sink since no ports remain open") @@ -429,6 +440,75 @@ object FlowGraph extends GraphApply { implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] { override def importAndGetPortReverse(b: Builder): Inlet[I] = f.inlet + + def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder): BidiShape[O, O2, I2, I] = { + val shape = b.add(bidi) + b.addEdge(f.outlet, shape.in1) + b.addEdge(shape.out2, f.inlet) + shape + } + + def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder): BidiShape[O, O2, I2, I] = { + b.addEdge(f.outlet, bidi.in1) + b.addEdge(bidi.out2, f.inlet) + bidi + } + + def <~>[M](flow: Flow[O, I, M])(implicit b: Builder): Unit = { + val shape = b.add(flow) + b.addEdge(shape.outlet, f.inlet) + b.addEdge(f.outlet, shape.inlet) + } + } + + implicit class FlowArrow[I, O, M](val f: Flow[I, O, M]) extends AnyVal { + def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder): BidiShape[O, O2, I2, I] = { + val shape = b.add(bidi) + val flow = b.add(f) + b.addEdge(flow.outlet, shape.in1) + b.addEdge(shape.out2, flow.inlet) + shape + } + + def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder): BidiShape[O, O2, I2, I] = { + val flow = b.add(f) + b.addEdge(flow.outlet, bidi.in1) + b.addEdge(bidi.out2, flow.inlet) + bidi + } + + def <~>[M2](flow: Flow[O, I, M2])(implicit b: Builder): Unit = { + val shape = b.add(flow) + val ff = b.add(f) + b.addEdge(shape.outlet, ff.inlet) + b.addEdge(ff.outlet, shape.inlet) + } + } + + implicit class BidiFlowShapeArrow[I1, O1, I2, O2](val bidi: BidiShape[I1, O1, I2, O2]) extends AnyVal { + def <~>[I3, O3](other: BidiShape[O1, O3, I3, I2])(implicit b: Builder): BidiShape[O1, O3, I3, I2] = { + b.addEdge(bidi.out1, other.in1) + b.addEdge(other.out2, bidi.in2) + other + } + + def <~>[I3, O3, M](otherFlow: BidiFlow[O1, O3, I3, I2, M])(implicit b: Builder): BidiShape[O1, O3, I3, I2] = { + val other = b.add(otherFlow) + b.addEdge(bidi.out1, other.in1) + b.addEdge(other.out2, bidi.in2) + other + } + + def <~>(flow: FlowShape[O1, I2])(implicit b: Builder): Unit = { + b.addEdge(bidi.out1, flow.inlet) + b.addEdge(flow.outlet, bidi.in2) + } + + def <~>[M](f: Flow[O1, I2, M])(implicit b: Builder): Unit = { + val flow = b.add(f) + b.addEdge(bidi.out1, flow.inlet) + b.addEdge(flow.outlet, bidi.in2) + } } import scala.language.implicitConversions diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala index 7aab257926..44ce29cba8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala @@ -17,6 +17,9 @@ private[akka] object JavaConverters { implicit final class AddAsJavaFlow[In, Out, Mat](val flow: scaladsl.Flow[In, Out, Mat]) extends AnyVal { def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(flow) } + implicit final class AddAsJavaBidiFlow[I1, O1, I2, O2, Mat](val flow: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends AnyVal { + def asJava: javadsl.BidiFlow[I1, O1, I2, O2, Mat] = new javadsl.BidiFlow(flow) + } implicit final class AddAsJavaSink[In, Mat](val sink: scaladsl.Sink[In, Mat]) extends AnyVal { def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink) } @@ -30,6 +33,9 @@ private[akka] object JavaConverters { implicit final class AddAsScalaFlow[In, Out, Mat](val flow: javadsl.Flow[In, Out, Mat]) extends AnyVal { def asScala: scaladsl.Flow[In, Out, Mat] = flow.asScala } + implicit final class AddAsScalaBidiFlow[I1, O1, I2, O2, Mat](val flow: javadsl.BidiFlow[I1, O1, I2, O2, Mat]) extends AnyVal { + def asScala: scaladsl.BidiFlow[I1, O1, I2, O2, Mat] = flow.asScala + } implicit final class AddAsScalaSink[In, Mat](val sink: javadsl.Sink[In, Mat]) extends AnyVal { def asScala: scaladsl.Sink[In, Mat] = sink.asScala } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index d376c385fe..f5436c7024 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -49,7 +49,7 @@ object Sink extends SinkApply { private def shape[T](name: String): SinkShape[T] = SinkShape(new Inlet(name + ".in")) /** - * A graph with the shape of a source logically is a source, this method makes + * A graph with the shape of a sink logically is a sink, this method makes * it so also in type. */ def wrap[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] = new Sink(g.module)