This commit is contained in:
parent
ba3f6a6c14
commit
2fa2766cb1
1 changed files with 0 additions and 25 deletions
|
|
@ -91,31 +91,6 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
|
|||
outProbe.expectError() shouldEqual TE("OMG Who set that on fire!?!")
|
||||
}
|
||||
|
||||
val attributesSource = Source.fromGraph(
|
||||
new GraphStage[SourceShape[Attributes]] {
|
||||
val out = Outlet[Attributes]("AttributesSource.out")
|
||||
override val shape: SourceShape[Attributes] = SourceShape(out)
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
push(out, inheritedAttributes)
|
||||
completeStage()
|
||||
}
|
||||
setHandler(out, this)
|
||||
}
|
||||
})
|
||||
|
||||
"propagate attributes to inner streams" in assertAllStagesStopped {
|
||||
val f = Source.single(attributesSource.addAttributes(Attributes.name("inner")))
|
||||
.flatMapMerge(1, identity)
|
||||
.addAttributes(Attributes.name("outer"))
|
||||
.runWith(Sink.head)
|
||||
|
||||
val attributes = f.futureValue.attributeList
|
||||
attributes should contain(Attributes.Name("inner"))
|
||||
attributes should contain(Attributes.Name("outer"))
|
||||
attributes.indexOf(Attributes.Name("inner")) < attributes.indexOf(Attributes.Name("outer")) should be(true)
|
||||
}
|
||||
|
||||
"fail correctly when materialization of inner source fails" in assertAllStagesStopped {
|
||||
val matFail = TE("fail!")
|
||||
object FailingInnerMat extends GraphStage[SourceShape[String]] {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue