diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala index 3e8eb4aaa8..2452adbce9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -169,6 +169,47 @@ class GraphMatValueSpec extends AkkaSpec { done should ===(true) } + "ignore materialized values for a graph with no materialized values exposed" in { + // The bug here was that the default behavior for "compose" in Module is Keep.left, but + // EmptyModule.compose broke this by always returning the other module intact, which means + // that the materialized value was that of the other module (which is inconsistent with Keep.left behavior). + + val sink = Sink.ignore + val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] ⇒ + import GraphDSL.Implicits._ + val s = builder.add(sink) + val src = builder.add(Source(1 to 3)) + val flow = builder.add(Flow[Int]) + + src ~> flow ~> s + ClosedShape + }) + + val result = g.run() + } + + "ignore materialized values for a graph with no materialized values exposed, but keep side-effects" in { + // The bug here was that the default behavior for "compose" in Module is Keep.left, but + // EmptyModule.compose broke this by always returning the other module intact, which means + // that the materialized value was that of the other module (which is inconsistent with Keep.left behavior). + + val sink = Sink.ignore.mapMaterializedValue(_ ⇒ testActor ! "side effect!") + val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] ⇒ + import GraphDSL.Implicits._ + val s = builder.add(sink) + val src = builder.add(Source(1 to 3)) + val flow = builder.add(Flow[Int]) + + src ~> flow ~> s + ClosedShape + }) + + var result = g.run() + + expectMsg("side effect!") + + } + } } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index 5282c5ca83..327b7f04b3 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -26,7 +26,7 @@ trait GraphApply { */ def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] ⇒ (g1.Shape) ⇒ S): Graph[S, Mat] = { val builder = new GraphDSL.Builder - val s1 = builder.add(g1) + val s1 = builder.add(g1, Keep.right) val s = buildBlock(builder)(s1) val mod = builder.module.replaceShape(s) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index d91a783f4a..e351a38c8b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -170,6 +170,7 @@ object StreamLayout { /** * Fuses this Module to `that` Module by wiring together `from` and `to`, * retaining the materialized value of `this` in the result + * * @param that a Module to fuse with * @param from the data source to wire * @param to the data sink to wire @@ -182,6 +183,7 @@ object StreamLayout { * Fuses this Module to `that` Module by wiring together `from` and `to`, * transforming the materialized values of `this` and `that` using the * provided function `f` + * * @param that a Module to fuse with * @param from the data source to wire * @param to the data sink to wire @@ -242,6 +244,7 @@ object StreamLayout { * Creates a new Module which is `this` Module composed with `that` Module, * using the given function `f` to compose the materialized value of `this` with * the materialized value of `that`. + * * @param that a Module to be composed with (cannot be itself) * @param f a function which combines the materialized values * @tparam A the type of the materialized value of `this` @@ -345,10 +348,32 @@ object StreamLayout { if (s != shape) throw new UnsupportedOperationException("cannot replace the shape of the EmptyModule") else this - override def compose(that: Module): Module = that + override def compose(that: Module): Module = compose(that, scaladsl.Keep.left) - override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = - throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule") + override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = { + if (f eq scaladsl.Keep.right) { + that + } else if (f eq scaladsl.Keep.left) { + // If "that" has a fully ignorable materialized value, we ignore it, otherwise we keep the side effect and + // explicitly map to NotUsed + val mat = + if (IgnorableMatValComp(that)) + Ignore + else + Transform(_ => NotUsed, that.materializedValueComputation) + + CompositeModule( + if (that.isSealed) Set(that) else that.subModules, + that.shape, + that.downstreams, + that.upstreams, + mat, + if (this.isSealed) Attributes.none else attributes) + } else { + throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule " + + "except with Keep.left or Keep.right") + } + } override def withAttributes(attributes: Attributes): Module = throw new UnsupportedOperationException("EmptyModule cannot carry attributes") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index e7c67f23b0..d30a726c1c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -913,7 +913,7 @@ object GraphDSL extends GraphApply { def add[S <: Shape](graph: Graph[S, _]): S = { if (StreamLayout.Debug) StreamLayout.validate(graph.module) val copy = graph.module.carbonCopy - moduleInProgress = moduleInProgress.compose(copy) + moduleInProgress = moduleInProgress.compose(copy, Keep.left) graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] } @@ -926,7 +926,7 @@ object GraphDSL extends GraphApply { private[stream] def add[S <: Shape, A](graph: Graph[S, _], transform: (A) ⇒ Any): S = { if (StreamLayout.Debug) StreamLayout.validate(graph.module) val copy = graph.module.carbonCopy - moduleInProgress = moduleInProgress.compose(copy.transformMaterializedValue(transform.asInstanceOf[Any ⇒ Any])) + moduleInProgress = moduleInProgress.compose(copy.transformMaterializedValue(transform.asInstanceOf[Any ⇒ Any]), Keep.right) graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] }