Allow closing StreamConverter InputStream twice #28664

This commit is contained in:
Arnout Engelen 2020-03-04 17:12:50 +01:00 committed by GitHub
parent 72d2b7f034
commit e487088a27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 6 deletions

View file

@ -14,8 +14,11 @@ import java.util.stream.BaseStream
import java.util.stream.Collector import java.util.stream.Collector
import java.util.stream.Collectors import java.util.stream.Collectors
import akka.stream.ActorAttributes
import akka.stream.testkit.StreamSpec import akka.stream.testkit.StreamSpec
import akka.stream.testkit.Utils.TE import akka.stream.testkit.Utils.TE
import akka.util.ByteString
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import org.scalatest.time.Millis import org.scalatest.time.Millis
import org.scalatest.time.Span import org.scalatest.time.Span
@ -309,7 +312,33 @@ class StreamConvertersSpec extends StreamSpec with DefaultTimeout {
Await.result(future, 300.millis) Await.result(future, 300.millis)
} }
} }
} }
"InputStream Sink" must {
"produce a single value" in {
val source = Source.single(ByteString("ASDF"))
val sink =
StreamConverters.asInputStream().withAttributes(ActorAttributes.dispatcher("akka.test.stream-dispatcher"))
val is = source.runWith(sink)
is.read() should be('A')
val target = Array[Byte](0, 0, 0)
is.read(target, 1, 2)
target should be(Array[Byte](0, 'S', 'D'))
is.read() should be('F')
is.read() should be(-1)
is.close()
}
// As specified in the Closeable interface, #28664
"withstand being closed twice" in {
val source = Source.single(ByteString("ASDF"))
val sink =
StreamConverters.asInputStream().withAttributes(ActorAttributes.dispatcher("akka.test.stream-dispatcher"))
val is = source.runWith(sink)
is.read() should be('A')
is.close()
is.close()
}
}
} }

View file

@ -0,0 +1,3 @@
# private[stream]
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.InputStreamAdapter.isActive")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.InputStreamAdapter.isActive_=")

View file

@ -6,6 +6,7 @@ package akka.stream.impl.io
import java.io.{ IOException, InputStream } import java.io.{ IOException, InputStream }
import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit } import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit }
import java.util.concurrent.atomic.AtomicBoolean
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.stream.Attributes.InputBuffer import akka.stream.Attributes.InputBuffer
@ -118,14 +119,14 @@ private[stream] object InputStreamSinkStage {
extends InputStream { extends InputStream {
var isInitialized = false var isInitialized = false
var isActive = true val isActive = new AtomicBoolean(true)
var isStageAlive = true var isStageAlive = true
def subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible") def subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible")
var detachedChunk: Option[ByteString] = None var detachedChunk: Option[ByteString] = None
@scala.throws(classOf[IOException]) @scala.throws(classOf[IOException])
private[this] def executeIfNotClosed[T](f: () => T): T = private[this] def executeIfNotClosed[T](f: () => T): T =
if (isActive) { if (isActive.get()) {
waitIfNotInitialized() waitIfNotInitialized()
f() f()
} else throw subscriberClosedException } else throw subscriberClosedException
@ -187,11 +188,10 @@ private[stream] object InputStreamSinkStage {
@scala.throws(classOf[IOException]) @scala.throws(classOf[IOException])
override def close(): Unit = { override def close(): Unit = {
executeIfNotClosed(() => { if (isActive.getAndSet(false)) {
// at this point Subscriber may be already terminated // at this point Subscriber may be already terminated
if (isStageAlive) sendToStage(Close) if (isStageAlive) sendToStage(Close)
isActive = false }
})
} }
@tailrec @tailrec