=str #19781 Use overriden InputBuffer attribute

* Sink.{queue, actorRefWithAck} and StreamConverters.{asInputStream,
  asOutputStream} now use overriden/inherited `InputBuffer` attribute
* They now use their default attributes as initial attributes.
This commit is contained in:
Bojan Petrovic 2016-02-15 13:38:37 +01:00
parent a83f08d4ab
commit b4f507cdd6
11 changed files with 69 additions and 17 deletions

View file

@ -8,6 +8,7 @@ import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.Attributes.inputBuffer
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.io.InputStreamSinkStage
import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor }
@ -219,4 +220,17 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
inputStream.close()
}
}
"fail to materialize with zero sized input buffer" in {
an[IllegalArgumentException] shouldBe thrownBy {
Source.single(byteString)
.runWith(StreamConverters.asInputStream(timeout).withAttributes(inputBuffer(0, 0)))
/*
With Source.single we test the code path in which the sink
itself throws an exception when being materialized. If
Source.empty is used, the same exception is thrown by
Materializer.
*/
}
}
}

View file

@ -8,10 +8,11 @@ import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.Attributes.inputBuffer
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.io.OutputStreamSourceStage
import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor }
import akka.stream.scaladsl.{ Keep, StreamConverters }
import akka.stream.scaladsl.{ Keep, StreamConverters, Sink }
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
@ -147,5 +148,19 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
sourceProbe.expectMsg(GraphStageMessages.DownstreamFinish)
the[Exception] thrownBy outputStream.write(bytesArray) shouldBe a[IOException]
}
"fail to materialize with zero sized input buffer" in {
an[IllegalArgumentException] shouldBe thrownBy {
StreamConverters.asOutputStream(timeout)
.withAttributes(inputBuffer(0, 0))
.runWith(Sink.head)
/*
With Sink.head we test the code path in which the source
itself throws an exception when being materialized. If
Sink.ignore is used, the same exception is thrown by
Materializer.
*/
}
}
}
}
}

View file

@ -5,6 +5,7 @@ package akka.stream.scaladsl
import akka.actor.{ Actor, ActorRef, Props }
import akka.stream.ActorMaterializer
import akka.stream.Attributes.inputBuffer
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
@ -130,6 +131,16 @@ class ActorRefBackpressureSinkSpec extends AkkaSpec with ScalaFutures with Conve
expectMsg(completeMessage)
}
"fail to materialize with zero sized input buffer" in {
val fw = createActor(classOf[Fw])
an[IllegalArgumentException] shouldBe thrownBy {
val badSink = Sink
.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)
.withAttributes(inputBuffer(0, 0))
Source.single(()).runWith(badSink)
}
}
}
}

View file

@ -5,6 +5,7 @@ package akka.stream.scaladsl
import akka.actor.Status
import akka.pattern.pipe
import akka.stream.Attributes.inputBuffer
import akka.stream.{ OverflowStrategy, ActorMaterializer }
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, _ }
@ -129,5 +130,10 @@ class QueueSinkSpec extends AkkaSpec with ScalaFutures {
}
"fail to materialize with zero sized input buffer" in {
an[IllegalArgumentException] shouldBe thrownBy {
Source.single(()).runWith(Sink.queue().withAttributes(inputBuffer(0, 0)))
}
}
}
}

View file

@ -7,6 +7,7 @@ import java.util
import akka.actor._
import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Watch }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage.GraphStageLogic.StageActor
import akka.stream.{ Inlet, SinkShape, ActorMaterializer, Attributes }
import akka.stream.Attributes.InputBuffer
@ -21,15 +22,16 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa
onFailureMessage: (Throwable) Any)
extends GraphStage[SinkShape[In]] {
val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in")
override def initialAttributes = DefaultAttributes.actorRefWithAck
override val shape: SinkShape[In] = SinkShape(in)
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
implicit def self: ActorRef = stageActor.ref
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
val buffer: util.Deque[In] = new util.ArrayDeque[In]()
var acknowledgementReceived = false
var completeReceived = false

View file

@ -289,13 +289,14 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal
type Requested[E] = Promise[Option[E]]
val in = Inlet[T]("queueSink.in")
override def initialAttributes = DefaultAttributes.queueSink
override val shape: SinkShape[T] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
type Received[E] = Try[Option[E]]
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
var buffer: Buffer[Received[T]] = _

View file

@ -107,6 +107,7 @@ private[stream] object Stages {
val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink")
val actorRefSink = name("actorRefSink")
val actorRefWithAck = name("actorRefWithAckSink")
val actorSubscriberSink = name("actorSubscriberSink")
val queueSink = name("queueSink")
val outputStreamSink = name("outputStreamSink") and IODispatcher

View file

@ -6,6 +6,7 @@ package akka.stream.impl.io
import java.io.{ IOException, InputStream }
import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit }
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.io.InputStreamSinkStage._
import akka.stream.stage._
import akka.util.ByteString
@ -36,13 +37,13 @@ private[stream] object InputStreamSinkStage {
final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] {
val in = Inlet[ByteString]("InputStreamSink.in")
override def initialAttributes: Attributes = DefaultAttributes.inputStreamSink
override val shape: SinkShape[ByteString] = SinkShape.of(in)
// has to be in this order as module depends on shape
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, InputStream) = {
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 2)
val logic = new GraphStageLogic(shape) with StageWithCallback {

View file

@ -9,6 +9,7 @@ import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue }
import akka.stream.{ Outlet, SourceShape, Attributes }
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.io.OutputStreamSourceStage._
import akka.stream.stage._
import akka.util.ByteString
@ -34,13 +35,13 @@ private[stream] object OutputStreamSourceStage {
final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SourceShape[ByteString], OutputStream] {
val out = Outlet[ByteString]("OutputStreamSource.out")
override def initialAttributes = DefaultAttributes.outputStreamSource
override val shape: SourceShape[ByteString] = SourceShape.of(out)
// has to be in this order as module depends on shape
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutputStream) = {
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer)
val downstreamStatus = new AtomicReference[DownstreamStatus](Ok)

View file

@ -340,6 +340,6 @@ object Sink {
* @see [[akka.stream.SinkQueue]]
*/
def queue[T](): Sink[T, SinkQueue[T]] =
Sink.fromGraph(new QueueSink().withAttributes(DefaultAttributes.queueSink))
Sink.fromGraph(new QueueSink())
}

View file

@ -51,7 +51,7 @@ object StreamConverters {
* @param writeTimeout the max time the write operation on the materialized OutputStream should block, defaults to 5 seconds
*/
def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] =
Source.fromGraph(new OutputStreamSourceStage(writeTimeout)).withAttributes(DefaultAttributes.outputStreamSource)
Source.fromGraph(new OutputStreamSourceStage(writeTimeout))
/**
* Creates a Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function.
@ -78,6 +78,6 @@ object StreamConverters {
* @param readTimeout the max time the read operation on the materialized InputStream should block
*/
def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] =
Sink.fromGraph(new InputStreamSinkStage(readTimeout)).withAttributes(DefaultAttributes.inputStreamSink)
Sink.fromGraph(new InputStreamSinkStage(readTimeout))
}