diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala index e44a5cfc65..7d8dadc897 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala @@ -4,15 +4,16 @@ package akka.stream.scaladsl +import akka.stream.Attributes.Attribute +import akka.stream.scaladsl.AttributesSpec.AttributesFlow + import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.annotation.nowarn import akka.{ Done, NotUsed } -import akka.stream.AbruptStageTerminationException -import akka.stream.Materializer -import akka.stream.NeverMaterializedException +import akka.stream.{ AbruptStageTerminationException, Attributes, Materializer, NeverMaterializedException } import akka.stream.testkit.StreamSpec import akka.stream.testkit.TestPublisher import akka.stream.testkit.Utils._ @@ -27,7 +28,10 @@ class LazyFlowSpec extends StreamSpec(""" akka.stream.materializer.max-input-buffer-size = 1 """) { + import system.dispatcher val ex = TE("") + case class MyAttribute() extends Attribute + val myAttributes = Attributes(MyAttribute()) "Flow.lazyFlow" must { // more complete test coverage is for lazyFutureFlow since this is composition of that @@ -44,6 +48,16 @@ class LazyFlowSpec extends StreamSpec(""" deferredMatVal.isCompleted should ===(true) } + "provide attributes to inner flow" in assertAllStagesStopped { + val attributes = Source + .single(Done) + .viaMat(Flow.lazyFlow(() => Flow.fromGraph(new AttributesFlow())))(Keep.right) + .addAttributes(myAttributes) + .to(Sink.head) + .run() + + attributes.futureValue.get[MyAttribute] should contain(MyAttribute()) + } } "Flow.futureFlow" must { @@ -60,6 +74,17 @@ class LazyFlowSpec extends StreamSpec(""" list.futureValue should equal(Seq("1", "2", "3")) deferredMatVal.isCompleted should ===(true) } + + "provide attributes to inner flow" in assertAllStagesStopped { + val attributes = Source + .single(Done) + .viaMat(Flow.futureFlow(Future(Flow.fromGraph(new AttributesFlow()))))(Keep.right) + .addAttributes(myAttributes) + .to(Sink.head) + .run() + + attributes.futureValue.get[MyAttribute] should contain(MyAttribute()) + } } "Flow.lazyFutureFlow" must { @@ -229,6 +254,17 @@ class LazyFlowSpec extends StreamSpec(""" list.failed.futureValue shouldBe an[AbruptStageTerminationException] deferredMatVal.failed.futureValue shouldBe an[AbruptStageTerminationException] } + + "provide attributes to inner flow" in assertAllStagesStopped { + val attributes = Source + .single(Done) + .viaMat(Flow.lazyFutureFlow(() => Future(Flow.fromGraph(new AttributesFlow()))))(Keep.right) + .addAttributes(myAttributes) + .to(Sink.head) + .run() + + attributes.futureValue.get[MyAttribute] should contain(MyAttribute()) + } } "The deprecated LazyFlow ops" must { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala index 08dab96974..d8fc6d6790 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala @@ -5,17 +5,16 @@ package akka.stream.scaladsl import java.util.concurrent.TimeoutException - import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ - import scala.annotation.nowarn - -import akka.NotUsed +import akka.{ Done, NotUsed } +import akka.stream.Attributes.Attribute import akka.stream._ +import akka.stream.scaladsl.AttributesSpec.AttributesSink import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic import akka.stream.testkit.StreamSpec @@ -31,7 +30,10 @@ class LazySinkSpec extends StreamSpec(""" akka.stream.materializer.max-input-buffer-size = 1 """) { + import system.dispatcher val ex = TE("") + case class MyAttribute() extends Attribute + val myAttributes = Attributes(MyAttribute()) "A LazySink" must { "work in happy case" in assertAllStagesStopped { @@ -158,6 +160,16 @@ class LazySinkSpec extends StreamSpec(""" innerMatVal.failed.futureValue should ===(MyException) } + + "provide attributes to inner sink" in assertAllStagesStopped { + val attributes = Source + .single(Done) + .toMat(Sink.lazyFutureSink(() => Future(Sink.fromGraph(new AttributesSink()))))(Keep.right) + .addAttributes(myAttributes) + .run() + + attributes.futureValue.get[MyAttribute] should contain(MyAttribute()) + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala index 542d4ec37c..86f873f215 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -5,16 +5,15 @@ package akka.stream.scaladsl import java.util.concurrent.atomic.AtomicBoolean - import scala.collection.immutable.Seq import scala.concurrent.Future import scala.concurrent.Promise - import org.scalatest.concurrent.ScalaFutures - import akka.Done import akka.NotUsed +import akka.stream.Attributes.Attribute import akka.stream._ +import akka.stream.scaladsl.AttributesSpec.AttributesSource import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic import akka.stream.testkit.StreamSpec @@ -28,6 +27,8 @@ import akka.testkit.TestProbe class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import system.dispatcher + case class MyAttribute() extends Attribute + val myAttributes = Attributes(MyAttribute()) "Source.lazySingle" must { "work like a normal source, happy path" in assertAllStagesStopped { @@ -247,6 +248,17 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { killswitch.abort(boom) doneF.failed.futureValue should ===(boom) } + + "provide attributes to inner source" in assertAllStagesStopped { + val attributes = Source + .lazySource(() => Source.fromGraph(new AttributesSource())) + .addAttributes(myAttributes) + .buffer(1, OverflowStrategy.backpressure) + .to(Sink.cancelled) + .run() + + attributes.futureValue.get[MyAttribute] should contain(MyAttribute()) + } } "Source.lazyFutureSource" must { @@ -415,6 +427,16 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { terminationF.failed.futureValue should ===(boom) } + "provide attributes to inner source" in assertAllStagesStopped { + val attributes = Source + .lazyFutureSource(() => Future(Source.fromGraph(new AttributesSource()))) + .addAttributes(myAttributes) + .buffer(1, OverflowStrategy.backpressure) + .to(Sink.cancelled) + .run() + + attributes.futureValue.get[MyAttribute] should contain(MyAttribute()) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 45a8861465..eb19b20578 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -28,9 +28,7 @@ import akka.stream.impl.QueueSink.Output import akka.stream.impl.QueueSink.Pull import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.AtomicModule -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.SinkQueueWithCancel -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Keep, Sink, SinkQueueWithCancel, Source } import akka.stream.stage._ import akka.util.ccompat._ @@ -590,7 +588,8 @@ import akka.util.ccompat._ val subOutlet = new SubSourceOutlet[T]("LazySink") - val matVal = Source.fromGraph(subOutlet.source).runWith(sink)(interpreter.subFusingMaterializer) + val matVal = interpreter.subFusingMaterializer + .materialize(Source.fromGraph(subOutlet.source).toMat(sink)(Keep.right), inheritedAttributes) def maybeCompleteStage(): Unit = { if (isClosed(in) && subOutlet.isClosed) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala index aa37382147..6b11d8497d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala @@ -147,7 +147,7 @@ import akka.util.OptionVal val matVal = try { val flow = f(prefix) val runnableGraph = Source.fromGraph(theSubSource.source).viaMat(flow)(Keep.right).to(theSubSink.sink) - interpreter.subFusingMaterializer.materialize(runnableGraph) + interpreter.subFusingMaterializer.materialize(runnableGraph, inheritedAttributes) } catch { case NonFatal(ex) => matPromise.failure(new NeverMaterializedException(ex)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/FutureFlow.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/FutureFlow.scala index d3f3e21a3b..5fe47b8dfd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/FutureFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/FutureFlow.scala @@ -111,8 +111,9 @@ import scala.util.{ Failure, Success, Try } } } try { - val matVal = - Source.fromGraph(subSource.source).viaMat(flow)(Keep.right).to(subSink.sink).run()(subFusingMaterializer) + val matVal = subFusingMaterializer.materialize( + Source.fromGraph(subSource.source).viaMat(flow)(Keep.right).to(subSink.sink), + inheritedAttributes) innerMatValue.success(matVal) upstreamFailure match { case OptionVal.Some(ex) => subSource.fail(ex)