Allow named pipes / FIFOs when using FileIO #25328

This commit is contained in:
Nathan Kleyn 2018-07-12 07:31:16 +01:00 committed by Johan Andrén
parent d76b27ba3e
commit a372ad8b03
2 changed files with 20 additions and 8 deletions

View file

@ -19,11 +19,9 @@ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestDuration
import akka.util.ByteString
import com.google.common.jimfs.{ Configuration, Jimfs }
import scala.concurrent.Await
import scala.concurrent.duration._
object FileSourceSpec {
@ -59,6 +57,8 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
f
}
val directoryInsteadOfFile = Files.createTempDirectory(fs.getPath("/"), "directory-instead-of-file")
val LinesCount = 2000 + new Random().nextInt(300)
val manyLines = {
@ -116,7 +116,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
p.request(1)
p.expectNext().utf8String should ===(TestText.splitAt(chunkSize)._1)
mat.shutdown()
Await.result(future, 3.seconds) === createSuccessful(chunkSize)
future.futureValue === createSuccessful(chunkSize)
}
"read partial contents from a file" in assertAllStagesStopped {
@ -154,7 +154,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
p.request(1)
p.expectNext().utf8String should ===(TestText.splitAt(chunkSize)._1)
p.cancel()
Await.result(future, 3.seconds) === createSuccessful(chunkSize)
future.futureValue === createSuccessful(chunkSize)
}
"complete only when all contents of a file have been signalled" in assertAllStagesStopped {
@ -200,7 +200,20 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val error = c.expectError()
error shouldBe a[NoSuchFileException]
Await.result(r, 3.seconds.dilated).status.isFailure shouldBe true
r.futureValue.status.isFailure shouldBe true
}
"onError with failure and return a failed IOResult when trying to read from a directory instead of a file" in assertAllStagesStopped {
val (r, p) = FileIO.fromPath(directoryInsteadOfFile).toMat(Sink.asPublisher(false))(Keep.both).run()
val c = TestSubscriber.manualProbe[ByteString]()
p.subscribe(c)
c.expectSubscription()
val error = c.expectError()
error shouldBe an[IllegalArgumentException]
r.futureValue.status.isFailure shouldBe true
}
List(
@ -216,8 +229,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val f = s.runWith(Sink.fold(0) { case (acc, l) acc + l.utf8String.count(_ == '\n') })
val lineCount = Await.result(f, 3.seconds)
lineCount should ===(LinesCount)
f.futureValue should ===(LinesCount)
}
}

View file

@ -73,7 +73,7 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition:
// this is a bit weird but required to keep existing semantics
if (!Files.exists(path)) throw new NoSuchFileException(path.toString)
require(Files.isRegularFile(path), s"Path '$path' is not a regular file")
require(!Files.isDirectory(path), s"Path '$path' is a directory")
require(Files.isReadable(path), s"Missing read permission for '$path'")
channel = FileChannel.open(path, StandardOpenOption.READ)