From cc0445f900f0b634825dbbb12a291f9e16c98a91 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Tue, 22 Dec 2015 17:54:35 +0100 Subject: [PATCH 1/2] =htc,str #19256 InputStreamPublisher could emit without pending demand, bug --- .../FileAndResourceDirectives.scala | 2 +- .../stream/io/InputStreamSourceSpec.scala | 20 +++++++++++++++---- .../stream/impl/io/InputStreamPublisher.scala | 2 +- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala index c532cae7f3..45fbfaead6 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala @@ -91,7 +91,7 @@ trait FileAndResourceDirectives { complete { HttpEntity.Default(contentType, length, StreamConverters.fromInputStream(() ⇒ url.openStream()) - .withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) + .withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) // TODO is this needed? It already uses `val inputStreamSource = name("inputStreamSource") and IODispatcher` } } } else complete(HttpEntity.Empty) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala index dd388b0f6c..532afaf3c6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala @@ -3,17 +3,18 @@ */ package akka.stream.io -import java.io.InputStream +import java.io.{ OutputStream, PipedOutputStream, PipedInputStream, InputStream } import java.util.concurrent.CountDownLatch -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.StreamConverters +import akka.stream.scaladsl.{ Source, Sink, StreamConverters } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.{ Attributes, OverflowStrategy, ActorMaterializer, ActorMaterializerSettings } import akka.util.ByteString import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.Await +import scala.concurrent.duration._ class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaFutures { @@ -21,6 +22,17 @@ class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaF implicit val materializer = ActorMaterializer(settings) "InputStreamSource" must { + + "not signal when no demand" in { + val f = StreamConverters.fromInputStream(() ⇒ new InputStream { + override def read(): Int = 42 + }) + + Await.result(f + .takeWithin(5.seconds) + .runForeach(it ⇒ ()), 10.seconds) + } + "read bytes from InputStream" in assertAllStagesStopped { val f = StreamConverters.fromInputStream(() ⇒ new InputStream { @volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index f2eec6ef86..7300f9652f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -50,7 +50,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom if (totalDemand > 0) self ! Continue } - def readAndEmit() = try { + def readAndEmit(): Unit = if (totalDemand > 0) try { // blocking read val readBytes = is.read(arr) From b590b50f0d4bfc45d1c6e750e7a3d11f40ea3998 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Tue, 22 Dec 2015 17:54:35 +0100 Subject: [PATCH 2/2] =htc,str #19256 add TCK test for InputStreamPublisher --- .../stream/tck/InputStreamSourceTest.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/InputStreamSourceTest.scala diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/InputStreamSourceTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/InputStreamSourceTest.scala new file mode 100644 index 0000000000..4edda9a96b --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/InputStreamSourceTest.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.tck + +import java.io.InputStream + +import akka.stream.ActorAttributes +import akka.stream.scaladsl.{ Sink, StreamConverters } +import akka.util.ByteString +import org.reactivestreams.Publisher + +class InputStreamSourceTest extends AkkaPublisherVerification[ByteString] { + + def createPublisher(elements: Long): Publisher[ByteString] = { + StreamConverters.fromInputStream(() ⇒ new InputStream { + @volatile var num = 0 + override def read(): Int = { + num += 1 + num + } + }).withAttributes(ActorAttributes.dispatcher("akka.test.stream-dispatcher")) + .take(elements) + .runWith(Sink.asPublisher(false)) + } +} +