Akka 30374 flat map prefix attributes propagation (#30388)

This commit is contained in:
eyal farago 2021-07-12 18:04:14 +03:00 committed by GitHub
parent 59e3bc8648
commit 4f059265f1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,13 +4,18 @@
package akka.stream.scaladsl
import akka.stream.Attributes.Attribute
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.{ Done, NotUsed }
import akka.stream.{
AbruptStageTerminationException,
AbruptTerminationException,
Attributes,
FlowShape,
Inlet,
Materializer,
NeverMaterializedException,
Outlet,
SubscriptionWithCancelException
}
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
@ -615,4 +620,54 @@ class FlowFlatMapPrefixSpec extends StreamSpec {
}
}
}
"attributes propagation" must {
case class CustomAttribute(n: Int) extends Attribute
class WithAttr[A] extends GraphStage[FlowShape[A, (A, Option[CustomAttribute])]] {
override val shape: FlowShape[A, (A, Option[CustomAttribute])] =
FlowShape(Inlet[A]("in"), Outlet[(A, Option[CustomAttribute])]("out"))
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
val attr = inheritedAttributes.get[CustomAttribute]
setHandlers(shape.in, shape.out, this)
override def onPush(): Unit = push(shape.out, grab(shape.in) -> attr)
override def onPull(): Unit = pull(shape.in)
}
}
def withAttr[A] = Flow.fromGraph(new WithAttr[A])
"baseline behaviour" in {
Source
.single("1")
.via(withAttr)
.map(_._2)
.withAttributes(Attributes(CustomAttribute(42)))
.runWith(Sink.head)
.futureValue should be(Some(CustomAttribute(42)))
}
"propagate attribute applied to flatMapPrefix" in {
Source
.single("1")
.flatMapPrefix(0) { _ =>
Flow[String].via(withAttr).map(_._2)
}
.withAttributes(Attributes(CustomAttribute(42)))
.runWith(Sink.head)
.futureValue should be(Some(CustomAttribute(42)))
}
"respect attributes override" in {
Source
.single("1")
.flatMapPrefix(0) { _ =>
Flow[String].via(withAttr).map(_._2).withAttributes(Attributes(CustomAttribute(24)))
}
.withAttributes(Attributes(CustomAttribute(42)))
.runWith(Sink.head)
.futureValue should be(Some(CustomAttribute(24)))
}
}
}