From ab98618a2585fb19b786187a599d86007f51a54d Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 2 Jun 2021 07:17:31 -0700 Subject: [PATCH] Fix specificity of inherited attributes for {Source,Flow,Sink}.fromMaterializer (#30171) #30142 --- .../scaladsl/FromMaterializationSpec.scala | 100 +++++++++++++----- .../akka/stream/scaladsl/SetupSpec.scala | 8 +- .../30171-setup-stage.backwards.excludes | 2 + .../scala/akka/stream/impl/SetupStage.scala | 47 +------- .../scala/akka/stream/scaladsl/Sink.scala | 13 ++- 5 files changed, 92 insertions(+), 78 deletions(-) create mode 100644 akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/30171-setup-stage.backwards.excludes diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FromMaterializationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FromMaterializationSpec.scala index 7f24902996..142b230419 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FromMaterializationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FromMaterializationSpec.scala @@ -7,7 +7,9 @@ package akka.stream.scaladsl import akka.NotUsed import akka.stream.Attributes import akka.stream.Attributes.Attribute +import akka.stream.scaladsl.AttributesSpec.{ whateverAttribute, WhateverAttribute } import akka.stream.testkit.StreamSpec +import akka.stream.testkit.scaladsl.StreamTestKit._ class FromMaterializerSpec extends StreamSpec { @@ -16,7 +18,7 @@ class FromMaterializerSpec extends StreamSpec { "Source.fromMaterializer" should { - "expose materializer" in { + "expose materializer" in assertAllStagesStopped { val source = Source.fromMaterializer { (mat, _) => Source.single(mat.isShutdown) } @@ -24,7 +26,7 @@ class FromMaterializerSpec extends StreamSpec { source.runWith(Sink.head).futureValue shouldBe false } - "expose attributes" in { + "expose attributes" in assertAllStagesStopped { val source = Source.fromMaterializer { (_, attr) => Source.single(attr.attributeList) } @@ -32,7 +34,7 @@ class FromMaterializerSpec extends StreamSpec { source.runWith(Sink.head).futureValue should not be empty } - "propagate materialized value" in { + "propagate materialized value" in assertAllStagesStopped { val source = Source.fromMaterializer { (_, _) => Source.maybe[NotUsed] } @@ -42,7 +44,7 @@ class FromMaterializerSpec extends StreamSpec { element.futureValue shouldBe NotUsed } - "propagate attributes" in { + "propagate attributes" in assertAllStagesStopped { val source = Source .fromMaterializer { (_, attr) => Source.single(attr.nameLifted) @@ -52,7 +54,7 @@ class FromMaterializerSpec extends StreamSpec { source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } - "propagate attributes when nested" in { + "propagate attributes when nested" in assertAllStagesStopped { val source = Source .fromMaterializer { (_, _) => Source.fromMaterializer { (_, attr) => @@ -61,10 +63,10 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - source.runWith(Sink.head).futureValue shouldBe Some("setup-setup-my-name") + source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name-setup") } - "preserve attributes of inner source" in { + "preserve attributes of inner source" in assertAllStagesStopped { val source = Source.fromMaterializer { (_, _) => Source .fromMaterializer { (_, attr) => @@ -76,7 +78,21 @@ class FromMaterializerSpec extends StreamSpec { source.runWith(Sink.head).futureValue shouldBe Some(MyAttribute()) } - "handle factory failure" in { + "give priority to attributes of inner source" in assertAllStagesStopped { + val source = Source + .fromMaterializer { (_, _) => + Source + .fromMaterializer { (_, attr) => + Source.single(attr.get[WhateverAttribute]) + } + .addAttributes(whateverAttribute("inner")) + } + .addAttributes(whateverAttribute("outer")) + + source.runWith(Sink.head).futureValue shouldBe Some(WhateverAttribute("inner")) + } + + "handle factory failure" in assertAllStagesStopped { val error = new Error("boom") val source = Source.fromMaterializer { (_, _) => throw error @@ -87,7 +103,7 @@ class FromMaterializerSpec extends StreamSpec { completion.failed.futureValue.getCause shouldBe error } - "handle materialization failure" in { + "handle materialization failure" in assertAllStagesStopped { val error = new Error("boom") val source = Source.fromMaterializer { (_, _) => Source.empty.mapMaterializedValue(_ => throw error) @@ -102,7 +118,7 @@ class FromMaterializerSpec extends StreamSpec { "Flow.fromMaterializer" should { - "expose materializer" in { + "expose materializer" in assertAllStagesStopped { val flow = Flow.fromMaterializer { (mat, _) => Flow.fromSinkAndSource(Sink.ignore, Source.single(mat.isShutdown)) } @@ -110,7 +126,7 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe false } - "expose attributes" in { + "expose attributes" in assertAllStagesStopped { val flow = Flow.fromMaterializer { (_, attr) => Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.attributeList)) } @@ -118,7 +134,7 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.via(flow).runWith(Sink.head).futureValue should not be empty } - "propagate materialized value" in { + "propagate materialized value" in assertAllStagesStopped { val flow = Flow.fromMaterializer { (_, _) => Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[NotUsed])(Keep.right) } @@ -128,7 +144,7 @@ class FromMaterializerSpec extends StreamSpec { element.futureValue shouldBe NotUsed } - "propagate attributes" in { + "propagate attributes" in assertAllStagesStopped { val flow = Flow .fromMaterializer { (_, attr) => Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted)) @@ -138,7 +154,7 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name") } - "propagate attributes when nested" in { + "propagate attributes when nested" in assertAllStagesStopped { val flow = Flow .fromMaterializer { (_, _) => Flow.fromMaterializer { (_, attr) => @@ -147,10 +163,10 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-setup-my-name") + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name-setup") } - "preserve attributes of inner flow" in { + "preserve attributes of inner flow" in assertAllStagesStopped { val flow = Flow.fromMaterializer { (_, _) => Flow .fromMaterializer { (_, attr) => @@ -162,7 +178,21 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some(MyAttribute()) } - "handle factory failure" in { + "give priority to attributes of inner flow" in assertAllStagesStopped { + val flow = Flow + .fromMaterializer { (_, _) => + Flow + .fromMaterializer { (_, attr) => + Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.get[WhateverAttribute])) + } + .addAttributes(whateverAttribute("inner")) + } + .addAttributes(whateverAttribute("outer")) + + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some(WhateverAttribute("inner")) + } + + "handle factory failure" in assertAllStagesStopped { val error = new Error("boom") val flow = Flow.fromMaterializer { (_, _) => throw error @@ -173,7 +203,7 @@ class FromMaterializerSpec extends StreamSpec { completion.failed.futureValue.getCause shouldBe error } - "handle materialization failure" in { + "handle materialization failure" in assertAllStagesStopped { val error = new Error("boom") val flow = Flow.fromMaterializer { (_, _) => Flow[NotUsed].mapMaterializedValue(_ => throw error) @@ -188,7 +218,7 @@ class FromMaterializerSpec extends StreamSpec { "Sink.fromMaterializer" should { - "expose materializer" in { + "expose materializer" in assertAllStagesStopped { val sink = Sink.fromMaterializer { (mat, _) => Sink.fold(mat.isShutdown)(Keep.left) } @@ -196,7 +226,7 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.runWith(sink).flatten.futureValue shouldBe false } - "expose attributes" in { + "expose attributes" in assertAllStagesStopped { val sink = Sink.fromMaterializer { (_, attr) => Sink.fold(attr.attributeList)(Keep.left) } @@ -204,7 +234,7 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.runWith(sink).flatten.futureValue should not be empty } - "propagate materialized value" in { + "propagate materialized value" in assertAllStagesStopped { val sink = Sink.fromMaterializer { (_, _) => Sink.fold(NotUsed)(Keep.left) } @@ -212,17 +242,17 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.runWith(sink).flatten.futureValue shouldBe NotUsed } - "propagate attributes" in { + "propagate attributes" in assertAllStagesStopped { val sink = Sink .fromMaterializer { (_, attr) => Sink.fold(attr.nameLifted)(Keep.left) } .named("my-name") - Source.empty.runWith(sink).flatten.futureValue shouldBe Some("setup-my-name") + Source.empty.runWith(sink).flatten.futureValue shouldBe Some("my-name-setup") } - "propagate attributes when nested" in { + "propagate attributes when nested" in assertAllStagesStopped { val sink = Sink .fromMaterializer { (_, _) => Sink.fromMaterializer { (_, attr) => @@ -231,10 +261,10 @@ class FromMaterializerSpec extends StreamSpec { } .named("my-name") - Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some("setup-setup-my-name") + Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some("my-name-setup-setup") } - "preserve attributes of inner sink" in { + "preserve attributes of inner sink" in assertAllStagesStopped { val sink = Sink.fromMaterializer { (_, _) => Sink .fromMaterializer { (_, attr) => @@ -246,7 +276,21 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some(MyAttribute()) } - "handle factory failure" in { + "give priority to attributes of inner sink" in assertAllStagesStopped { + val sink = Sink + .fromMaterializer { (_, _) => + Sink + .fromMaterializer { (_, attr) => + Sink.fold(attr.get[WhateverAttribute])(Keep.left) + } + .addAttributes(whateverAttribute("inner")) + } + .addAttributes(whateverAttribute("outer")) + + Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some(WhateverAttribute("inner")) + } + + "handle factory failure" in assertAllStagesStopped { val error = new Error("boom") val sink = Sink.fromMaterializer { (_, _) => throw error @@ -255,7 +299,7 @@ class FromMaterializerSpec extends StreamSpec { Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error } - "handle materialization failure" in { + "handle materialization failure" in assertAllStagesStopped { val error = new Error("boom") val sink = Sink.fromMaterializer { (_, _) => Sink.ignore.mapMaterializedValue(_ => throw error) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala index bbc6fafc90..6bf0d6754d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala @@ -59,7 +59,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - source.runWith(Sink.head).futureValue shouldBe Some("setup-setup-my-name") + source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name-setup") } "handle factory failure" in { @@ -133,7 +133,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-setup-my-name") + Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name-setup") } "handle factory failure" in { @@ -193,7 +193,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - Source.empty.runWith(sink).flatten.futureValue shouldBe Some("setup-my-name") + Source.empty.runWith(sink).flatten.futureValue shouldBe Some("my-name-setup") } "propagate attributes when nested" in { @@ -205,7 +205,7 @@ class SetupSpec extends StreamSpec { } .named("my-name") - Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some("setup-setup-my-name") + Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some("my-name-setup-setup") } "handle factory failure" in { diff --git a/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/30171-setup-stage.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/30171-setup-stage.backwards.excludes new file mode 100644 index 0000000000..a4b515255d --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.14.backwards.excludes/30171-setup-stage.backwards.excludes @@ -0,0 +1,2 @@ +# Was @InternalApi private[stream] +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SetupSinkStage") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala index 805c8b82fb..a211a4c956 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala @@ -19,43 +19,6 @@ import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler -/** Internal Api */ -@InternalApi private[stream] final class SetupSinkStage[T, M](factory: (Materializer, Attributes) => Sink[T, M]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { - - private val in = Inlet[T]("SetupSinkStage.in") - override val shape = SinkShape(in) - - override protected def initialAttributes: Attributes = Attributes.name("setup") and SourceLocation.forLambda(factory) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M]() - (createStageLogic(matPromise), matPromise.future) - } - - private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) { - import SetupStage._ - - val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), cause => cancel(in, cause))) - setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) - - override def preStart(): Unit = { - try { - val sink = factory(materializer, attributes) - - val mat = Source.fromGraph(subOutlet.source).runWith(sink.addAttributes(attributes))(subFusingMaterializer) - matPromise.success(mat) - } catch { - case NonFatal(ex) => - matPromise.failure(ex) - throw ex - } - } - } - -} - /** Internal Api */ @InternalApi private[stream] final class SetupFlowStage[T, U, M](factory: (Materializer, Attributes) => Flow[T, U, M]) extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { @@ -87,11 +50,9 @@ import akka.stream.stage.OutHandler try { val flow = factory(materializer, attributes) - val mat = Source - .fromGraph(subOutlet.source) - .viaMat(flow.addAttributes(attributes))(Keep.right) - .to(Sink.fromGraph(subInlet.sink)) - .run()(subFusingMaterializer) + val mat = subFusingMaterializer.materialize( + Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)), + attributes) matPromise.success(mat) } catch { case NonFatal(ex) => @@ -127,7 +88,7 @@ import akka.stream.stage.OutHandler try { val source = factory(materializer, attributes) - val mat = source.addAttributes(attributes).to(Sink.fromGraph(subInlet.sink)).run()(subFusingMaterializer) + val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes) matPromise.success(mat) } catch { case NonFatal(ex) => diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 1f00cddee7..39b4a6f08c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -159,7 +159,13 @@ object Sink { * [[Attributes]] of the [[Sink]] returned by this method. */ def fromMaterializer[T, M](factory: (Materializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage(factory)) + Flow + .fromMaterializer({ (mat, attr) => + Flow.fromGraph(GraphDSL.create(factory(mat, attr)) { b => sink => + FlowShape(sink.in, b.materializedValue.outlet) + }) + }) + .to(Sink.head) /** * Defers the creation of a [[Sink]] until materialization. The `factory` function @@ -168,8 +174,9 @@ object Sink { */ @deprecated("Use 'fromMaterializer' instead", "2.6.0") def setup[T, M](factory: (ActorMaterializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] = - Sink.fromGraph(new SetupSinkStage((materializer, attributes) => - factory(ActorMaterializerHelper.downcast(materializer), attributes))) + fromMaterializer { (mat, attr) => + factory(ActorMaterializerHelper.downcast(mat), attr) + } /** * Helper to create [[Sink]] from `Subscriber`.