From ed7ea755bbfdac580d0d1844232465d64df4abcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Fri, 18 Dec 2015 16:56:28 +0200 Subject: [PATCH] =str #19193 remove buffer from InputStreamSource --- .../stream/io/InputStreamSourceSpec.scala | 26 ++++++- .../scala/akka/stream/impl/io/IOSources.scala | 3 +- .../stream/impl/io/InputStreamPublisher.scala | 68 ++++--------------- 3 files changed, 39 insertions(+), 58 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala index 3adcae3150..dd388b0f6c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala @@ -4,11 +4,13 @@ package akka.stream.io import java.io.InputStream +import java.util.concurrent.CountDownLatch -import akka.stream.scaladsl.StreamConverters import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.StreamConverters import akka.stream.testkit._ import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures @@ -37,6 +39,28 @@ class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaF f.futureValue should ===(ByteString("abc")) } + + "emit as soon as read" in assertAllStagesStopped { + val latch = new CountDownLatch(1) + val probe = StreamConverters.fromInputStream(() ⇒ new InputStream { + @volatile var emitted = false + override def read(): Int = { + if (!emitted) { + emitted = true + 'M'.toInt + } else { + latch.await() + -1 + } + } + }, chunkSize = 1) + .runWith(TestSink.probe) + + probe.request(4) + probe.expectNext(ByteString("M")) + latch.countDown() + probe.expectComplete() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 84c3f8d577..2880364714 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -50,13 +50,12 @@ private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStrea extends SourceModule[ByteString, Future[Long]](shape) { override def create(context: MaterializationContext) = { val materializer = ActorMaterializer.downcast(context.materializer) - val settings = materializer.effectiveSettings(context.effectiveAttributes) val bytesReadPromise = Promise[Long]() val pub = try { val is = createInputStream() // can throw, i.e. FileNotFound - val props = InputStreamPublisher.props(is, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) + val props = InputStreamPublisher.props(is, bytesReadPromise, chunkSize) val ref = materializer.actorOf(context, props) akka.stream.actor.ActorPublisher[ByteString](ref) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index b4a4c37770..f2eec6ef86 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -17,19 +17,17 @@ import scala.concurrent.Promise /** INTERNAL API */ private[akka] object InputStreamPublisher { - def props(is: InputStream, completionPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int): Props = { + def props(is: InputStream, completionPromise: Promise[Long], chunkSize: Int): Props = { require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") - require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") - require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") - Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize, initialBuffer, maxBuffer).withDeploy(Deploy.local) + Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize).withDeploy(Deploy.local) } private final case object Continue extends DeadLetterSuppression } /** INTERNAL API */ -private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) +private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Promise[Long], chunkSize: Int) extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { @@ -37,82 +35,42 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom import InputStreamPublisher._ - var eofReachedAtOffset = Long.MinValue - + val arr = Array.ofDim[Byte](chunkSize) var readBytesTotal = 0L - var availableChunks: Vector[ByteString] = Vector.empty - - override def preStart() = { - try { - readAndSignal(initialBuffer) - } catch { - case ex: Exception ⇒ - onErrorThenStop(ex) - } - - super.preStart() - } def receive = { - case ActorPublisherMessage.Request(elements) ⇒ readAndSignal(maxBuffer) - case Continue ⇒ readAndSignal(maxBuffer) + case ActorPublisherMessage.Request(elements) ⇒ readAndSignal() + case Continue ⇒ readAndSignal() case ActorPublisherMessage.Cancel ⇒ context.stop(self) } - def readAndSignal(readAhead: Int): Unit = + def readAndSignal(): Unit = if (isActive) { - // signal from available buffer right away - signalOnNexts() - - // read chunks until readAhead is fulfilled - while (availableChunks.length < readAhead && !eofEncountered && isActive) - loadChunk() - + readAndEmit() if (totalDemand > 0) self ! Continue - else if (availableChunks.isEmpty) signalOnNexts() } - @tailrec private def signalOnNexts(): Unit = - if (availableChunks.nonEmpty) { - if (totalDemand > 0) { - val ready = availableChunks.head - availableChunks = availableChunks.tail - - onNext(ready) - - if (totalDemand > 0) signalOnNexts() - } - } else if (eofEncountered) onCompleteThenStop() - - /** BLOCKING I/O READ */ - def loadChunk() = try { - - // this is used directly by ByteString1C, therefore create a new one every time - val arr = Array.ofDim[Byte](chunkSize) - + def readAndEmit() = try { // blocking read val readBytes = is.read(arr) readBytes match { case -1 ⇒ // had nothing to read into this chunk - eofReachedAtOffset = readBytes - log.debug("No more bytes available to read (got `-1` or `0` from `read`), marking final bytes of file @ " + eofReachedAtOffset) + log.debug("No more bytes available to read (got `-1` from `read`)") + onCompleteThenStop() case _ ⇒ readBytesTotal += readBytes - if (readBytes == chunkSize) availableChunks :+= ByteString1C(arr) - else availableChunks :+= ByteString1C(arr).take(readBytes) - // valid read, continue + // emit immediately, as this is the only chance to do it before we might block again + onNext(ByteString.fromArray(arr, 0, readBytes)) } } catch { case ex: Exception ⇒ onErrorThenStop(ex) } - private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue - override def postStop(): Unit = { super.postStop() bytesReadPromise.trySuccess(readBytesTotal)