diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamConvertersSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamConvertersSpec.scala index 24a0be5102..72d0ccbcce 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamConvertersSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamConvertersSpec.scala @@ -14,8 +14,11 @@ import java.util.stream.BaseStream import java.util.stream.Collector import java.util.stream.Collectors +import akka.stream.ActorAttributes import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils.TE +import akka.util.ByteString + import akka.testkit.DefaultTimeout import org.scalatest.time.Millis import org.scalatest.time.Span @@ -309,7 +312,33 @@ class StreamConvertersSpec extends StreamSpec with DefaultTimeout { Await.result(future, 300.millis) } } - } + "InputStream Sink" must { + "produce a single value" in { + val source = Source.single(ByteString("ASDF")) + val sink = + StreamConverters.asInputStream().withAttributes(ActorAttributes.dispatcher("akka.test.stream-dispatcher")) + + val is = source.runWith(sink) + is.read() should be('A') + val target = Array[Byte](0, 0, 0) + is.read(target, 1, 2) + target should be(Array[Byte](0, 'S', 'D')) + is.read() should be('F') + is.read() should be(-1) + is.close() + } + // As specified in the Closeable interface, #28664 + "withstand being closed twice" in { + val source = Source.single(ByteString("ASDF")) + val sink = + StreamConverters.asInputStream().withAttributes(ActorAttributes.dispatcher("akka.test.stream-dispatcher")) + + val is = source.runWith(sink) + is.read() should be('A') + is.close() + is.close() + } + } } diff --git a/akka-stream/src/main/mima-filters/2.6.3.backwards.excludes/28681-allow-closing-inputstream-twice b/akka-stream/src/main/mima-filters/2.6.3.backwards.excludes/28681-allow-closing-inputstream-twice new file mode 100644 index 0000000000..270658d410 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.3.backwards.excludes/28681-allow-closing-inputstream-twice @@ -0,0 +1,3 @@ +# private[stream] +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.InputStreamAdapter.isActive") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.InputStreamAdapter.isActive_=") diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index b0f64a3acc..2651e45528 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -6,6 +6,7 @@ package akka.stream.impl.io import java.io.{ IOException, InputStream } import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit } +import java.util.concurrent.atomic.AtomicBoolean import akka.annotation.InternalApi import akka.stream.Attributes.InputBuffer @@ -118,14 +119,14 @@ private[stream] object InputStreamSinkStage { extends InputStream { var isInitialized = false - var isActive = true + val isActive = new AtomicBoolean(true) var isStageAlive = true def subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible") var detachedChunk: Option[ByteString] = None @scala.throws(classOf[IOException]) private[this] def executeIfNotClosed[T](f: () => T): T = - if (isActive) { + if (isActive.get()) { waitIfNotInitialized() f() } else throw subscriberClosedException @@ -187,11 +188,10 @@ private[stream] object InputStreamSinkStage { @scala.throws(classOf[IOException]) override def close(): Unit = { - executeIfNotClosed(() => { + if (isActive.getAndSet(false)) { // at this point Subscriber may be already terminated if (isStageAlive) sendToStage(Close) - isActive = false - }) + } } @tailrec