Use in-memory filesystem for the file sink and source #20726

This commit is contained in:
Johan Andrén 2016-08-29 16:33:33 +02:00 committed by GitHub
parent bc09e610f6
commit f760b98e8c
3 changed files with 25 additions and 9 deletions

View file

@ -4,6 +4,7 @@
package akka.stream.io
import java.nio.file.{ Files, Path, StandardOpenOption }
import akka.actor.ActorSystem
import akka.stream.impl.ActorMaterializerImpl
import akka.stream.impl.StreamSupervisor
@ -15,6 +16,9 @@ import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.ActorAttributes
import akka.util.{ ByteString, Timeout }
import com.google.common.jimfs.{ Configuration, Jimfs }
import org.scalatest.BeforeAndAfterAll
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
@ -23,6 +27,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
val fs = Jimfs.newFileSystem("FileSinkSpec", Configuration.unix())
val TestLines = {
val b = ListBuffer[String]()
@ -136,7 +141,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
}
private def targetFile(block: Path Unit, create: Boolean = true) {
val targetFile = Files.createTempFile("synchronous-file-sink", ".tmp")
val targetFile = Files.createTempFile(fs.getPath("/"), "synchronous-file-sink", ".tmp")
if (!create) Files.delete(targetFile)
try block(targetFile) finally Files.delete(targetFile)
}
@ -146,4 +151,8 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
new String(out) should ===(contents)
}
override def afterTermination(): Unit = {
fs.close()
}
}

View file

@ -3,9 +3,10 @@
*/
package akka.stream.io
import java.nio.file.Files
import java.nio.file.{FileSystems, Files}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Random
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
@ -15,13 +16,15 @@ import akka.stream.impl.ActorMaterializerImpl
import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.io.FileSourceSpec.Settings
import akka.stream.scaladsl.{ FileIO, Keep, Sink }
import akka.stream.scaladsl.{FileIO, Keep, Sink}
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestDuration
import akka.util.ByteString
import akka.util.Timeout
import com.google.common.jimfs.{Configuration, Jimfs}
import scala.concurrent.Await
import scala.concurrent.duration._
@ -34,6 +37,8 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
val fs = Jimfs.newFileSystem("FileSourceSpec", Configuration.unix())
val TestText = {
("a" * 1000) +
("b" * 1000) +
@ -44,14 +49,14 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
}
val testFile = {
val f = Files.createTempFile("file-source-spec", ".tmp")
val f = Files.createTempFile(fs.getPath("/"), "file-source-spec", ".tmp")
Files.newBufferedWriter(f, UTF_8).append(TestText).close()
f
}
val notExistingFile = {
// this way we make sure it doesn't accidentally exist
val f = Files.createTempFile("not-existing-file", ".tmp")
val f = Files.createTempFile(fs.getPath("/"), "not-existing-file", ".tmp")
Files.delete(f)
f
}
@ -59,7 +64,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
val LinesCount = 2000 + new Random().nextInt(300)
val manyLines = {
val f = Files.createTempFile(s"file-source-spec-lines_$LinesCount", "tmp")
val f = Files.createTempFile(fs.getPath("/"), s"file-source-spec-lines_$LinesCount", "tmp")
val w = Files.newBufferedWriter(f, UTF_8)
(1 to LinesCount).foreach { l
w.append("a" * l).append("\n")
@ -206,8 +211,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
}
override def afterTermination(): Unit = {
Files.delete(testFile)
Files.delete(manyLines)
fs.close()
}
}

View file

@ -97,6 +97,9 @@ object Dependencies {
val junitIntf = "com.novocode" % "junit-interface" % "0.11" % "test" // MIT
val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.4" % "test"
// in-memory filesystem for file related tests
val jimfs = "com.google.jimfs" % "jimfs" % "1.1" % "test" // ApacheV2
// metrics, measurements, perf testing
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2
val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.2" % "test" // ApacheV2
@ -209,7 +212,7 @@ object Dependencies {
lazy val streamTestkit = l ++= Seq(Test.scalatest.value, Test.scalacheck.value, Test.junit)
lazy val streamTests = l ++= Seq(Test.scalatest.value, Test.scalacheck.value, Test.junit, Test.commonsIo)
lazy val streamTests = l ++= Seq(Test.scalatest.value, Test.scalacheck.value, Test.junit, Test.commonsIo, Test.jimfs)
lazy val streamTestsTck = l ++= Seq(Test.scalatest.value, Test.scalacheck.value, Test.junit, Test.reactiveStreamsTck)