Merge pull request #20620 from hseeberger/20590-fileio

Return failed IOResult for non-existint file (closes #20590)
This commit is contained in:
Patrik Nordwall 2016-05-26 16:04:36 +02:00
commit e2c6948c15
2 changed files with 9 additions and 6 deletions

View file

@ -15,10 +15,11 @@ import akka.stream.impl.ActorMaterializerImpl
import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.io.FileSourceSpec.Settings
import akka.stream.scaladsl.{ FileIO, Sink }
import akka.stream.scaladsl.{ FileIO, Keep, Sink }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestDuration
import akka.util.ByteString
import akka.util.Timeout
import scala.concurrent.Await
@ -138,13 +139,14 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
c.expectComplete()
}
"onError whent trying to read from file which does not exist" in assertAllStagesStopped {
val p = FileIO.fromPath(notExistingFile).runWith(Sink.asPublisher(false))
"onError with failure and return a failed IOResult when trying to read from file which does not exist" in assertAllStagesStopped {
val (r, p) = FileIO.fromPath(notExistingFile).toMat(Sink.asPublisher(false))(Keep.both).run()
val c = TestSubscriber.manualProbe[ByteString]()
p.subscribe(c)
c.expectSubscription()
c.expectError()
val ioResult = Await.result(r, 3.seconds.dilated).status.isFailure shouldBe true
}
List(

View file

@ -51,7 +51,8 @@ private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IORe
try {
chan = FileChannel.open(f, FilePublisher.Read)
} catch {
case ex: Exception
case NonFatal(ex)
completionPromise.trySuccess(IOResult(0L, Failure(ex)))
onErrorThenStop(ex)
}
@ -108,8 +109,8 @@ private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IORe
try {
if (chan ne null) chan.close()
} catch {
case ex: Exception
completionPromise.success(IOResult(readBytesTotal, Failure(ex)))
case NonFatal(ex)
completionPromise.trySuccess(IOResult(readBytesTotal, Failure(ex)))
}
completionPromise.trySuccess(IOResult(readBytesTotal, Success(Done)))