diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index c14b759d9a..44989599f2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -18,6 +18,7 @@ import akka.stream.impl.io.SslTlsCipherActor import akka.stream._ import akka.stream.io.SslTls.TlsModule import akka.stream.stage.Stage +import akka.stream.Attributes._ import org.reactivestreams._ import scala.concurrent.{ Await, ExecutionContextExecutor } @@ -292,11 +293,20 @@ private[akka] object ActorProcessorFactory { // Also, otherwise the attributes will not affect the settings properly! val settings = materializer.effectiveSettings(att) def interp(s: Stage[_, _]): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, att), ()) + def interpAttr(s: Stage[_, _], newAttributes: Attributes): (Props, Unit) = (ActorInterpreter.props(settings, List(s), materializer, newAttributes), ()) + def inputSizeAttr(n: Long) = { + if (n <= 0) + inputBuffer(initial = 1, max = 1) and att + else if (n <= materializer.settings.maxInputBufferSize) + inputBuffer(initial = n.toInt, max = n.toInt) and att + else + att + } op match { case Map(f, _) ⇒ interp(fusing.Map(f, settings.supervisionDecider)) case Filter(p, _) ⇒ interp(fusing.Filter(p, settings.supervisionDecider)) case Drop(n, _) ⇒ interp(fusing.Drop(n)) - case Take(n, _) ⇒ interp(fusing.Take(n)) + case Take(n, _) ⇒ interpAttr(fusing.Take(n), inputSizeAttr(n)) case TakeWhile(p, _) ⇒ interp(fusing.TakeWhile(p, settings.supervisionDecider)) case DropWhile(p, _) ⇒ interp(fusing.DropWhile(p, settings.supervisionDecider)) case Collect(pf, _) ⇒ interp(fusing.Collect(pf, settings.supervisionDecider)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index c29a0127d8..2c2d769935 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -78,7 +78,7 @@ private[stream] object Stages { val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") - val headSink = name("headSink") + val headSink = name("headSink") and inputBuffer(initial = 1, max = 1) val publisherSink = name("publisherSink") val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink")