chore: Fix flaky test in unsafeOptionalDataVia (#1611)
This commit is contained in:
parent
53761e549a
commit
7184dadf4a
1 changed files with 31 additions and 23 deletions
|
|
@ -57,36 +57,44 @@ object SourceWithContext {
|
|||
SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) {
|
||||
implicit b => (s, viaF) =>
|
||||
import GraphDSL.Implicits._
|
||||
val broadcast = b.add(Broadcast[(Option[SOut], Ctx)](2))
|
||||
val merge = b.add(Merge[(Option[FOut], Ctx)](2))
|
||||
|
||||
val unzip = b.add(Unzip[SOut, Ctx]())
|
||||
val zipper = b.add(Zip[FOut, Ctx]())
|
||||
case class IndexedCtx(idx: Long, ctx: Ctx)
|
||||
val partition = b.add(Partition[(Option[SOut], IndexedCtx)](2,
|
||||
{
|
||||
case (None, _) => 0
|
||||
case (Some(_), _) => 1
|
||||
}))
|
||||
|
||||
val filterAvailable = Flow[(Option[SOut], Ctx)].collect {
|
||||
case (Some(f), ctx) => (f, ctx)
|
||||
val sequence = Flow[(Option[SOut], Ctx)].zipWithIndex
|
||||
.map {
|
||||
case ((opt, ctx), idx) => (opt, IndexedCtx(idx, ctx))
|
||||
}
|
||||
|
||||
val unzip = b.add(Unzip[Option[SOut], IndexedCtx]())
|
||||
val zipper = b.add(Zip[FOut, IndexedCtx]())
|
||||
val mergeSequence = b.add(MergeSequence[(Option[FOut], IndexedCtx)](2)(_._2.idx))
|
||||
val unwrapSome = b.add(Flow[Option[SOut]].map {
|
||||
case Some(elem) => elem
|
||||
case _ => throw new IllegalStateException("Only expects Some")
|
||||
})
|
||||
val unwrap = Flow[(Option[FOut], IndexedCtx)].map {
|
||||
case (opt, indexedCtx) => (opt, indexedCtx.ctx)
|
||||
}
|
||||
|
||||
val filterUnavailable = Flow[(Option[SOut], Ctx)].collect {
|
||||
case (None, ctx) => (Option.empty[FOut], ctx)
|
||||
val mapIntoOption = Flow[(FOut, IndexedCtx)].map {
|
||||
case (elem, indexedCtx) => (Some(elem), indexedCtx)
|
||||
}
|
||||
|
||||
val mapIntoOption = Flow[(FOut, Ctx)].map {
|
||||
case (f, ctx) => (Some(f), ctx)
|
||||
}
|
||||
//format: off
|
||||
s ~> sequence ~> partition.in
|
||||
partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> mergeSequence.in(0)
|
||||
partition.out(1) ~> unzip.in
|
||||
unzip.out0 ~> unwrapSome ~> viaF ~> zipper.in0
|
||||
unzip.out1 ~> zipper.in1
|
||||
zipper.out ~> mapIntoOption ~> mergeSequence.in(1)
|
||||
|
||||
s ~> broadcast.in
|
||||
|
||||
broadcast.out(0) ~> filterAvailable ~> unzip.in
|
||||
|
||||
unzip.out0 ~> viaF ~> zipper.in0
|
||||
unzip.out1 ~> zipper.in1
|
||||
|
||||
zipper.out ~> mapIntoOption ~> merge.in(0)
|
||||
|
||||
broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
|
||||
|
||||
SourceShape(merge.out)
|
||||
//format: on
|
||||
SourceShape((mergeSequence.out ~> unwrap).outlet)
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue