diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index 10f9978e7f..42fe540583 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -19,11 +19,9 @@ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink -import akka.testkit.TestDuration import akka.util.ByteString import com.google.common.jimfs.{ Configuration, Jimfs } -import scala.concurrent.Await import scala.concurrent.duration._ object FileSourceSpec { @@ -59,6 +57,8 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { f } + val directoryInsteadOfFile = Files.createTempDirectory(fs.getPath("/"), "directory-instead-of-file") + val LinesCount = 2000 + new Random().nextInt(300) val manyLines = { @@ -116,7 +116,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { p.request(1) p.expectNext().utf8String should ===(TestText.splitAt(chunkSize)._1) mat.shutdown() - Await.result(future, 3.seconds) === createSuccessful(chunkSize) + future.futureValue === createSuccessful(chunkSize) } "read partial contents from a file" in assertAllStagesStopped { @@ -154,7 +154,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { p.request(1) p.expectNext().utf8String should ===(TestText.splitAt(chunkSize)._1) p.cancel() - Await.result(future, 3.seconds) === createSuccessful(chunkSize) + future.futureValue === createSuccessful(chunkSize) } "complete only when all contents of a file have been signalled" in assertAllStagesStopped { @@ -200,7 +200,20 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { val error = c.expectError() error shouldBe a[NoSuchFileException] - Await.result(r, 3.seconds.dilated).status.isFailure shouldBe true + r.futureValue.status.isFailure shouldBe true + } + + "onError with failure and return a failed IOResult when trying to read from a directory instead of a file" in assertAllStagesStopped { + val (r, p) = FileIO.fromPath(directoryInsteadOfFile).toMat(Sink.asPublisher(false))(Keep.both).run() + val c = TestSubscriber.manualProbe[ByteString]() + p.subscribe(c) + + c.expectSubscription() + + val error = c.expectError() + error shouldBe an[IllegalArgumentException] + + r.futureValue.status.isFailure shouldBe true } List( @@ -216,8 +229,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { val f = s.runWith(Sink.fold(0) { case (acc, l) ⇒ acc + l.utf8String.count(_ == '\n') }) - val lineCount = Await.result(f, 3.seconds) - lineCount should ===(LinesCount) + f.futureValue should ===(LinesCount) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index b2e5843d62..63ac15b192 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -73,7 +73,7 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition: // this is a bit weird but required to keep existing semantics if (!Files.exists(path)) throw new NoSuchFileException(path.toString) - require(Files.isRegularFile(path), s"Path '$path' is not a regular file") + require(!Files.isDirectory(path), s"Path '$path' is a directory") require(Files.isReadable(path), s"Missing read permission for '$path'") channel = FileChannel.open(path, StandardOpenOption.READ)