=str #19193 remove buffer from InputStreamSource

This commit is contained in:
Martynas Mickevičius 2015-12-18 16:56:28 +02:00
parent dcfa56e547
commit ed7ea755bb
3 changed files with 39 additions and 58 deletions

View file

@ -4,11 +4,13 @@
package akka.stream.io
import java.io.InputStream
import java.util.concurrent.CountDownLatch
import akka.stream.scaladsl.StreamConverters
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.StreamConverters
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
@ -37,6 +39,28 @@ class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaF
f.futureValue should ===(ByteString("abc"))
}
"emit as soon as read" in assertAllStagesStopped {
val latch = new CountDownLatch(1)
val probe = StreamConverters.fromInputStream(() new InputStream {
@volatile var emitted = false
override def read(): Int = {
if (!emitted) {
emitted = true
'M'.toInt
} else {
latch.await()
-1
}
}
}, chunkSize = 1)
.runWith(TestSink.probe)
probe.request(4)
probe.expectNext(ByteString("M"))
latch.countDown()
probe.expectComplete()
}
}
}

View file

@ -50,13 +50,12 @@ private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStrea
extends SourceModule[ByteString, Future[Long]](shape) {
override def create(context: MaterializationContext) = {
val materializer = ActorMaterializer.downcast(context.materializer)
val settings = materializer.effectiveSettings(context.effectiveAttributes)
val bytesReadPromise = Promise[Long]()
val pub = try {
val is = createInputStream() // can throw, i.e. FileNotFound
val props = InputStreamPublisher.props(is, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize)
val props = InputStreamPublisher.props(is, bytesReadPromise, chunkSize)
val ref = materializer.actorOf(context, props)
akka.stream.actor.ActorPublisher[ByteString](ref)

View file

@ -17,19 +17,17 @@ import scala.concurrent.Promise
/** INTERNAL API */
private[akka] object InputStreamPublisher {
def props(is: InputStream, completionPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int): Props = {
def props(is: InputStream, completionPromise: Promise[Long], chunkSize: Int): Props = {
require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")
require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)")
require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)")
Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize, initialBuffer, maxBuffer).withDeploy(Deploy.local)
Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize).withDeploy(Deploy.local)
}
private final case object Continue extends DeadLetterSuppression
}
/** INTERNAL API */
private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int)
private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Promise[Long], chunkSize: Int)
extends akka.stream.actor.ActorPublisher[ByteString]
with ActorLogging {
@ -37,82 +35,42 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom
import InputStreamPublisher._
var eofReachedAtOffset = Long.MinValue
val arr = Array.ofDim[Byte](chunkSize)
var readBytesTotal = 0L
var availableChunks: Vector[ByteString] = Vector.empty
override def preStart() = {
try {
readAndSignal(initialBuffer)
} catch {
case ex: Exception
onErrorThenStop(ex)
}
super.preStart()
}
def receive = {
case ActorPublisherMessage.Request(elements) readAndSignal(maxBuffer)
case Continue readAndSignal(maxBuffer)
case ActorPublisherMessage.Request(elements) readAndSignal()
case Continue readAndSignal()
case ActorPublisherMessage.Cancel context.stop(self)
}
def readAndSignal(readAhead: Int): Unit =
def readAndSignal(): Unit =
if (isActive) {
// signal from available buffer right away
signalOnNexts()
// read chunks until readAhead is fulfilled
while (availableChunks.length < readAhead && !eofEncountered && isActive)
loadChunk()
readAndEmit()
if (totalDemand > 0) self ! Continue
else if (availableChunks.isEmpty) signalOnNexts()
}
@tailrec private def signalOnNexts(): Unit =
if (availableChunks.nonEmpty) {
if (totalDemand > 0) {
val ready = availableChunks.head
availableChunks = availableChunks.tail
onNext(ready)
if (totalDemand > 0) signalOnNexts()
}
} else if (eofEncountered) onCompleteThenStop()
/** BLOCKING I/O READ */
def loadChunk() = try {
// this is used directly by ByteString1C, therefore create a new one every time
val arr = Array.ofDim[Byte](chunkSize)
def readAndEmit() = try {
// blocking read
val readBytes = is.read(arr)
readBytes match {
case -1
// had nothing to read into this chunk
eofReachedAtOffset = readBytes
log.debug("No more bytes available to read (got `-1` or `0` from `read`), marking final bytes of file @ " + eofReachedAtOffset)
log.debug("No more bytes available to read (got `-1` from `read`)")
onCompleteThenStop()
case _
readBytesTotal += readBytes
if (readBytes == chunkSize) availableChunks :+= ByteString1C(arr)
else availableChunks :+= ByteString1C(arr).take(readBytes)
// valid read, continue
// emit immediately, as this is the only chance to do it before we might block again
onNext(ByteString.fromArray(arr, 0, readBytes))
}
} catch {
case ex: Exception
onErrorThenStop(ex)
}
private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue
override def postStop(): Unit = {
super.postStop()
bytesReadPromise.trySuccess(readBytesTotal)