From f4186b5391a38cafeb3fddeed6ffbcc8c07c7399 Mon Sep 17 00:00:00 2001 From: Heiko Seeberger Date: Tue, 24 May 2016 19:08:43 +0200 Subject: [PATCH] Return failed IOResult for non-existint file (closes #20590) --- .../src/test/scala/akka/stream/io/FileSourceSpec.scala | 8 +++++--- .../main/scala/akka/stream/impl/io/FilePublisher.scala | 7 ++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index 1311dd3ff7..857e3d6427 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -15,10 +15,11 @@ import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children import akka.stream.io.FileSourceSpec.Settings -import akka.stream.scaladsl.{ FileIO, Sink } +import akka.stream.scaladsl.{ FileIO, Keep, Sink } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.TestDuration import akka.util.ByteString import akka.util.Timeout import scala.concurrent.Await @@ -138,13 +139,14 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { c.expectComplete() } - "onError whent trying to read from file which does not exist" in assertAllStagesStopped { - val p = FileIO.fromPath(notExistingFile).runWith(Sink.asPublisher(false)) + "onError with failure and return a failed IOResult when trying to read from file which does not exist" in assertAllStagesStopped { + val (r, p) = FileIO.fromPath(notExistingFile).toMat(Sink.asPublisher(false))(Keep.both).run() val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) c.expectSubscription() c.expectError() + val ioResult = Await.result(r, 3.seconds.dilated).status.isFailure shouldBe true } List( diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index b225ef0834..933101c183 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -51,7 +51,8 @@ private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IORe try { chan = FileChannel.open(f, FilePublisher.Read) } catch { - case ex: Exception ⇒ + case NonFatal(ex) ⇒ + completionPromise.trySuccess(IOResult(0L, Failure(ex))) onErrorThenStop(ex) } @@ -108,8 +109,8 @@ private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IORe try { if (chan ne null) chan.close() } catch { - case ex: Exception ⇒ - completionPromise.success(IOResult(readBytesTotal, Failure(ex))) + case NonFatal(ex) ⇒ + completionPromise.trySuccess(IOResult(readBytesTotal, Failure(ex))) } completionPromise.trySuccess(IOResult(readBytesTotal, Success(Done)))