From 1ff2950ab6eee20db0b6d3a1f2e7b2fe0126a386 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 15 Mar 2021 04:47:37 -0700 Subject: [PATCH] Inherit attributes from Restart{Source,Flow,Sink} #24810 --- .../akka/stream/scaladsl/RestartSpec.scala | 56 ++++++++++++++++++- .../akka/stream/scaladsl/RestartFlow.scala | 5 +- .../akka/stream/scaladsl/RestartSink.scala | 2 +- .../akka/stream/scaladsl/RestartSource.scala | 2 +- 4 files changed, 59 insertions(+), 6 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index 5147a4f65f..1c946ed2dc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -5,13 +5,20 @@ package akka.stream.scaladsl import java.util.concurrent.atomic.AtomicInteger - -import scala.concurrent.Promise +import scala.concurrent.{ Await, Promise } import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import akka.Done import akka.NotUsed +import akka.stream.Attributes.Name +import akka.stream.scaladsl.AttributesSpec.{ + whateverAttribute, + AttributesFlow, + AttributesSink, + AttributesSource, + WhateverAttribute +} import akka.stream.{ Attributes, OverflowStrategy, RestartSettings } import akka.stream.scaladsl.RestartWithBackoffFlow.Delay import akka.stream.testkit.StreamSpec @@ -315,6 +322,21 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 probe.cancel() } + + "provide attributes to inner source" in assertAllStagesStopped { + val promisedAttributes = Promise[Attributes]() + RestartSource + .withBackoff(restartSettings) { () => + Source.fromGraph(new AttributesSource().named("inner-name")).mapMaterializedValue(promisedAttributes.success) + } + .withAttributes(whateverAttribute("other-thing")) + .named("outer-name") + .runWith(Sink.cancelled) + + val attributes = Await.result(promisedAttributes.future, 1.second) + attributes.get[Name] should contain(Name("inner-name")) + attributes.get[WhateverAttribute] should contain(WhateverAttribute("other-thing")) + } } "A restart with backoff sink" should { @@ -550,6 +572,21 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 sinkProbe.cancel() probe.sendComplete() } + + "provide attributes to inner sink" in assertAllStagesStopped { + val promisedAttributes = Promise[Attributes]() + RestartSink + .withBackoff(restartSettings) { () => + Sink.fromGraph(new AttributesSink().named("inner-name")).mapMaterializedValue(promisedAttributes.success) + } + .withAttributes(whateverAttribute("other-thing")) + .named("outer-name") + .runWith(Source.empty) + + val attributes = Await.result(promisedAttributes.future, 1.second) + attributes.get[Name] should contain(Name("inner-name")) + attributes.get[WhateverAttribute] should contain(WhateverAttribute("other-thing")) + } } "A restart with backoff flow" should { @@ -878,5 +915,20 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 elements shouldEqual List(1, 2, 4, 5, 7) flowCreations.get() shouldEqual 3 } + + "provide attributes to inner flow" in assertAllStagesStopped { + val promisedAttributes = Promise[Attributes]() + RestartFlow + .withBackoff(restartSettings) { () => + Flow.fromGraph(new AttributesFlow().named("inner-name")).mapMaterializedValue(promisedAttributes.success) + } + .withAttributes(whateverAttribute("other-thing")) + .named("outer-name") + .runWith(Source.empty, Sink.ignore) + + val attributes = Await.result(promisedAttributes.future, 1.second) + attributes.get[Name] should contain(Name("inner-name")) + attributes.get[WhateverAttribute] should contain(WhateverAttribute("other-thing")) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala index 85af275976..5d523a21cf 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala @@ -199,12 +199,13 @@ private final class RestartWithBackoffFlow[In, Out]( val sourceOut: SubSourceOutlet[In] = createSubOutlet(in) val sinkIn: SubSinkInlet[Out] = createSubInlet(out) - Source + val graph = Source .fromGraph(sourceOut.source) // Temp fix while waiting cause of cancellation. See #23909 .via(RestartWithBackoffFlow.delayCancellation[In](delay)) .via(flowFactory()) - .runWith(sinkIn.sink)(subFusingMaterializer) + .to(sinkIn.sink) + subFusingMaterializer.materialize(graph, inheritedAttributes) if (isAvailable(out)) { sinkIn.pull() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala index 66e6b51a27..aa1d224b1c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSink.scala @@ -120,7 +120,7 @@ private final class RestartWithBackoffSink[T](sinkFactory: () => Sink[T, _], res override protected def startGraph() = { val sourceOut = createSubOutlet(in) - Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer) + subFusingMaterializer.materialize(Source.fromGraph(sourceOut.source).to(sinkFactory()), inheritedAttributes) } override protected def backoff() = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala index 59165e30c8..a09d5463a7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSource.scala @@ -195,7 +195,7 @@ private final class RestartWithBackoffSource[T]( override protected def startGraph() = { val sinkIn = createSubInlet(out) - sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer) + subFusingMaterializer.materialize(sourceFactory().to(sinkIn.sink), inheritedAttributes) if (isAvailable(out)) { sinkIn.pull() }