Inherit attributes from Restart{Source,Flow,Sink} #24810

This commit is contained in:
Arman Bilge 2021-03-15 04:47:37 -07:00 committed by GitHub
parent 573a97debb
commit 1ff2950ab6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 6 deletions

View file

@ -5,13 +5,20 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ Await, Promise }
import scala.concurrent.Promise
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
import akka.Done import akka.Done
import akka.NotUsed import akka.NotUsed
import akka.stream.Attributes.Name
import akka.stream.scaladsl.AttributesSpec.{
whateverAttribute,
AttributesFlow,
AttributesSink,
AttributesSource,
WhateverAttribute
}
import akka.stream.{ Attributes, OverflowStrategy, RestartSettings } import akka.stream.{ Attributes, OverflowStrategy, RestartSettings }
import akka.stream.scaladsl.RestartWithBackoffFlow.Delay import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
@ -315,6 +322,21 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
probe.cancel() probe.cancel()
} }
"provide attributes to inner source" in assertAllStagesStopped {
val promisedAttributes = Promise[Attributes]()
RestartSource
.withBackoff(restartSettings) { () =>
Source.fromGraph(new AttributesSource().named("inner-name")).mapMaterializedValue(promisedAttributes.success)
}
.withAttributes(whateverAttribute("other-thing"))
.named("outer-name")
.runWith(Sink.cancelled)
val attributes = Await.result(promisedAttributes.future, 1.second)
attributes.get[Name] should contain(Name("inner-name"))
attributes.get[WhateverAttribute] should contain(WhateverAttribute("other-thing"))
}
} }
"A restart with backoff sink" should { "A restart with backoff sink" should {
@ -550,6 +572,21 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
sinkProbe.cancel() sinkProbe.cancel()
probe.sendComplete() probe.sendComplete()
} }
"provide attributes to inner sink" in assertAllStagesStopped {
val promisedAttributes = Promise[Attributes]()
RestartSink
.withBackoff(restartSettings) { () =>
Sink.fromGraph(new AttributesSink().named("inner-name")).mapMaterializedValue(promisedAttributes.success)
}
.withAttributes(whateverAttribute("other-thing"))
.named("outer-name")
.runWith(Source.empty)
val attributes = Await.result(promisedAttributes.future, 1.second)
attributes.get[Name] should contain(Name("inner-name"))
attributes.get[WhateverAttribute] should contain(WhateverAttribute("other-thing"))
}
} }
"A restart with backoff flow" should { "A restart with backoff flow" should {
@ -878,5 +915,20 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
elements shouldEqual List(1, 2, 4, 5, 7) elements shouldEqual List(1, 2, 4, 5, 7)
flowCreations.get() shouldEqual 3 flowCreations.get() shouldEqual 3
} }
"provide attributes to inner flow" in assertAllStagesStopped {
val promisedAttributes = Promise[Attributes]()
RestartFlow
.withBackoff(restartSettings) { () =>
Flow.fromGraph(new AttributesFlow().named("inner-name")).mapMaterializedValue(promisedAttributes.success)
}
.withAttributes(whateverAttribute("other-thing"))
.named("outer-name")
.runWith(Source.empty, Sink.ignore)
val attributes = Await.result(promisedAttributes.future, 1.second)
attributes.get[Name] should contain(Name("inner-name"))
attributes.get[WhateverAttribute] should contain(WhateverAttribute("other-thing"))
}
} }
} }

View file

@ -199,12 +199,13 @@ private final class RestartWithBackoffFlow[In, Out](
val sourceOut: SubSourceOutlet[In] = createSubOutlet(in) val sourceOut: SubSourceOutlet[In] = createSubOutlet(in)
val sinkIn: SubSinkInlet[Out] = createSubInlet(out) val sinkIn: SubSinkInlet[Out] = createSubInlet(out)
Source val graph = Source
.fromGraph(sourceOut.source) .fromGraph(sourceOut.source)
// Temp fix while waiting cause of cancellation. See #23909 // Temp fix while waiting cause of cancellation. See #23909
.via(RestartWithBackoffFlow.delayCancellation[In](delay)) .via(RestartWithBackoffFlow.delayCancellation[In](delay))
.via(flowFactory()) .via(flowFactory())
.runWith(sinkIn.sink)(subFusingMaterializer) .to(sinkIn.sink)
subFusingMaterializer.materialize(graph, inheritedAttributes)
if (isAvailable(out)) { if (isAvailable(out)) {
sinkIn.pull() sinkIn.pull()

View file

@ -120,7 +120,7 @@ private final class RestartWithBackoffSink[T](sinkFactory: () => Sink[T, _], res
override protected def startGraph() = { override protected def startGraph() = {
val sourceOut = createSubOutlet(in) val sourceOut = createSubOutlet(in)
Source.fromGraph(sourceOut.source).runWith(sinkFactory())(subFusingMaterializer) subFusingMaterializer.materialize(Source.fromGraph(sourceOut.source).to(sinkFactory()), inheritedAttributes)
} }
override protected def backoff() = { override protected def backoff() = {

View file

@ -195,7 +195,7 @@ private final class RestartWithBackoffSource[T](
override protected def startGraph() = { override protected def startGraph() = {
val sinkIn = createSubInlet(out) val sinkIn = createSubInlet(out)
sourceFactory().runWith(sinkIn.sink)(subFusingMaterializer) subFusingMaterializer.materialize(sourceFactory().to(sinkIn.sink), inheritedAttributes)
if (isAvailable(out)) { if (isAvailable(out)) {
sinkIn.pull() sinkIn.pull()
} }