Merge pull request #19262 from ktoso/wip-fix-inputstreamsource-ktoso

=htc,str #19256 InputStreamPublisher could emit without pending demnd
This commit is contained in:
Konrad Malawski 2015-12-22 23:26:23 +01:00
commit baebe079ff
4 changed files with 45 additions and 6 deletions

View file

@ -91,7 +91,7 @@ trait FileAndResourceDirectives {
complete {
HttpEntity.Default(contentType, length,
StreamConverters.fromInputStream(() url.openStream())
.withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher)))
.withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) // TODO is this needed? It already uses `val inputStreamSource = name("inputStreamSource") and IODispatcher`
}
}
} else complete(HttpEntity.Empty)

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import java.io.InputStream
import akka.stream.ActorAttributes
import akka.stream.scaladsl.{ Sink, StreamConverters }
import akka.util.ByteString
import org.reactivestreams.Publisher
class InputStreamSourceTest extends AkkaPublisherVerification[ByteString] {
def createPublisher(elements: Long): Publisher[ByteString] = {
StreamConverters.fromInputStream(() new InputStream {
@volatile var num = 0
override def read(): Int = {
num += 1
num
}
}).withAttributes(ActorAttributes.dispatcher("akka.test.stream-dispatcher"))
.take(elements)
.runWith(Sink.asPublisher(false))
}
}

View file

@ -3,17 +3,18 @@
*/
package akka.stream.io
import java.io.InputStream
import java.io.{ OutputStream, PipedOutputStream, PipedInputStream, InputStream }
import java.util.concurrent.CountDownLatch
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.StreamConverters
import akka.stream.scaladsl.{ Source, Sink, StreamConverters }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.{ Attributes, OverflowStrategy, ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Await
import scala.concurrent.duration._
class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaFutures {
@ -21,6 +22,17 @@ class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaF
implicit val materializer = ActorMaterializer(settings)
"InputStreamSource" must {
"not signal when no demand" in {
val f = StreamConverters.fromInputStream(() new InputStream {
override def read(): Int = 42
})
Await.result(f
.takeWithin(5.seconds)
.runForeach(it ()), 10.seconds)
}
"read bytes from InputStream" in assertAllStagesStopped {
val f = StreamConverters.fromInputStream(() new InputStream {
@volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt)

View file

@ -50,7 +50,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom
if (totalDemand > 0) self ! Continue
}
def readAndEmit() = try {
def readAndEmit(): Unit = if (totalDemand > 0) try {
// blocking read
val readBytes = is.read(arr)