=str #16967 add buffer to headSink and "take" state
This commit is contained in:
parent
6ffd238e68
commit
cb0fc1999f
2 changed files with 12 additions and 2 deletions
|
|
@ -18,6 +18,7 @@ import akka.stream.impl.io.SslTlsCipherActor
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
import akka.stream.io.SslTls.TlsModule
|
import akka.stream.io.SslTls.TlsModule
|
||||||
import akka.stream.stage.Stage
|
import akka.stream.stage.Stage
|
||||||
|
import akka.stream.Attributes._
|
||||||
import org.reactivestreams._
|
import org.reactivestreams._
|
||||||
|
|
||||||
import scala.concurrent.{ Await, ExecutionContextExecutor }
|
import scala.concurrent.{ Await, ExecutionContextExecutor }
|
||||||
|
|
@ -292,11 +293,20 @@ private[akka] object ActorProcessorFactory {
|
||||||
// Also, otherwise the attributes will not affect the settings properly!
|
// Also, otherwise the attributes will not affect the settings properly!
|
||||||
val settings = materializer.effectiveSettings(att)
|
val settings = materializer.effectiveSettings(att)
|
||||||
def interp(s: Stage[_, _]): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, att), ())
|
def interp(s: Stage[_, _]): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, att), ())
|
||||||
|
def interpAttr(s: Stage[_, _], newAttributes: Attributes): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, newAttributes), ())
|
||||||
|
def inputSizeAttr(n: Long) = {
|
||||||
|
if (n <= 0)
|
||||||
|
inputBuffer(initial = 1, max = 1) and att
|
||||||
|
else if (n <= materializer.settings.maxInputBufferSize)
|
||||||
|
inputBuffer(initial = n.toInt, max = n.toInt) and att
|
||||||
|
else
|
||||||
|
att
|
||||||
|
}
|
||||||
op match {
|
op match {
|
||||||
case Map(f, _) ⇒ interp(fusing.Map(f, settings.supervisionDecider))
|
case Map(f, _) ⇒ interp(fusing.Map(f, settings.supervisionDecider))
|
||||||
case Filter(p, _) ⇒ interp(fusing.Filter(p, settings.supervisionDecider))
|
case Filter(p, _) ⇒ interp(fusing.Filter(p, settings.supervisionDecider))
|
||||||
case Drop(n, _) ⇒ interp(fusing.Drop(n))
|
case Drop(n, _) ⇒ interp(fusing.Drop(n))
|
||||||
case Take(n, _) ⇒ interp(fusing.Take(n))
|
case Take(n, _) ⇒ interpAttr(fusing.Take(n), inputSizeAttr(n))
|
||||||
case TakeWhile(p, _) ⇒ interp(fusing.TakeWhile(p, settings.supervisionDecider))
|
case TakeWhile(p, _) ⇒ interp(fusing.TakeWhile(p, settings.supervisionDecider))
|
||||||
case DropWhile(p, _) ⇒ interp(fusing.DropWhile(p, settings.supervisionDecider))
|
case DropWhile(p, _) ⇒ interp(fusing.DropWhile(p, settings.supervisionDecider))
|
||||||
case Collect(pf, _) ⇒ interp(fusing.Collect(pf, settings.supervisionDecider))
|
case Collect(pf, _) ⇒ interp(fusing.Collect(pf, settings.supervisionDecider))
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ private[stream] object Stages {
|
||||||
|
|
||||||
val subscriberSink = name("subscriberSink")
|
val subscriberSink = name("subscriberSink")
|
||||||
val cancelledSink = name("cancelledSink")
|
val cancelledSink = name("cancelledSink")
|
||||||
val headSink = name("headSink")
|
val headSink = name("headSink") and inputBuffer(initial = 1, max = 1)
|
||||||
val publisherSink = name("publisherSink")
|
val publisherSink = name("publisherSink")
|
||||||
val fanoutPublisherSink = name("fanoutPublisherSink")
|
val fanoutPublisherSink = name("fanoutPublisherSink")
|
||||||
val ignoreSink = name("ignoreSink")
|
val ignoreSink = name("ignoreSink")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue