Fix specificity of inherited attributes for {Source,Flow,Sink}.fromMaterializer (#30171) #30142

This commit is contained in:
Arman Bilge 2021-06-02 07:17:31 -07:00 committed by GitHub
parent 41d5f1571b
commit ab98618a25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 92 additions and 78 deletions

View file

@ -7,7 +7,9 @@ package akka.stream.scaladsl
import akka.NotUsed import akka.NotUsed
import akka.stream.Attributes import akka.stream.Attributes
import akka.stream.Attributes.Attribute import akka.stream.Attributes.Attribute
import akka.stream.scaladsl.AttributesSpec.{ whateverAttribute, WhateverAttribute }
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.StreamTestKit._
class FromMaterializerSpec extends StreamSpec { class FromMaterializerSpec extends StreamSpec {
@ -16,7 +18,7 @@ class FromMaterializerSpec extends StreamSpec {
"Source.fromMaterializer" should { "Source.fromMaterializer" should {
"expose materializer" in { "expose materializer" in assertAllStagesStopped {
val source = Source.fromMaterializer { (mat, _) => val source = Source.fromMaterializer { (mat, _) =>
Source.single(mat.isShutdown) Source.single(mat.isShutdown)
} }
@ -24,7 +26,7 @@ class FromMaterializerSpec extends StreamSpec {
source.runWith(Sink.head).futureValue shouldBe false source.runWith(Sink.head).futureValue shouldBe false
} }
"expose attributes" in { "expose attributes" in assertAllStagesStopped {
val source = Source.fromMaterializer { (_, attr) => val source = Source.fromMaterializer { (_, attr) =>
Source.single(attr.attributeList) Source.single(attr.attributeList)
} }
@ -32,7 +34,7 @@ class FromMaterializerSpec extends StreamSpec {
source.runWith(Sink.head).futureValue should not be empty source.runWith(Sink.head).futureValue should not be empty
} }
"propagate materialized value" in { "propagate materialized value" in assertAllStagesStopped {
val source = Source.fromMaterializer { (_, _) => val source = Source.fromMaterializer { (_, _) =>
Source.maybe[NotUsed] Source.maybe[NotUsed]
} }
@ -42,7 +44,7 @@ class FromMaterializerSpec extends StreamSpec {
element.futureValue shouldBe NotUsed element.futureValue shouldBe NotUsed
} }
"propagate attributes" in { "propagate attributes" in assertAllStagesStopped {
val source = Source val source = Source
.fromMaterializer { (_, attr) => .fromMaterializer { (_, attr) =>
Source.single(attr.nameLifted) Source.single(attr.nameLifted)
@ -52,7 +54,7 @@ class FromMaterializerSpec extends StreamSpec {
source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name") source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
} }
"propagate attributes when nested" in { "propagate attributes when nested" in assertAllStagesStopped {
val source = Source val source = Source
.fromMaterializer { (_, _) => .fromMaterializer { (_, _) =>
Source.fromMaterializer { (_, attr) => Source.fromMaterializer { (_, attr) =>
@ -61,10 +63,10 @@ class FromMaterializerSpec extends StreamSpec {
} }
.named("my-name") .named("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("setup-setup-my-name") source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name-setup")
} }
"preserve attributes of inner source" in { "preserve attributes of inner source" in assertAllStagesStopped {
val source = Source.fromMaterializer { (_, _) => val source = Source.fromMaterializer { (_, _) =>
Source Source
.fromMaterializer { (_, attr) => .fromMaterializer { (_, attr) =>
@ -76,7 +78,21 @@ class FromMaterializerSpec extends StreamSpec {
source.runWith(Sink.head).futureValue shouldBe Some(MyAttribute()) source.runWith(Sink.head).futureValue shouldBe Some(MyAttribute())
} }
"handle factory failure" in { "give priority to attributes of inner source" in assertAllStagesStopped {
val source = Source
.fromMaterializer { (_, _) =>
Source
.fromMaterializer { (_, attr) =>
Source.single(attr.get[WhateverAttribute])
}
.addAttributes(whateverAttribute("inner"))
}
.addAttributes(whateverAttribute("outer"))
source.runWith(Sink.head).futureValue shouldBe Some(WhateverAttribute("inner"))
}
"handle factory failure" in assertAllStagesStopped {
val error = new Error("boom") val error = new Error("boom")
val source = Source.fromMaterializer { (_, _) => val source = Source.fromMaterializer { (_, _) =>
throw error throw error
@ -87,7 +103,7 @@ class FromMaterializerSpec extends StreamSpec {
completion.failed.futureValue.getCause shouldBe error completion.failed.futureValue.getCause shouldBe error
} }
"handle materialization failure" in { "handle materialization failure" in assertAllStagesStopped {
val error = new Error("boom") val error = new Error("boom")
val source = Source.fromMaterializer { (_, _) => val source = Source.fromMaterializer { (_, _) =>
Source.empty.mapMaterializedValue(_ => throw error) Source.empty.mapMaterializedValue(_ => throw error)
@ -102,7 +118,7 @@ class FromMaterializerSpec extends StreamSpec {
"Flow.fromMaterializer" should { "Flow.fromMaterializer" should {
"expose materializer" in { "expose materializer" in assertAllStagesStopped {
val flow = Flow.fromMaterializer { (mat, _) => val flow = Flow.fromMaterializer { (mat, _) =>
Flow.fromSinkAndSource(Sink.ignore, Source.single(mat.isShutdown)) Flow.fromSinkAndSource(Sink.ignore, Source.single(mat.isShutdown))
} }
@ -110,7 +126,7 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe false Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe false
} }
"expose attributes" in { "expose attributes" in assertAllStagesStopped {
val flow = Flow.fromMaterializer { (_, attr) => val flow = Flow.fromMaterializer { (_, attr) =>
Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.attributeList)) Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.attributeList))
} }
@ -118,7 +134,7 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.via(flow).runWith(Sink.head).futureValue should not be empty Source.empty.via(flow).runWith(Sink.head).futureValue should not be empty
} }
"propagate materialized value" in { "propagate materialized value" in assertAllStagesStopped {
val flow = Flow.fromMaterializer { (_, _) => val flow = Flow.fromMaterializer { (_, _) =>
Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[NotUsed])(Keep.right) Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[NotUsed])(Keep.right)
} }
@ -128,7 +144,7 @@ class FromMaterializerSpec extends StreamSpec {
element.futureValue shouldBe NotUsed element.futureValue shouldBe NotUsed
} }
"propagate attributes" in { "propagate attributes" in assertAllStagesStopped {
val flow = Flow val flow = Flow
.fromMaterializer { (_, attr) => .fromMaterializer { (_, attr) =>
Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted)) Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted))
@ -138,7 +154,7 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name") Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name")
} }
"propagate attributes when nested" in { "propagate attributes when nested" in assertAllStagesStopped {
val flow = Flow val flow = Flow
.fromMaterializer { (_, _) => .fromMaterializer { (_, _) =>
Flow.fromMaterializer { (_, attr) => Flow.fromMaterializer { (_, attr) =>
@ -147,10 +163,10 @@ class FromMaterializerSpec extends StreamSpec {
} }
.named("my-name") .named("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-setup-my-name") Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name-setup")
} }
"preserve attributes of inner flow" in { "preserve attributes of inner flow" in assertAllStagesStopped {
val flow = Flow.fromMaterializer { (_, _) => val flow = Flow.fromMaterializer { (_, _) =>
Flow Flow
.fromMaterializer { (_, attr) => .fromMaterializer { (_, attr) =>
@ -162,7 +178,21 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some(MyAttribute()) Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some(MyAttribute())
} }
"handle factory failure" in { "give priority to attributes of inner flow" in assertAllStagesStopped {
val flow = Flow
.fromMaterializer { (_, _) =>
Flow
.fromMaterializer { (_, attr) =>
Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.get[WhateverAttribute]))
}
.addAttributes(whateverAttribute("inner"))
}
.addAttributes(whateverAttribute("outer"))
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some(WhateverAttribute("inner"))
}
"handle factory failure" in assertAllStagesStopped {
val error = new Error("boom") val error = new Error("boom")
val flow = Flow.fromMaterializer { (_, _) => val flow = Flow.fromMaterializer { (_, _) =>
throw error throw error
@ -173,7 +203,7 @@ class FromMaterializerSpec extends StreamSpec {
completion.failed.futureValue.getCause shouldBe error completion.failed.futureValue.getCause shouldBe error
} }
"handle materialization failure" in { "handle materialization failure" in assertAllStagesStopped {
val error = new Error("boom") val error = new Error("boom")
val flow = Flow.fromMaterializer { (_, _) => val flow = Flow.fromMaterializer { (_, _) =>
Flow[NotUsed].mapMaterializedValue(_ => throw error) Flow[NotUsed].mapMaterializedValue(_ => throw error)
@ -188,7 +218,7 @@ class FromMaterializerSpec extends StreamSpec {
"Sink.fromMaterializer" should { "Sink.fromMaterializer" should {
"expose materializer" in { "expose materializer" in assertAllStagesStopped {
val sink = Sink.fromMaterializer { (mat, _) => val sink = Sink.fromMaterializer { (mat, _) =>
Sink.fold(mat.isShutdown)(Keep.left) Sink.fold(mat.isShutdown)(Keep.left)
} }
@ -196,7 +226,7 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.runWith(sink).flatten.futureValue shouldBe false Source.empty.runWith(sink).flatten.futureValue shouldBe false
} }
"expose attributes" in { "expose attributes" in assertAllStagesStopped {
val sink = Sink.fromMaterializer { (_, attr) => val sink = Sink.fromMaterializer { (_, attr) =>
Sink.fold(attr.attributeList)(Keep.left) Sink.fold(attr.attributeList)(Keep.left)
} }
@ -204,7 +234,7 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.runWith(sink).flatten.futureValue should not be empty Source.empty.runWith(sink).flatten.futureValue should not be empty
} }
"propagate materialized value" in { "propagate materialized value" in assertAllStagesStopped {
val sink = Sink.fromMaterializer { (_, _) => val sink = Sink.fromMaterializer { (_, _) =>
Sink.fold(NotUsed)(Keep.left) Sink.fold(NotUsed)(Keep.left)
} }
@ -212,17 +242,17 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.runWith(sink).flatten.futureValue shouldBe NotUsed Source.empty.runWith(sink).flatten.futureValue shouldBe NotUsed
} }
"propagate attributes" in { "propagate attributes" in assertAllStagesStopped {
val sink = Sink val sink = Sink
.fromMaterializer { (_, attr) => .fromMaterializer { (_, attr) =>
Sink.fold(attr.nameLifted)(Keep.left) Sink.fold(attr.nameLifted)(Keep.left)
} }
.named("my-name") .named("my-name")
Source.empty.runWith(sink).flatten.futureValue shouldBe Some("setup-my-name") Source.empty.runWith(sink).flatten.futureValue shouldBe Some("my-name-setup")
} }
"propagate attributes when nested" in { "propagate attributes when nested" in assertAllStagesStopped {
val sink = Sink val sink = Sink
.fromMaterializer { (_, _) => .fromMaterializer { (_, _) =>
Sink.fromMaterializer { (_, attr) => Sink.fromMaterializer { (_, attr) =>
@ -231,10 +261,10 @@ class FromMaterializerSpec extends StreamSpec {
} }
.named("my-name") .named("my-name")
Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some("setup-setup-my-name") Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some("my-name-setup-setup")
} }
"preserve attributes of inner sink" in { "preserve attributes of inner sink" in assertAllStagesStopped {
val sink = Sink.fromMaterializer { (_, _) => val sink = Sink.fromMaterializer { (_, _) =>
Sink Sink
.fromMaterializer { (_, attr) => .fromMaterializer { (_, attr) =>
@ -246,7 +276,21 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some(MyAttribute()) Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some(MyAttribute())
} }
"handle factory failure" in { "give priority to attributes of inner sink" in assertAllStagesStopped {
val sink = Sink
.fromMaterializer { (_, _) =>
Sink
.fromMaterializer { (_, attr) =>
Sink.fold(attr.get[WhateverAttribute])(Keep.left)
}
.addAttributes(whateverAttribute("inner"))
}
.addAttributes(whateverAttribute("outer"))
Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some(WhateverAttribute("inner"))
}
"handle factory failure" in assertAllStagesStopped {
val error = new Error("boom") val error = new Error("boom")
val sink = Sink.fromMaterializer { (_, _) => val sink = Sink.fromMaterializer { (_, _) =>
throw error throw error
@ -255,7 +299,7 @@ class FromMaterializerSpec extends StreamSpec {
Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error
} }
"handle materialization failure" in { "handle materialization failure" in assertAllStagesStopped {
val error = new Error("boom") val error = new Error("boom")
val sink = Sink.fromMaterializer { (_, _) => val sink = Sink.fromMaterializer { (_, _) =>
Sink.ignore.mapMaterializedValue(_ => throw error) Sink.ignore.mapMaterializedValue(_ => throw error)

View file

@ -59,7 +59,7 @@ class SetupSpec extends StreamSpec {
} }
.named("my-name") .named("my-name")
source.runWith(Sink.head).futureValue shouldBe Some("setup-setup-my-name") source.runWith(Sink.head).futureValue shouldBe Some("setup-my-name-setup")
} }
"handle factory failure" in { "handle factory failure" in {
@ -133,7 +133,7 @@ class SetupSpec extends StreamSpec {
} }
.named("my-name") .named("my-name")
Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-setup-my-name") Source.empty.via(flow).runWith(Sink.head).futureValue shouldBe Some("setup-my-name-setup")
} }
"handle factory failure" in { "handle factory failure" in {
@ -193,7 +193,7 @@ class SetupSpec extends StreamSpec {
} }
.named("my-name") .named("my-name")
Source.empty.runWith(sink).flatten.futureValue shouldBe Some("setup-my-name") Source.empty.runWith(sink).flatten.futureValue shouldBe Some("my-name-setup")
} }
"propagate attributes when nested" in { "propagate attributes when nested" in {
@ -205,7 +205,7 @@ class SetupSpec extends StreamSpec {
} }
.named("my-name") .named("my-name")
Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some("setup-setup-my-name") Source.empty.runWith(sink).flatten.flatten.futureValue shouldBe Some("my-name-setup-setup")
} }
"handle factory failure" in { "handle factory failure" in {

View file

@ -0,0 +1,2 @@
# Was @InternalApi private[stream]
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SetupSinkStage")

View file

@ -19,43 +19,6 @@ import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler import akka.stream.stage.OutHandler
/** Internal Api */
@InternalApi private[stream] final class SetupSinkStage[T, M](factory: (Materializer, Attributes) => Sink[T, M])
extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
private val in = Inlet[T]("SetupSinkStage.in")
override val shape = SinkShape(in)
override protected def initialAttributes: Attributes = Attributes.name("setup") and SourceLocation.forLambda(factory)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val matPromise = Promise[M]()
(createStageLogic(matPromise), matPromise.future)
}
private def createStageLogic(matPromise: Promise[M]) = new GraphStageLogic(shape) {
import SetupStage._
val subOutlet = new SubSourceOutlet[T]("SetupSinkStage")
subOutlet.setHandler(delegateToInlet(() => pull(in), cause => cancel(in, cause)))
setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet))
override def preStart(): Unit = {
try {
val sink = factory(materializer, attributes)
val mat = Source.fromGraph(subOutlet.source).runWith(sink.addAttributes(attributes))(subFusingMaterializer)
matPromise.success(mat)
} catch {
case NonFatal(ex) =>
matPromise.failure(ex)
throw ex
}
}
}
}
/** Internal Api */ /** Internal Api */
@InternalApi private[stream] final class SetupFlowStage[T, U, M](factory: (Materializer, Attributes) => Flow[T, U, M]) @InternalApi private[stream] final class SetupFlowStage[T, U, M](factory: (Materializer, Attributes) => Flow[T, U, M])
extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] {
@ -87,11 +50,9 @@ import akka.stream.stage.OutHandler
try { try {
val flow = factory(materializer, attributes) val flow = factory(materializer, attributes)
val mat = Source val mat = subFusingMaterializer.materialize(
.fromGraph(subOutlet.source) Source.fromGraph(subOutlet.source).viaMat(flow)(Keep.right).to(Sink.fromGraph(subInlet.sink)),
.viaMat(flow.addAttributes(attributes))(Keep.right) attributes)
.to(Sink.fromGraph(subInlet.sink))
.run()(subFusingMaterializer)
matPromise.success(mat) matPromise.success(mat)
} catch { } catch {
case NonFatal(ex) => case NonFatal(ex) =>
@ -127,7 +88,7 @@ import akka.stream.stage.OutHandler
try { try {
val source = factory(materializer, attributes) val source = factory(materializer, attributes)
val mat = source.addAttributes(attributes).to(Sink.fromGraph(subInlet.sink)).run()(subFusingMaterializer) val mat = subFusingMaterializer.materialize(source.to(Sink.fromGraph(subInlet.sink)), attributes)
matPromise.success(mat) matPromise.success(mat)
} catch { } catch {
case NonFatal(ex) => case NonFatal(ex) =>

View file

@ -159,7 +159,13 @@ object Sink {
* [[Attributes]] of the [[Sink]] returned by this method. * [[Attributes]] of the [[Sink]] returned by this method.
*/ */
def fromMaterializer[T, M](factory: (Materializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] = def fromMaterializer[T, M](factory: (Materializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] =
Sink.fromGraph(new SetupSinkStage(factory)) Flow
.fromMaterializer({ (mat, attr) =>
Flow.fromGraph(GraphDSL.create(factory(mat, attr)) { b => sink =>
FlowShape(sink.in, b.materializedValue.outlet)
})
})
.to(Sink.head)
/** /**
* Defers the creation of a [[Sink]] until materialization. The `factory` function * Defers the creation of a [[Sink]] until materialization. The `factory` function
@ -168,8 +174,9 @@ object Sink {
*/ */
@deprecated("Use 'fromMaterializer' instead", "2.6.0") @deprecated("Use 'fromMaterializer' instead", "2.6.0")
def setup[T, M](factory: (ActorMaterializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] = def setup[T, M](factory: (ActorMaterializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] =
Sink.fromGraph(new SetupSinkStage((materializer, attributes) => fromMaterializer { (mat, attr) =>
factory(ActorMaterializerHelper.downcast(materializer), attributes))) factory(ActorMaterializerHelper.downcast(mat), attr)
}
/** /**
* Helper to create [[Sink]] from `Subscriber`. * Helper to create [[Sink]] from `Subscriber`.