diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala index 3766933e3d..2689c3ebb6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala @@ -4,13 +4,18 @@ package akka.stream.scaladsl +import akka.stream.Attributes.Attribute +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.{ Done, NotUsed } import akka.stream.{ AbruptStageTerminationException, AbruptTerminationException, Attributes, + FlowShape, + Inlet, Materializer, NeverMaterializedException, + Outlet, SubscriptionWithCancelException } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } @@ -615,4 +620,54 @@ class FlowFlatMapPrefixSpec extends StreamSpec { } } } + + "attributes propagation" must { + case class CustomAttribute(n: Int) extends Attribute + class WithAttr[A] extends GraphStage[FlowShape[A, (A, Option[CustomAttribute])]] { + override val shape: FlowShape[A, (A, Option[CustomAttribute])] = + FlowShape(Inlet[A]("in"), Outlet[(A, Option[CustomAttribute])]("out")) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + val attr = inheritedAttributes.get[CustomAttribute] + + setHandlers(shape.in, shape.out, this) + + override def onPush(): Unit = push(shape.out, grab(shape.in) -> attr) + + override def onPull(): Unit = pull(shape.in) + } + } + def withAttr[A] = Flow.fromGraph(new WithAttr[A]) + "baseline behaviour" in { + Source + .single("1") + .via(withAttr) + .map(_._2) + .withAttributes(Attributes(CustomAttribute(42))) + .runWith(Sink.head) + .futureValue should be(Some(CustomAttribute(42))) + } + + "propagate attribute applied to flatMapPrefix" in { + Source + .single("1") + .flatMapPrefix(0) { _ => + Flow[String].via(withAttr).map(_._2) + } + .withAttributes(Attributes(CustomAttribute(42))) + .runWith(Sink.head) + .futureValue should be(Some(CustomAttribute(42))) + } + + "respect attributes override" in { + Source + .single("1") + .flatMapPrefix(0) { _ => + Flow[String].via(withAttr).map(_._2).withAttributes(Attributes(CustomAttribute(24))) + } + .withAttributes(Attributes(CustomAttribute(42))) + .runWith(Sink.head) + .futureValue should be(Some(CustomAttribute(24))) + } + } }