Re-write InputStreamSource as GraphStage (#26811)

* Re-write InputStreamPublisher as a GraphStage
* Deprecate IOResult.failure and introduce IOOperationIncompleteException

After some discussion with @johanandren, @raboof and @2m about the
confusion caused by the materialised value of the IO streams completing
successfully even when there is an exception (with the exception buried
in the IOResult), this now changes it to:

* Deprecate failure in IOResult so its status is always set to Success(Done)
* Stop using AbruptIOTerminationException, as the inner IOResult also
contains an exception, causing confusion
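
Sketch of the change from a consumer's point of view (`brokenInputStream` is a placeholder; assumes an implicit materializer and execution context in scope):

```scala
val io: Future[IOResult] =
  StreamConverters.fromInputStream(() => brokenInputStream()).runWith(Sink.ignore)

// 2.5.x: io completed with IOResult(count, Failure(e)) even on IO errors
// now:   io itself fails with IOOperationIncompleteException carrying the count
io.failed.foreach {
  case e: IOOperationIncompleteException =>
    println(s"read failed after ${e.count} bytes: ${e.getCause}")
  case other =>
    println(s"stream failed: $other")
}
```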
Christopher Batey 2019-07-02 10:46:16 +01:00 committed by GitHub
parent cfed2512d7
commit 200b07e534
11 changed files with 252 additions and 177 deletions

View file

@@ -266,7 +266,7 @@ blocking and protect a bit against starving the internal actors. Since the inter
the default dispatcher has been adjusted down to `1.0` which means the number of threads will be one per core, but at least
`8` and at most `64`. This can be tuned using the individual settings in `akka.actor.default-dispatcher.fork-join-executor`.
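
Spelled out as configuration, the new defaults correspond to the following (a sketch; tune the values to your workload):

```
akka.actor.default-dispatcher.fork-join-executor {
  parallelism-factor = 1.0
  parallelism-min = 8
  parallelism-max = 64
}
```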
### Cluster sharding
### Cluster Sharding
#### waiting-for-state-timeout reduced to 2s
@@ -274,7 +274,7 @@ This has been reduced to speed up ShardCoordinator initialization in smaller clu
The read from ddata is a ReadMajority; for small clusters (< majority-min-cap) every node needs to respond,
so it is more likely to time out if nodes are restarting, e.g. during a rolling re-deploy.
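
If the shorter timeout causes spurious timeouts in your environment, it can be raised again (the value below is illustrative):

```
akka.cluster.sharding.waiting-for-state-timeout = 5s
```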
### Passivate idle entity
#### Passivate idle entity
The configuration `akka.cluster.sharding.passivate-idle-entity-after` is now enabled by default.
Sharding will passivate entities when they have not received any messages after this duration.
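
To keep the Akka 2.5.x behaviour and never passivate idle entities, the setting can be disabled:

```
akka.cluster.sharding.passivate-idle-entity-after = off
```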
@@ -301,6 +301,11 @@ and then it will behave as in Akka 2.5.x:
akka.coordinated-shutdown.run-by-actor-system-terminate = off
```
### IOSources
`StreamConverters.fromInputStream` now always fails the materialized value in case of failure, so it is no longer
necessary to check both the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].
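
A minimal sketch of adapting to the new behaviour (assuming an implicit `Materializer` and `ExecutionContext` in scope; the file name is illustrative):

```scala
import java.io.FileInputStream
import scala.concurrent.Future
import akka.stream.{ IOOperationIncompleteException, IOResult }
import akka.stream.scaladsl.{ Sink, StreamConverters }

// the materialized future now fails directly instead of succeeding
// with a Failure inside IOResult.status
val result: Future[IOResult] =
  StreamConverters
    .fromInputStream(() => new FileInputStream("example.bin"))
    .runWith(Sink.ignore)

val recovered: Future[IOResult] = result.recover {
  case e: IOOperationIncompleteException =>
    // e.count is the number of bytes read before the failure
    IOResult(e.count)
}
```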
### Akka now uses Fork Join Pool from JDK

View file

@@ -23,7 +23,9 @@ import com.google.common.jimfs.{ Configuration, Jimfs }
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
@silent
class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")

View file

@@ -23,11 +23,13 @@ import akka.util.ByteString
import com.google.common.jimfs.{ Configuration, Jimfs }
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
object FileSourceSpec {
final case class Settings(chunkSize: Int, readAhead: Int)
}
@silent
class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")

View file

@@ -4,56 +4,110 @@
package akka.stream.io
import java.io.InputStream
import java.io.{ ByteArrayInputStream, InputStream }
import java.util.concurrent.CountDownLatch
import akka.stream.scaladsl.{ Sink, StreamConverters }
import akka.Done
import akka.stream.scaladsl.{ Keep, Sink, StreamConverters }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.{ AbruptStageTerminationException, ActorMaterializer, ActorMaterializerSettings, IOResult }
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Success
import com.github.ghik.silencer.silent
@silent
class InputStreamSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
"InputStreamSource" must {
private def inputStreamFor(bytes: Array[Byte]): InputStream =
new ByteArrayInputStream(bytes)
"not signal when no demand" in {
val f = StreamConverters.fromInputStream(() =>
new InputStream {
override def read(): Int = 42
})
Await.result(f.takeWithin(5.seconds).runForeach(_ => ()), 10.seconds)
}
"InputStream Source" must {
"read bytes from InputStream" in assertAllStagesStopped {
val f = StreamConverters
.fromInputStream(() =>
new InputStream {
@volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt)
override def read(): Int = {
buf match {
case head :: tail =>
buf = tail
head
case Nil =>
-1
}
}
})
.runWith(Sink.head)
val f =
StreamConverters.fromInputStream(() => inputStreamFor(Array('a', 'b', 'c').map(_.toByte))).runWith(Sink.head)
f.futureValue should ===(ByteString("abc"))
}
"record number of bytes read" in assertAllStagesStopped {
StreamConverters
.fromInputStream(() => inputStreamFor(Array(1, 2, 3)))
.toMat(Sink.ignore)(Keep.left)
.run
.futureValue shouldEqual IOResult(3, Success(Done))
}
"return fail if close fails" in assertAllStagesStopped {
val fail = new RuntimeException("oh dear")
StreamConverters
.fromInputStream(() =>
new InputStream {
override def read(): Int = -1
override def close(): Unit = throw fail
})
.toMat(Sink.ignore)(Keep.left)
.run
.failed
.futureValue
.getCause shouldEqual fail
}
"return failure if creation fails" in {
val fail = new RuntimeException("oh dear indeed")
StreamConverters
.fromInputStream(() => {
throw fail
})
.toMat(Sink.ignore)(Keep.left)
.run
.failed
.futureValue
.getCause shouldEqual fail
}
"handle failure on read" in assertAllStagesStopped {
val fail = new RuntimeException("oh dear indeed")
StreamConverters
.fromInputStream(() =>
new InputStream {
override def read(): Int = throw fail
})
.toMat(Sink.ignore)(Keep.left)
.run
.failed
.futureValue
.getCause shouldEqual fail
}
"include number of bytes when downstream doesn't read all of it" in {
val f = StreamConverters
.fromInputStream(() => inputStreamFor(Array.fill(100)(1)), 1)
.take(1) // stream is not completely read
.toMat(Sink.ignore)(Keep.left)
.run
.futureValue
f.status shouldEqual Success(Done)
f.count shouldBe >=(1L)
}
"handle actor materializer shutdown" in {
val mat = ActorMaterializer()
val source = StreamConverters.fromInputStream(() => inputStreamFor(Array(1, 2, 3)))
val pubSink = Sink.asPublisher[ByteString](false)
val (f, neverPub) = source.toMat(pubSink)(Keep.both).run()(mat)
val c = TestSubscriber.manualProbe[ByteString]()
neverPub.subscribe(c)
c.expectSubscription()
mat.shutdown()
f.failed.futureValue shouldBe an[AbruptStageTerminationException]
}
"emit as soon as read" in assertAllStagesStopped {
val latch = new CountDownLatch(1)
val probe = StreamConverters

View file

@@ -16,7 +16,9 @@ import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
@silent
class OutputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")

View file

@@ -97,6 +97,14 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPubl
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithUnrestrictedStash")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorSubscriber$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithStash")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithStash")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.InputStreamSource.withAttributes")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.InputStreamSource.*")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.io.InputStreamSource")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamSource")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$Continue$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$")
# #25045 adding Java/Scala interop to SourceQueue and SinkQueue
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter")
@@ -112,3 +120,12 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Restart
# Adding methods to Materializer is not compatible but we don't support other Materializer implementations
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.scheduleAtFixedRate")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.scheduleWithFixedDelay")
#26187 - Re-write ActorPublisher/Subscribers as GraphStages
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.InputStreamSource.withAttributes")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.InputStreamSource.*")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.io.InputStreamSource")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamSource")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$Continue$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$")

View file

@@ -5,6 +5,7 @@
package akka.stream
import akka.Done
import com.github.ghik.silencer.silent
import scala.util.control.NoStackTrace
import scala.util.{ Failure, Success, Try }
@@ -15,9 +16,14 @@ import scala.util.{ Failure, Success, Try }
* @param count Numeric value depending on context, for example IO operations performed or bytes processed.
* @param status Status of the result. Can be either [[akka.Done]] or an exception.
*/
final case class IOResult(count: Long, status: Try[Done]) {
@silent // suppresses deprecation warnings for internal use of the deprecated status field
final case class IOResult(
count: Long,
@deprecated("status is always set to Success(Done)", "2.6.0") status: Try[Done]) {
def withCount(value: Long): IOResult = copy(count = value)
@deprecated("status is always set to Success(Done)", "2.6.0")
def withStatus(value: Try[Done]): IOResult = copy(status = value)
/**
@@ -28,12 +34,14 @@ final case class IOResult(count: Long, status: Try[Done]) {
/**
* Java API: Indicates whether IO operation completed successfully or not.
*/
@deprecated("status is always set to Success(Done)", "2.6.0")
def wasSuccessful: Boolean = status.isSuccess
/**
* Java API: If the IO operation resulted in an error, returns the corresponding [[Throwable]]
* or throws [[UnsupportedOperationException]] otherwise.
*/
@deprecated("status is always set to Success(Done)", "2.6.0")
def getError: Throwable = status match {
case Failure(t) => t
case Success(_) => throw new UnsupportedOperationException("IO operation was successful.")
@@ -43,6 +51,8 @@ final case class IOResult(count: Long, status: Try[Done]) {
object IOResult {
def apply(count: Long): IOResult = IOResult(count, Success(Done))
/** JAVA API: Creates successful IOResult */
def createSuccessful(count: Long): IOResult =
new IOResult(count, Success(Done))
@@ -56,6 +66,16 @@ object IOResult {
* This exception signals that a stream has been completed by an onError signal
* while there were still IO operations in progress.
*/
@deprecated("use IOOperationIncompleteException", "2.6.0")
final case class AbruptIOTerminationException(ioResult: IOResult, cause: Throwable)
extends RuntimeException("Stream terminated without completing IO operation.", cause)
with NoStackTrace
/**
* This exception signals that a stream was completed or failed while
* IO operations were still in progress.
*
* @param count The number of bytes read/written up until the error
* @param cause The underlying cause of the failure
*/
final class IOOperationIncompleteException(val count: Long, cause: Throwable) extends RuntimeException(cause)

View file

@@ -4,21 +4,15 @@
package akka.stream.impl.io
import java.io.InputStream
import java.nio.ByteBuffer
import java.nio.channels.{ CompletionHandler, FileChannel }
import java.nio.file.{ Files, NoSuchFileException, Path, StandardOpenOption }
import akka.Done
import akka.annotation.InternalApi
import akka.stream.ActorAttributes.Dispatcher
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.{ ErrorPublisher, SourceModule }
import akka.stream.stage._
import akka.stream.{ IOResult, _ }
import akka.util.ByteString
import com.github.ghik.silencer.silent
import org.reactivestreams.Publisher
import scala.annotation.tailrec
import scala.concurrent.{ Future, Promise }
@@ -140,43 +134,3 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition:
override def toString = s"FileSource($path, $chunkSize)"
}
/**
* INTERNAL API
* Source backed by the given input stream.
*/
@InternalApi private[akka] final class InputStreamSource(
createInputStream: () => InputStream,
chunkSize: Int,
val attributes: Attributes,
shape: SourceShape[ByteString])
extends SourceModule[ByteString, Future[IOResult]](shape) {
override def create(context: MaterializationContext) = {
val materializer = ActorMaterializerHelper.downcast(context.materializer)
val ioResultPromise = Promise[IOResult]()
@silent
val pub = try {
val is = createInputStream() // can throw, i.e. FileNotFound
val props = InputStreamPublisher
.props(is, ioResultPromise, chunkSize)
.withDispatcher(context.effectiveAttributes.mandatoryAttribute[Dispatcher].dispatcher)
val ref = materializer.actorOf(context, props)
akka.stream.actor.ActorPublisher[ByteString](ref)
} catch {
case ex: Exception =>
ioResultPromise.failure(ex)
ErrorPublisher(ex, attributes.nameOrDefault("inputStreamSource")).asInstanceOf[Publisher[ByteString]]
}
(pub, ioResultPromise.future)
}
override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[IOResult]] =
new InputStreamSource(createInputStream, chunkSize, attributes, shape)
override def withAttributes(attr: Attributes): SourceModule[ByteString, Future[IOResult]] =
new InputStreamSource(createInputStream, chunkSize, attr, amendShape(attr))
}

View file

@@ -1,94 +0,0 @@
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.io
import java.io.InputStream
import akka.Done
import akka.actor.{ ActorLogging, DeadLetterSuppression, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisherMessage
import akka.stream.IOResult
import akka.util.ByteString
import com.github.ghik.silencer.silent
import scala.concurrent.Promise
import scala.util.{ Failure, Success }
/** INTERNAL API */
@InternalApi private[akka] object InputStreamPublisher {
def props(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int): Props = {
require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")
Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize).withDeploy(Deploy.local)
}
private final case object Continue extends DeadLetterSuppression
}
/** INTERNAL API */
@silent
@InternalApi private[akka] class InputStreamPublisher(
is: InputStream,
completionPromise: Promise[IOResult],
chunkSize: Int)
extends akka.stream.actor.ActorPublisher[ByteString]
with ActorLogging {
// TODO possibly de-duplicate with FilePublisher?
import InputStreamPublisher._
val arr = new Array[Byte](chunkSize)
var readBytesTotal = 0L
def receive = {
case ActorPublisherMessage.Request(_) => readAndSignal()
case Continue => readAndSignal()
case ActorPublisherMessage.Cancel => context.stop(self)
}
def readAndSignal(): Unit =
if (isActive) {
readAndEmit()
if (totalDemand > 0 && isActive) self ! Continue
}
def readAndEmit(): Unit =
if (totalDemand > 0) try {
// blocking read
val readBytes = is.read(arr)
readBytes match {
case -1 =>
// had nothing to read into this chunk
log.debug("No more bytes available to read (got `-1` from `read`)")
onCompleteThenStop()
case _ =>
readBytesTotal += readBytes
// emit immediately, as this is the only chance to do it before we might block again
onNext(ByteString.fromArray(arr, 0, readBytes))
}
} catch {
case ex: Exception =>
onErrorThenStop(ex)
}
override def postStop(): Unit = {
super.postStop()
try {
if (is ne null) is.close()
} catch {
case ex: Exception =>
completionPromise.success(IOResult(readBytesTotal, Failure(ex)))
}
completionPromise.trySuccess(IOResult(readBytesTotal, Success(Done)))
}
}

View file

@@ -0,0 +1,114 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.io
import java.io.InputStream
import akka.annotation.InternalApi
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.{
AbruptStageTerminationException,
Attributes,
IOOperationIncompleteException,
IOResult,
Outlet,
SourceShape
}
import akka.stream.stage.{ GraphStageLogic, GraphStageLogicWithLogging, GraphStageWithMaterializedValue, OutHandler }
import akka.util.ByteString
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class InputStreamSource(factory: () => InputStream, chunkSize: Int)
extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] {
require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")
private val out: Outlet[ByteString] = Outlet("InputStreamSource.out")
override def shape: SourceShape[ByteString] = SourceShape(out)
override protected def initialAttributes: Attributes = DefaultAttributes.inputStreamSource
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
val mat = Promise[IOResult]
val logic = new GraphStageLogicWithLogging(shape) with OutHandler {
private val buffer = new Array[Byte](chunkSize)
private var readBytesTotal = 0L
private var inputStream: InputStream = _
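// the materialized promise doubles as a "stage already closed" flag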
private def isClosed = mat.isCompleted
override def preStart(): Unit = {
try {
inputStream = factory()
} catch {
case NonFatal(t) =>
mat.failure(new IOOperationIncompleteException(0, t))
failStage(t)
}
}
override def onPull(): Unit =
try {
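// this read may block; the stage is expected to run on a dispatcher suitable for blocking IO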
inputStream.read(buffer) match {
case -1 =>
closeStage()
case readBytes =>
readBytesTotal += readBytes
push(out, ByteString.fromArray(buffer, 0, readBytes))
}
} catch {
case NonFatal(t) =>
failStream(t)
failStage(t)
}
override def onDownstreamFinish(): Unit = {
if (!isClosed) {
closeInputStream()
mat.trySuccess(IOResult(readBytesTotal))
}
}
override def postStop(): Unit = {
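// stopped without the promise being completed: the materializer was shut down abruptly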
if (!isClosed) {
mat.tryFailure(new AbruptStageTerminationException(this))
}
}
private def closeStage(): Unit = {
closeInputStream()
mat.trySuccess(IOResult(readBytesTotal))
completeStage()
}
private def failStream(reason: Throwable): Unit = {
closeInputStream()
mat.tryFailure(new IOOperationIncompleteException(readBytesTotal, reason))
}
private def closeInputStream(): Unit = {
try {
if (inputStream != null)
inputStream.close()
} catch {
case NonFatal(ex) =>
mat.tryFailure(new IOOperationIncompleteException(readBytesTotal, ex))
failStage(ex)
}
}
setHandler(out, this)
}
(logic, mat.future)
}
override def toString: String = "InputStreamSource"
}

View file

@@ -24,7 +24,6 @@ import akka.NotUsed
*/
object StreamConverters {
import Source.{ shape => sourceShape }
import Sink.{ shape => sinkShape }
/**
@@ -45,9 +44,9 @@
* @param in a function which creates the InputStream to read from
* @param chunkSize the size of each read operation, defaults to 8192
*/
def fromInputStream(in: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
Source.fromGraph(
new InputStreamSource(in, chunkSize, DefaultAttributes.inputStreamSource, sourceShape("InputStreamSource")))
def fromInputStream(in: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = {
Source.fromGraph(new InputStreamSource(in, chunkSize))
}
/**
* Creates a Source which when materialized will return an [[OutputStream]] which it is possible