2018-10-29 17:19:37 +08:00
|
|
|
/*
|
2019-01-02 18:55:26 +08:00
|
|
|
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
|
2015-11-25 19:58:48 +01:00
|
|
|
*/
|
2018-03-13 23:45:55 +09:00
|
|
|
|
2015-11-25 19:58:48 +01:00
|
|
|
package akka.stream.impl
|
|
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.NotUsed
|
2017-03-16 21:04:07 +02:00
|
|
|
import akka.annotation.InternalApi
|
2015-11-25 19:58:48 +01:00
|
|
|
import akka.stream._
|
|
|
|
|
import akka.stream.scaladsl._
|
2017-03-16 21:04:07 +02:00
|
|
|
|
2015-11-25 19:58:48 +01:00
|
|
|
import language.higherKinds
|
|
|
|
|
|
2017-03-16 21:04:07 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
@InternalApi private[akka] object SubFlowImpl {
|
2019-03-11 10:38:24 +01:00
|
|
|
trait MergeBack[In, F[+ _]] {
|
2016-01-20 10:00:37 +02:00
|
|
|
def apply[T](f: Flow[In, T, NotUsed], breadth: Int): F[T]
|
2015-11-25 19:58:48 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-16 21:04:07 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2019-03-13 10:56:20 +01:00
|
|
|
@InternalApi private[akka] class SubFlowImpl[In, Out, Mat, F[+ _], C](
|
|
|
|
|
val subFlow: Flow[In, Out, NotUsed],
|
|
|
|
|
mergeBackFunction: SubFlowImpl.MergeBack[In, F],
|
|
|
|
|
finishFunction: Sink[In, NotUsed] => C)
|
2019-03-11 10:38:24 +01:00
|
|
|
extends SubFlow[Out, Mat, F, C] {
|
2015-11-25 19:58:48 +01:00
|
|
|
|
|
|
|
|
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =
|
|
|
|
|
new SubFlowImpl[In, T, Mat, F, C](subFlow.via(flow), mergeBackFunction, finishFunction)
|
|
|
|
|
|
|
|
|
|
override def withAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] =
|
|
|
|
|
new SubFlowImpl[In, Out, Mat, F, C](subFlow.withAttributes(attr), mergeBackFunction, finishFunction)
|
|
|
|
|
|
2015-12-22 20:56:02 +01:00
|
|
|
override def addAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] =
|
|
|
|
|
new SubFlowImpl[In, Out, Mat, F, C](subFlow.addAttributes(attr), mergeBackFunction, finishFunction)
|
|
|
|
|
|
|
|
|
|
override def named(name: String): SubFlow[Out, Mat, F, C] =
|
|
|
|
|
new SubFlowImpl[In, Out, Mat, F, C](subFlow.named(name), mergeBackFunction, finishFunction)
|
|
|
|
|
|
2016-02-10 13:56:38 +01:00
|
|
|
override def async: Repr[Out] = new SubFlowImpl[In, Out, Mat, F, C](subFlow.async, mergeBackFunction, finishFunction)
|
|
|
|
|
|
2015-12-11 11:15:19 +01:00
|
|
|
override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth)
|
2015-11-25 19:58:48 +01:00
|
|
|
|
|
|
|
|
def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink))
|
|
|
|
|
}
|