diff --git a/akka-stream-tests/src/test/resources/reference.conf b/akka-stream-tests/src/test/resources/reference.conf index ffbc14a271..107a9f2c70 100644 --- a/akka-stream-tests/src/test/resources/reference.conf +++ b/akka-stream-tests/src/test/resources/reference.conf @@ -8,4 +8,5 @@ akka { akka.actor.warn-about-java-serializer-usage = false stream.materializer.debug.fuzzing-mode = on + stream.secret-test-fuzzing-warning-disable = 42 } \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index da05c2e2c4..4e68f3b3cd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -511,7 +511,9 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { downstream.requestOne() lastEvents() should be(Set(RequestOne)) - upstream.onComplete() + EventFilter[IllegalArgumentException](pattern = ".*Cannot pull closed port.*", occurrences = 1).intercept { + upstream.onComplete() + } val ev = lastEvents() ev.nonEmpty should be(true) ev.forall { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index 75ade88c1b..ad94d7d3e5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -4,7 +4,6 @@ package akka.stream.scaladsl import akka.NotUsed - import scala.collection.immutable import scala.concurrent.duration._ import akka.stream.ActorMaterializer @@ -14,6 +13,7 @@ import akka.stream.testkit.Utils._ import org.reactivestreams.Subscription import akka.testkit.TestProbe import org.reactivestreams.Subscriber +import akka.testkit.EventFilter class FlowIteratorSpec extends AbstractFlowIteratorSpec { override def testName = "A Flow based on an iterator producing function" @@ -40,7 +40,9 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { sub.request(1) c.expectNext(1) c.expectNoMsg(100.millis) - sub.request(2) + EventFilter[IllegalStateException](message = "not two", occurrences = 1).intercept { + sub.request(2) + } c.expectError().getMessage should be("not two") sub.request(2) c.expectNoMsg(100.millis) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala index 6dd7130d00..b2df91f174 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } import org.reactivestreams.Publisher import scala.concurrent.duration._ +import akka.testkit.EventFilter class FlowZipWithSpec extends BaseTwoStreamsSetup { @@ -46,7 +47,9 @@ class FlowZipWithSpec extends BaseTwoStreamsSetup { probe.expectNext(1 / -2) probe.expectNext(2 / -1) - subscription.request(2) + EventFilter[ArithmeticException](occurrences = 1).intercept { + subscription.request(2) + } probe.expectError() match { case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala index bca40a7173..1c778547a4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala @@ -8,9 +8,9 @@ import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.Utils._ import akka.stream.testkit._ import org.reactivestreams.Publisher - import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import akka.testkit.EventFilter class GraphUnzipWithSpec extends AkkaSpec { @@ -174,7 +174,9 @@ class GraphUnzipWithSpec extends AkkaSpec { leftProbe.expectNext(1 / -1) rightProbe.expectNext("1/-1") - requestFromBoth() + EventFilter[ArithmeticException](occurrences = 1).intercept { + requestFromBoth() + } leftProbe.expectError() match { case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala index af77a0f652..bda0179180 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala @@ -3,6 +3,7 @@ package akka.stream.scaladsl import akka.stream.testkit._ import scala.concurrent.duration._ import akka.stream._ +import akka.testkit.EventFilter class GraphZipWithSpec extends TwoStreamsSetup { import GraphDSL.Implicits._ @@ -65,7 +66,9 @@ class GraphZipWithSpec extends TwoStreamsSetup { probe.expectNext(1 / -2) probe.expectNext(2 / -1) - subscription.request(2) + EventFilter[ArithmeticException](occurrences = 1).intercept { + subscription.request(2) + } probe.expectError() match { case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 257160b049..0655f1d248 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -13,6 +13,7 @@ import scala.util.control.NoStackTrace import akka.stream._ import akka.stream.testkit._ import akka.NotUsed +import akka.testkit.EventFilter class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { @@ -235,13 +236,14 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { "terminate with a failure if there is an exception thrown" in { val t = new RuntimeException("expected") - whenReady( - Source.unfold((0, 1)) { - case (a, _) if a > 10000000 ⇒ throw t - case (a, b) ⇒ Some((b, a + b) → a) - }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }.failed) { - _ should be theSameInstanceAs (t) - } + EventFilter[RuntimeException](message = "expected", occurrences = 1) intercept + whenReady( + Source.unfold((0, 1)) { + case (a, _) if a > 10000000 ⇒ throw t + case (a, b) ⇒ Some((b, a + b) → a) + }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }.failed) { + _ should be theSameInstanceAs (t) + } } "generate a finite fibonacci sequence asynchronously" in { diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index 6af4c5ea84..c7aa13a5b3 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -15,7 +15,7 @@ trait GraphApply { def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] ⇒ S): Graph[S, NotUsed] = { val builder = new GraphDSL.Builder val s = buildBlock(builder) - val mod = builder.module.nest().replaceShape(s) + val mod = builder.module.replaceShape(s) new GraphApply.GraphImpl(s, mod) } @@ -28,7 +28,7 @@ trait GraphApply { val builder = new GraphDSL.Builder val s1 = builder.add(g1) val s = buildBlock(builder)(s1) - val mod = builder.module.nest().replaceShape(s) + val mod = builder.module.replaceShape(s) new GraphApply.GraphImpl(s, mod) } @@ -47,7 +47,7 @@ trait GraphApply { [2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] val s = buildBlock(builder)([#s1#]) - val mod = builder.module.nest().replaceShape(s) + val mod = builder.module.replaceShape(s) new GraphApply.GraphImpl(s, mod) }# @@ -63,7 +63,7 @@ private[stream] object GraphApply { extends Graph[S, Mat] { override def withAttributes(attr: Attributes): Graph[S, Mat] = - new GraphImpl(shape, module.withAttributes(attr).nest()) + new GraphImpl(shape, module.withAttributes(attr)) override def named(name: String): Graph[S, Mat] = withAttributes(Attributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index db581e1860..2d6ee38d94 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -36,7 +36,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, private val _logger = Logging.getLogger(system, this) override def logger = _logger - if (settings.fuzzingMode) { + if (settings.fuzzingMode && !system.settings.config.hasPath("akka.stream.secret-test-fuzzing-warning-disable")) { _logger.warning("Fuzzing mode is enabled on this system. If you see this warning on your production system then " + "set akka.stream.materializer.debug.fuzzing-mode to off.") } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 3c52cd29ee..4e4bf7393a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -136,6 +136,10 @@ object StreamLayout { /** * Verify that the given Shape has the same ports and return a new module with that shape. * Concrete implementations may throw UnsupportedOperationException where applicable. + * + * Please note that this method MUST NOT be implemented using a CopiedModule since + * the purpose of replaceShape can also be to rearrange the ports (as in BidiFlow.reversed) + * and that purpose would be defeated. */ def replaceShape(s: Shape): Module @@ -199,7 +203,7 @@ object StreamLayout { downstreams.updated(from, to), upstreams.updated(to, from), materializedValueComputation, - attributes) + if (isSealed) Attributes.none else attributes) } final def transformMaterializedValue(f: Any ⇒ Any): Module = { @@ -289,39 +293,20 @@ object StreamLayout { Attributes.none) } - /** - * Creates a new Module which contains `this` Module - * @return a new Module - */ - def nest(): Module = { - if (Debug) validate(this) - - CompositeModule( - Set(this), - shape, - /* - * Composite modules always maintain the flattened upstreams/downstreams map (i.e. they contain all the - * layout information of all the nested modules). Copied modules break the nesting, scoping them to the - * copied module. The MaterializerSession will take care of propagating the necessary Publishers and Subscribers - * from the enclosed scope to the outer scope. - */ - downstreams, - upstreams, - /* - * Wrapping like this shields the outer module from the details of the - * materialized value computation of its submodules. - */ - Atomic(this), - Attributes.none) - } - def subModules: Set[Module] - final def isSealed: Boolean = isAtomic || isCopied || isFused + final def isSealed: Boolean = isAtomic || isCopied || isFused || attributes.attributeList.nonEmpty def downstreams: Map[OutPort, InPort] = Map.empty def upstreams: Map[InPort, OutPort] = Map.empty def materializedValueComputation: MaterializedValueNode = Atomic(this) + + /** + * The purpose of this method is to create a copy to be included in a larger + * graph such that port identity clashes are avoided. Where a full copy is not + * possible or desirable, use a CopiedModule. The shape of the resulting + * module MUST NOT contain the same ports as this module’s shape. + */ def carbonCopy: Module def attributes: Attributes @@ -342,8 +327,6 @@ object StreamLayout { override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule") - override def nest(): Module = this - override def subModules: Set[Module] = Set.empty override def withAttributes(attributes: Attributes): Module = @@ -368,7 +351,7 @@ object StreamLayout { override def replaceShape(s: Shape): Module = { shape.requireSamePortsAs(s) - copy(shape = s) + CompositeModule(this, s) } override val materializedValueComputation: MaterializedValueNode = Atomic(copyOf) @@ -379,12 +362,12 @@ object StreamLayout { } final case class CompositeModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes) extends Module { override def replaceShape(s: Shape): Module = { shape.requireSamePortsAs(s) @@ -404,14 +387,18 @@ object StreamLayout { |""".stripMargin } + object CompositeModule { + def apply(m: Module, s: Shape): CompositeModule = CompositeModule(Set(m), s, Map.empty, Map.empty, Atomic(m), Attributes.none) + } + final case class FusedModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes, - info: Fusing.StructuralInfo) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes, + info: Fusing.StructuralInfo) extends Module { override def isFused: Boolean = true diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index a41f8c8fd8..1f82637e73 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -8,7 +8,7 @@ import akka.actor._ import akka.event.Logging import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance._ -import akka.stream.impl.StreamLayout.{ CopiedModule, Module } +import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module } import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly } import akka.stream.impl.{ ActorPublisher, ReactiveStreamsCompliance } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } @@ -29,13 +29,9 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at override def subModules: Set[Module] = Set.empty override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr) - override final def carbonCopy: Module = { - val newShape = shape.deepCopy() - replaceShape(newShape) - } + override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) - override final def replaceShape(newShape: Shape): Module = - CopiedModule(newShape, attributes, copyOf = this) + override final def replaceShape(newShape: Shape): Module = CompositeModule(this, newShape) override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala index 9af78b8837..14f81c057d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala @@ -246,7 +246,7 @@ private[stream] object Fusing { struct: BuildStructuralInfo, openGroup: ju.Set[Module], indent: Int): List[(Module, MaterializedValueNode)] = { - def log(msg: String): Unit = println(indent + msg) + def log(msg: String): Unit = println(" " * indent + msg) val async = m match { case _: GraphStageModule ⇒ m.attributes.contains(AsyncBoundary) case _: GraphModule ⇒ m.attributes.contains(AsyncBoundary) @@ -275,7 +275,7 @@ private[stream] object Fusing { * - we need to register the contained modules but take care to not include the internal * wirings into the final result, see also `struct.removeInternalWires()` */ - if (Debug) log(s"graph module ${m.toString.replace("\n", "\n" + indent)}") + if (Debug) log(s"graph module ${m.toString.replace("\n", "\n" + " " * indent)}") // storing the old Shape in arrays for in-place updating as we clone the contained GraphStages val oldIns = oldShape.inlets.toArray @@ -356,7 +356,7 @@ private[stream] object Fusing { subMatBuilder ++= res } val subMat = subMatBuilder.result() - if (Debug) log(subMat.map(p ⇒ s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + indent, "\n " + indent, "")) + if (Debug) log(subMat.map(p ⇒ s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, "")) // we need to remove all wirings that this module copied from nested modules so that we // don’t do wirings twice val oldDownstreams = m match { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 4ccc35bf5b..c51f059e2a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -25,10 +25,9 @@ import scala.util.Try private[akka] final case class GraphStageModule(shape: Shape, attributes: Attributes, stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module { - def carbonCopy: Module = replaceShape(shape.deepCopy()) + def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) - def replaceShape(s: Shape): Module = - CopiedModule(s, Attributes.none, this) + def replaceShape(s: Shape): Module = CompositeModule(this, s) def subModules: Set[Module] = Set.empty diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index 913bdbae91..cc8c5e4488 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -9,7 +9,7 @@ import java.util.Optional import akka.{ NotUsed, japi } import akka.stream._ -import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.StreamLayout.{ Module, CompositeModule } import akka.util.ByteString import javax.net.ssl._ @@ -128,7 +128,7 @@ object SslTls { override def replaceShape(s: Shape) = if (s == shape) this - else if (shape.hasSamePortsAs(s)) copy(shape = s) + else if (shape.hasSamePortsAs(s)) CompositeModule(this, s) else throw new IllegalArgumentException("trying to replace shape with different ports") } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index a0f071cc27..1ab202d56a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -116,12 +116,7 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu /** * Turn this BidiFlow around by 180 degrees, logically flipping it upside down in a protocol stack. */ - def reversed: BidiFlow[I2, O2, I1, O1, Mat] = { - BidiFlow.fromGraph(GraphDSL.create(this) { implicit b ⇒ - reversed ⇒ - BidiShape(reversed.in2, reversed.out2, reversed.in1, reversed.out1) - }) - } + def reversed: BidiFlow[I2, O2, I1, O1, Mat] = new BidiFlow(module.replaceShape(BidiShape(shape.in2, shape.out2, shape.in1, shape.out1))) /** * Transform only the materialized value of this BidiFlow, leaving all other properties as they were. @@ -137,7 +132,7 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu * only to the contained processing stages). */ override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = - new BidiFlow(module.withAttributes(attr).nest()) + new BidiFlow(module.withAttributes(attr)) /** * Add the given attributes to this Source. Further calls to `withAttributes` diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 76bed8a5f7..e45d18a7c6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -28,7 +28,7 @@ import akka.NotUsed * A `Flow` is a set of stream processing steps that has one open input and one open output. */ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) - extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat] { + extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat] { override val shape: FlowShape[In, Out] = module.shape.asInstanceOf[FlowShape[In, Out]] @@ -314,12 +314,12 @@ object Flow { fromSinkAndSourceMat(sink, source)(Keep.none) /** - * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input - * will be sent to the Sink and the Flow's output will come from the Source. - * - * The `combine` function is used to compose the materialized values of the `sink` and `source` - * into the materialized value of the resulting [[Flow]]. - */ + * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input + * will be sent to the Sink and the Flow's output will come from the Source. + * + * The `combine` function is used to compose the materialized values of the `sink` and `source` + * into the materialized value of the resulting [[Flow]]. + */ def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] = fromGraph(GraphDSL.create(sink, source)(combine) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) }) } @@ -354,7 +354,7 @@ final case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Mo def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) override def withAttributes(attr: Attributes): RunnableGraph[Mat] = - new RunnableGraph(module.withAttributes(attr).nest()) + new RunnableGraph(module.withAttributes(attr)) override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name)) } @@ -928,8 +928,6 @@ trait FlowOps[+Out, +Mat] { def conflateWithSeed[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate)) - - /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 396298f44f..691a4e4469 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -58,7 +58,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Sink[In, Mat] = - new Sink(module.withAttributes(attr).nest()) + new Sink(module.withAttributes(attr)) /** * Add the given attributes to this Source. Further calls to `withAttributes` diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 802be68ccc..d65ee7bbde 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -132,7 +132,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Repr[Out] = - new Source(module.withAttributes(attr).nest()) + new Source(module.withAttributes(attr)) /** * Add the given attributes to this Source. Further calls to `withAttributes`