diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index 50d94dcc73..40bd9cf43a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -8,6 +8,7 @@ import java.util.concurrent.TimeoutException import akka.actor.ActorSystem import akka.stream._ +import akka.stream.Attributes.inputBuffer import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.InputStreamSinkStage import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } @@ -219,4 +220,17 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { inputStream.close() } } + + "fail to materialize with zero sized input buffer" in { + an[IllegalArgumentException] shouldBe thrownBy { + Source.single(byteString) + .runWith(StreamConverters.asInputStream(timeout).withAttributes(inputBuffer(0, 0))) + /* + With Source.single we test the code path in which the sink + itself throws an exception when being materialized. If + Source.empty is used, the same exception is thrown by + Materializer. + */ + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala index f3433d9ddf..0718a4f342 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala @@ -8,10 +8,11 @@ import java.util.concurrent.TimeoutException import akka.actor.ActorSystem import akka.stream._ +import akka.stream.Attributes.inputBuffer import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.OutputStreamSourceStage import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } -import akka.stream.scaladsl.{ Keep, StreamConverters } +import akka.stream.scaladsl.{ Keep, StreamConverters, Sink } import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink @@ -147,5 +148,19 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { sourceProbe.expectMsg(GraphStageMessages.DownstreamFinish) the[Exception] thrownBy outputStream.write(bytesArray) shouldBe a[IOException] } + + "fail to materialize with zero sized input buffer" in { + an[IllegalArgumentException] shouldBe thrownBy { + StreamConverters.asOutputStream(timeout) + .withAttributes(inputBuffer(0, 0)) + .runWith(Sink.head) + /* + With Sink.head we test the code path in which the source + itself throws an exception when being materialized. If + Sink.ignore is used, the same exception is thrown by + Materializer. + */ + } + } } -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index ec76249bce..48ba42b8e4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.actor.{ Actor, ActorRef, Props } import akka.stream.ActorMaterializer +import akka.stream.Attributes.inputBuffer import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ @@ -130,6 +131,16 @@ class ActorRefBackpressureSinkSpec extends AkkaSpec with ScalaFutures with Conve expectMsg(completeMessage) } + "fail to materialize with zero sized input buffer" in { + val fw = createActor(classOf[Fw]) + an[IllegalArgumentException] shouldBe thrownBy { + val badSink = Sink + .actorRefWithAck(fw, initMessage, ackMessage, completeMessage) + .withAttributes(inputBuffer(0, 0)) + Source.single(()).runWith(badSink) + } + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 8f63552522..4ee4ce1498 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.actor.Status import akka.pattern.pipe +import akka.stream.Attributes.inputBuffer import akka.stream.{ OverflowStrategy, ActorMaterializer } import akka.stream.testkit.Utils._ import akka.stream.testkit.{ AkkaSpec, _ } @@ -129,5 +130,10 @@ class QueueSinkSpec extends AkkaSpec with ScalaFutures { } + "fail to materialize with zero sized input buffer" in { + an[IllegalArgumentException] shouldBe thrownBy { + Source.single(()).runWith(Sink.queue().withAttributes(inputBuffer(0, 0))) + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index 4bc54b9c9c..e6403eca08 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -7,6 +7,7 @@ import java.util import akka.actor._ import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Watch } +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.stage.GraphStageLogic.StageActor import akka.stream.{ Inlet, SinkShape, ActorMaterializer, Attributes } import akka.stream.Attributes.InputBuffer @@ -21,15 +22,16 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa onFailureMessage: (Throwable) ⇒ Any) extends GraphStage[SinkShape[In]] { val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") + override def initialAttributes = DefaultAttributes.actorRefWithAck override val shape: SinkShape[In] = SinkShape(in) - val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max - require(maxBuffer > 0, "Buffer size must be greater than 0") - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { implicit def self: ActorRef = stageActor.ref + val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + require(maxBuffer > 0, "Buffer size must be greater than 0") + val buffer: util.Deque[In] = new util.ArrayDeque[In]() var acknowledgementReceived = false var completeReceived = false diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 6e50d56fa9..4fcb2c5db6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -289,13 +289,14 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal type Requested[E] = Promise[Option[E]] val in = Inlet[T]("queueSink.in") + override def initialAttributes = DefaultAttributes.queueSink override val shape: SinkShape[T] = SinkShape.of(in) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] { type Received[E] = Try[Option[E]] - val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max require(maxBuffer > 0, "Buffer size must be greater than 0") var buffer: Buffer[Received[T]] = _ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 56fa6817e4..39103e2715 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -107,6 +107,7 @@ private[stream] object Stages { val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") val actorRefSink = name("actorRefSink") + val actorRefWithAck = name("actorRefWithAckSink") val actorSubscriberSink = name("actorSubscriberSink") val queueSink = name("queueSink") val outputStreamSink = name("outputStreamSink") and IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 514180bb3f..28ce639476 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -6,6 +6,7 @@ package akka.stream.impl.io import java.io.{ IOException, InputStream } import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit } import akka.stream.Attributes.InputBuffer +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io.InputStreamSinkStage._ import akka.stream.stage._ import akka.util.ByteString @@ -36,13 +37,13 @@ private[stream] object InputStreamSinkStage { final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] { val in = Inlet[ByteString]("InputStreamSink.in") + override def initialAttributes: Attributes = DefaultAttributes.inputStreamSink override val shape: SinkShape[ByteString] = SinkShape.of(in) - // has to be in this order as module depends on shape - val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max - require(maxBuffer > 0, "Buffer size must be greater than 0") - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, InputStream) = { + val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + require(maxBuffer > 0, "Buffer size must be greater than 0") + val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 2) val logic = new GraphStageLogic(shape) with StageWithCallback { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index d222250380..ee02467e0f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -9,6 +9,7 @@ import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue } import akka.stream.{ Outlet, SourceShape, Attributes } import akka.stream.Attributes.InputBuffer +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io.OutputStreamSourceStage._ import akka.stream.stage._ import akka.util.ByteString @@ -34,13 +35,13 @@ private[stream] object OutputStreamSourceStage { final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SourceShape[ByteString], OutputStream] { val out = Outlet[ByteString]("OutputStreamSource.out") + override def initialAttributes = DefaultAttributes.outputStreamSource override val shape: SourceShape[ByteString] = SourceShape.of(out) - // has to be in this order as module depends on shape - val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max - require(maxBuffer > 0, "Buffer size must be greater than 0") - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = { + val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + require(maxBuffer > 0, "Buffer size must be greater than 0") + val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer) val downstreamStatus = new AtomicReference[DownstreamStatus](Ok) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 2f20a67b1a..21a216f34e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -340,6 +340,6 @@ object Sink { * @see [[akka.stream.SinkQueue]] */ def queue[T](): Sink[T, SinkQueue[T]] = - Sink.fromGraph(new QueueSink().withAttributes(DefaultAttributes.queueSink)) + Sink.fromGraph(new QueueSink()) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 130235913b..95d82441f3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -51,7 +51,7 @@ object StreamConverters { * @param writeTimeout the max time the write operation on the materialized OutputStream should block, defaults to 5 seconds */ def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] = - Source.fromGraph(new OutputStreamSourceStage(writeTimeout)).withAttributes(DefaultAttributes.outputStreamSource) + Source.fromGraph(new OutputStreamSourceStage(writeTimeout)) /** * Creates a Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. @@ -78,6 +78,6 @@ object StreamConverters { * @param readTimeout the max time the read operation on the materialized InputStream should block */ def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] = - Sink.fromGraph(new InputStreamSinkStage(readTimeout)).withAttributes(DefaultAttributes.inputStreamSink) + Sink.fromGraph(new InputStreamSinkStage(readTimeout)) }