From e80d7e18015530bda7235241e22b3d69081f0b99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 24 May 2016 12:35:32 +0200 Subject: [PATCH] #20564 Fix compose method of EmptyModule to be able to Keep.left or right ... and use the proper version from the GraphDSL depending on whether materialized value must be altered on an import or not. --- .../stream/scaladsl/GraphMatValueSpec.scala | 41 +++++++++++++++++++ .../stream/scaladsl/GraphApply.scala.template | 2 +- .../scala/akka/stream/impl/StreamLayout.scala | 31 ++++++++++++-- .../scala/akka/stream/scaladsl/Graph.scala | 4 +- 4 files changed, 72 insertions(+), 6 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala index 3e8eb4aaa8..2452adbce9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -169,6 +169,47 @@ class GraphMatValueSpec extends AkkaSpec { done should ===(true) } + "ignore materialized values for a graph with no materialized values exposed" in { + // The bug here was that the default behavior for "compose" in Module is Keep.left, but + // EmptyModule.compose broke this by always returning the other module intact, which means + // that the materialized value was that of the other module (which is inconsistent with Keep.left behavior). + + val sink = Sink.ignore + val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] ⇒ + import GraphDSL.Implicits._ + val s = builder.add(sink) + val src = builder.add(Source(1 to 3)) + val flow = builder.add(Flow[Int]) + + src ~> flow ~> s + ClosedShape + }) + + val result = g.run() + } + + "ignore materialized values for a graph with no materialized values exposed, but keep side-effects" in { + // The bug here was that the default behavior for "compose" in Module is Keep.left, but + // EmptyModule.compose broke this by always returning the other module intact, which means + // that the materialized value was that of the other module (which is inconsistent with Keep.left behavior). + + val sink = Sink.ignore.mapMaterializedValue(_ ⇒ testActor ! "side effect!") + val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] ⇒ + import GraphDSL.Implicits._ + val s = builder.add(sink) + val src = builder.add(Source(1 to 3)) + val flow = builder.add(Flow[Int]) + + src ~> flow ~> s + ClosedShape + }) + + var result = g.run() + + expectMsg("side effect!") + + } + } } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index 5282c5ca83..327b7f04b3 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -26,7 +26,7 @@ trait GraphApply { */ def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] ⇒ (g1.Shape) ⇒ S): Graph[S, Mat] = { val builder = new GraphDSL.Builder - val s1 = builder.add(g1) + val s1 = builder.add(g1, Keep.right) val s = buildBlock(builder)(s1) val mod = builder.module.replaceShape(s) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index d91a783f4a..e351a38c8b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -170,6 +170,7 @@ object StreamLayout { /** * Fuses this Module to `that` Module by wiring together `from` and `to`, * retaining the materialized value of `this` in the result + * * @param that a Module to fuse with * @param from the data source to wire * @param to the data sink to wire @@ -182,6 +183,7 @@ object StreamLayout { * Fuses this Module to `that` Module by wiring together `from` and `to`, * transforming the materialized values of `this` and `that` using the * provided function `f` + * * @param that a Module to fuse with * @param from the data source to wire * @param to the data sink to wire @@ -242,6 +244,7 @@ object StreamLayout { * Creates a new Module which is `this` Module composed with `that` Module, * using the given function `f` to compose the materialized value of `this` with * the materialized value of `that`. + * * @param that a Module to be composed with (cannot be itself) * @param f a function which combines the materialized values * @tparam A the type of the materialized value of `this` @@ -345,10 +348,32 @@ object StreamLayout { if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") else this - override def compose(that: Module): Module = that + override def compose(that: Module): Module = compose(that, scaladsl.Keep.left) - override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = - throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule") + override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = { + if (f eq scaladsl.Keep.right) { + that + } else if (f eq scaladsl.Keep.left) { + // If "that" has a fully ignorable materialized value, we ignore it, otherwise we keep the side effect and + // explicitly map to NotUsed + val mat = + if (IgnorableMatValComp(that)) + Ignore + else + Transform(_ => NotUsed, that.materializedValueComputation) + + CompositeModule( + if (that.isSealed) Set(that) else that.subModules, + that.shape, + that.downstreams, + that.upstreams, + mat, + if (this.isSealed) Attributes.none else attributes) + } else { + throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule " + + "except with Keep.left or Keep.right") + } + } override def withAttributes(attributes: Attributes): Module = throw new UnsupportedOperationException("EmptyModule cannot carry attributes") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index e7c67f23b0..d30a726c1c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -913,7 +913,7 @@ object GraphDSL extends GraphApply { def add[S <: Shape](graph: Graph[S, _]): S = { if (StreamLayout.Debug) StreamLayout.validate(graph.module) val copy = graph.module.carbonCopy - moduleInProgress = moduleInProgress.compose(copy) + moduleInProgress = moduleInProgress.compose(copy, Keep.left) graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] } @@ -926,7 +926,7 @@ object GraphDSL extends GraphApply { private[stream] def add[S <: Shape, A](graph: Graph[S, _], transform: (A) ⇒ Any): S = { if (StreamLayout.Debug) StreamLayout.validate(graph.module) val copy = graph.module.carbonCopy - moduleInProgress = moduleInProgress.compose(copy.transformMaterializedValue(transform.asInstanceOf[Any ⇒ Any])) + moduleInProgress = moduleInProgress.compose(copy.transformMaterializedValue(transform.asInstanceOf[Any ⇒ Any]), Keep.right) graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] }