Inner stages of lazy/future stages should inherit attributes (#30161)

* Add tests for attributes inheritance
* Fix inheritance bugs revealed by tests
This commit is contained in:
Arman Bilge 2021-06-02 03:48:14 -07:00 committed by GitHub
parent fe1459fcbc
commit e714f7bdbc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 17 deletions

View file

@ -4,15 +4,16 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.Attributes.Attribute
import akka.stream.scaladsl.AttributesSpec.AttributesFlow
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.annotation.nowarn import scala.annotation.nowarn
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import akka.stream.AbruptStageTerminationException import akka.stream.{ AbruptStageTerminationException, Attributes, Materializer, NeverMaterializedException }
import akka.stream.Materializer
import akka.stream.NeverMaterializedException
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.TestPublisher import akka.stream.testkit.TestPublisher
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
@ -27,7 +28,10 @@ class LazyFlowSpec extends StreamSpec("""
akka.stream.materializer.max-input-buffer-size = 1 akka.stream.materializer.max-input-buffer-size = 1
""") { """) {
import system.dispatcher
val ex = TE("") val ex = TE("")
case class MyAttribute() extends Attribute
val myAttributes = Attributes(MyAttribute())
"Flow.lazyFlow" must { "Flow.lazyFlow" must {
// more complete test coverage is for lazyFutureFlow since this is composition of that // more complete test coverage is for lazyFutureFlow since this is composition of that
@ -44,6 +48,16 @@ class LazyFlowSpec extends StreamSpec("""
deferredMatVal.isCompleted should ===(true) deferredMatVal.isCompleted should ===(true)
} }
"provide attributes to inner flow" in assertAllStagesStopped {
val attributes = Source
.single(Done)
.viaMat(Flow.lazyFlow(() => Flow.fromGraph(new AttributesFlow())))(Keep.right)
.addAttributes(myAttributes)
.to(Sink.head)
.run()
attributes.futureValue.get[MyAttribute] should contain(MyAttribute())
}
} }
"Flow.futureFlow" must { "Flow.futureFlow" must {
@ -60,6 +74,17 @@ class LazyFlowSpec extends StreamSpec("""
list.futureValue should equal(Seq("1", "2", "3")) list.futureValue should equal(Seq("1", "2", "3"))
deferredMatVal.isCompleted should ===(true) deferredMatVal.isCompleted should ===(true)
} }
"provide attributes to inner flow" in assertAllStagesStopped {
val attributes = Source
.single(Done)
.viaMat(Flow.futureFlow(Future(Flow.fromGraph(new AttributesFlow()))))(Keep.right)
.addAttributes(myAttributes)
.to(Sink.head)
.run()
attributes.futureValue.get[MyAttribute] should contain(MyAttribute())
}
} }
"Flow.lazyFutureFlow" must { "Flow.lazyFutureFlow" must {
@ -229,6 +254,17 @@ class LazyFlowSpec extends StreamSpec("""
list.failed.futureValue shouldBe an[AbruptStageTerminationException] list.failed.futureValue shouldBe an[AbruptStageTerminationException]
deferredMatVal.failed.futureValue shouldBe an[AbruptStageTerminationException] deferredMatVal.failed.futureValue shouldBe an[AbruptStageTerminationException]
} }
"provide attributes to inner flow" in assertAllStagesStopped {
val attributes = Source
.single(Done)
.viaMat(Flow.lazyFutureFlow(() => Future(Flow.fromGraph(new AttributesFlow()))))(Keep.right)
.addAttributes(myAttributes)
.to(Sink.head)
.run()
attributes.futureValue.get[MyAttribute] should contain(MyAttribute())
}
} }
"The deprecated LazyFlow ops" must { "The deprecated LazyFlow ops" must {

View file

@ -5,17 +5,16 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.annotation.nowarn import scala.annotation.nowarn
import akka.{ Done, NotUsed }
import akka.NotUsed import akka.stream.Attributes.Attribute
import akka.stream._ import akka.stream._
import akka.stream.scaladsl.AttributesSpec.AttributesSink
import akka.stream.stage.GraphStage import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic import akka.stream.stage.GraphStageLogic
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
@ -31,7 +30,10 @@ class LazySinkSpec extends StreamSpec("""
akka.stream.materializer.max-input-buffer-size = 1 akka.stream.materializer.max-input-buffer-size = 1
""") { """) {
import system.dispatcher
val ex = TE("") val ex = TE("")
case class MyAttribute() extends Attribute
val myAttributes = Attributes(MyAttribute())
"A LazySink" must { "A LazySink" must {
"work in happy case" in assertAllStagesStopped { "work in happy case" in assertAllStagesStopped {
@ -158,6 +160,16 @@ class LazySinkSpec extends StreamSpec("""
innerMatVal.failed.futureValue should ===(MyException) innerMatVal.failed.futureValue should ===(MyException)
} }
"provide attributes to inner sink" in assertAllStagesStopped {
val attributes = Source
.single(Done)
.toMat(Sink.lazyFutureSink(() => Future(Sink.fromGraph(new AttributesSink()))))(Keep.right)
.addAttributes(myAttributes)
.run()
attributes.futureValue.get[MyAttribute] should contain(MyAttribute())
}
} }
} }

View file

@ -5,16 +5,15 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.immutable.Seq import scala.collection.immutable.Seq
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise import scala.concurrent.Promise
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import akka.Done import akka.Done
import akka.NotUsed import akka.NotUsed
import akka.stream.Attributes.Attribute
import akka.stream._ import akka.stream._
import akka.stream.scaladsl.AttributesSpec.AttributesSource
import akka.stream.stage.GraphStage import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic import akka.stream.stage.GraphStageLogic
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
@ -28,6 +27,8 @@ import akka.testkit.TestProbe
class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
import system.dispatcher import system.dispatcher
case class MyAttribute() extends Attribute
val myAttributes = Attributes(MyAttribute())
"Source.lazySingle" must { "Source.lazySingle" must {
"work like a normal source, happy path" in assertAllStagesStopped { "work like a normal source, happy path" in assertAllStagesStopped {
@ -247,6 +248,17 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
killswitch.abort(boom) killswitch.abort(boom)
doneF.failed.futureValue should ===(boom) doneF.failed.futureValue should ===(boom)
} }
"provide attributes to inner source" in assertAllStagesStopped {
val attributes = Source
.lazySource(() => Source.fromGraph(new AttributesSource()))
.addAttributes(myAttributes)
.buffer(1, OverflowStrategy.backpressure)
.to(Sink.cancelled)
.run()
attributes.futureValue.get[MyAttribute] should contain(MyAttribute())
}
} }
"Source.lazyFutureSource" must { "Source.lazyFutureSource" must {
@ -415,6 +427,16 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
terminationF.failed.futureValue should ===(boom) terminationF.failed.futureValue should ===(boom)
} }
"provide attributes to inner source" in assertAllStagesStopped {
val attributes = Source
.lazyFutureSource(() => Future(Source.fromGraph(new AttributesSource())))
.addAttributes(myAttributes)
.buffer(1, OverflowStrategy.backpressure)
.to(Sink.cancelled)
.run()
attributes.futureValue.get[MyAttribute] should contain(MyAttribute())
}
} }
} }

View file

@ -28,9 +28,7 @@ import akka.stream.impl.QueueSink.Output
import akka.stream.impl.QueueSink.Pull import akka.stream.impl.QueueSink.Pull
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.{ Keep, Sink, SinkQueueWithCancel, Source }
import akka.stream.scaladsl.SinkQueueWithCancel
import akka.stream.scaladsl.Source
import akka.stream.stage._ import akka.stream.stage._
import akka.util.ccompat._ import akka.util.ccompat._
@ -590,7 +588,8 @@ import akka.util.ccompat._
val subOutlet = new SubSourceOutlet[T]("LazySink") val subOutlet = new SubSourceOutlet[T]("LazySink")
val matVal = Source.fromGraph(subOutlet.source).runWith(sink)(interpreter.subFusingMaterializer) val matVal = interpreter.subFusingMaterializer
.materialize(Source.fromGraph(subOutlet.source).toMat(sink)(Keep.right), inheritedAttributes)
def maybeCompleteStage(): Unit = { def maybeCompleteStage(): Unit = {
if (isClosed(in) && subOutlet.isClosed) { if (isClosed(in) && subOutlet.isClosed) {

View file

@ -147,7 +147,7 @@ import akka.util.OptionVal
val matVal = try { val matVal = try {
val flow = f(prefix) val flow = f(prefix)
val runnableGraph = Source.fromGraph(theSubSource.source).viaMat(flow)(Keep.right).to(theSubSink.sink) val runnableGraph = Source.fromGraph(theSubSource.source).viaMat(flow)(Keep.right).to(theSubSink.sink)
interpreter.subFusingMaterializer.materialize(runnableGraph) interpreter.subFusingMaterializer.materialize(runnableGraph, inheritedAttributes)
} catch { } catch {
case NonFatal(ex) => case NonFatal(ex) =>
matPromise.failure(new NeverMaterializedException(ex)) matPromise.failure(new NeverMaterializedException(ex))

View file

@ -111,8 +111,9 @@ import scala.util.{ Failure, Success, Try }
} }
} }
try { try {
val matVal = val matVal = subFusingMaterializer.materialize(
Source.fromGraph(subSource.source).viaMat(flow)(Keep.right).to(subSink.sink).run()(subFusingMaterializer) Source.fromGraph(subSource.source).viaMat(flow)(Keep.right).to(subSink.sink),
inheritedAttributes)
innerMatValue.success(matVal) innerMatValue.success(matVal)
upstreamFailure match { upstreamFailure match {
case OptionVal.Some(ex) => subSource.fail(ex) case OptionVal.Some(ex) => subSource.fail(ex)