Add basic support for Java 7 NIO file systems (#20293)

This commit is contained in:
Michał Kiędyś 2016-04-25 19:25:26 +10:00 committed by Konrad Malawski
parent 6d399a308e
commit b983f19c1f
23 changed files with 286 additions and 124 deletions

View file

@ -4,7 +4,7 @@
package akka.stream.io
import java.io.{ FileInputStream, File }
import java.nio.file.{ Files, Path }
import java.util.concurrent.TimeUnit
import akka.{ Done, NotUsed }
import akka.actor.ActorSystem
@ -28,15 +28,14 @@ class FileSourcesBenchmark {
implicit val system = ActorSystem("file-sources-benchmark")
implicit val materializer = ActorMaterializer()
val file: File = {
val file: Path = {
val line = ByteString("x" * 2048 + "\n")
val f = File.createTempFile(getClass.getName, ".bench.tmp")
f.deleteOnExit()
val f = Files.createTempFile(getClass.getName, ".bench.tmp")
val ft = Source.fromIterator(() Iterator.continually(line))
.take(10 * 39062) // adjust as needed
.runWith(FileIO.toFile(f))
.runWith(FileIO.toPath(f))
Await.result(ft, 30.seconds)
f
@ -51,14 +50,14 @@ class FileSourcesBenchmark {
@Setup
def setup():Unit = {
fileChannelSource = FileIO.fromFile(file, bufSize)
fileInputStreamSource = StreamConverters.fromInputStream(() new FileInputStream(file), bufSize)
ioSourceLinesIterator = Source.fromIterator(() scala.io.Source.fromFile(file).getLines()).map(ByteString(_))
fileChannelSource = FileIO.fromPath(file, bufSize)
fileInputStreamSource = StreamConverters.fromInputStream(() Files.newInputStream(file), bufSize)
ioSourceLinesIterator = Source.fromIterator(() scala.io.Source.fromFile(file.toFile).getLines()).map(ByteString(_))
}
@TearDown
def teardown(): Unit = {
file.delete()
Files.delete(file)
}
@TearDown

View file

@ -3,7 +3,7 @@
*/
package docs.stream;
import java.io.File;
import java.nio.file.Paths;
import java.math.BigInteger;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
@ -56,7 +56,7 @@ public class QuickStartDocTest {
final CompletionStage<IOResult> result =
factorials
.map(num -> ByteString.fromString(num.toString() + "\n"))
.runWith(FileIO.toFile(new File("factorials.txt")), materializer);
.runWith(FileIO.toPath(Paths.get("factorials.txt")), materializer);
//#transform-source
//#use-transformed-sink
@ -81,7 +81,7 @@ public class QuickStartDocTest {
public Sink<String, CompletionStage<IOResult>> lineSink(String filename) {
return Flow.of(String.class)
.map(s -> ByteString.fromString(s.toString() + "\n"))
.toMat(FileIO.toFile(new File(filename)), Keep.right());
.toMat(FileIO.toPath(Paths.get(filename)), Keep.right());
}
//#transform-sink

View file

@ -3,7 +3,9 @@
*/
package docs.stream.io;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.io.IOException;
import java.util.concurrent.CompletionStage;
@ -47,13 +49,13 @@ public class StreamFileDocTest extends AbstractJavaTest {
{
//#file-source
final File file = new File("example.csv");
final Path file = Paths.get("example.csv");
//#file-source
}
@Test
public void demonstrateMaterializingBytesWritten() throws IOException {
final File file = File.createTempFile(getClass().getName(), ".tmp");
final Path file = Files.createTempFile(getClass().getName(), ".tmp");
try {
//#file-source
@ -61,27 +63,27 @@ public class StreamFileDocTest extends AbstractJavaTest {
Sink.<ByteString> foreach(chunk -> System.out.println(chunk.utf8String()));
CompletionStage<IOResult> ioResult =
FileIO.fromFile(file)
FileIO.fromPath(file)
.to(printlnSink)
.run(mat);
//#file-source
} finally {
file.delete();
Files.delete(file);
}
}
@Test
public void demonstrateSettingDispatchersInCode() throws IOException {
final File file = File.createTempFile(getClass().getName(), ".tmp");
final Path file = Files.createTempFile(getClass().getName(), ".tmp");
try {
Sink<ByteString, CompletionStage<IOResult>> fileSink =
//#custom-dispatcher-code
FileIO.toFile(file)
FileIO.toPath(file)
.withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"));
//#custom-dispatcher-code
} finally {
file.delete();
Files.delete(file);
}
}

View file

@ -4,7 +4,7 @@
package docs.http.scaladsl.server.directives
import java.io.File
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.event.Logging
@ -190,7 +190,7 @@ class BasicDirectivesExamplesSpec extends RoutingSpec {
path("sample") {
complete {
// internally uses the configured fileIODispatcher:
val source = FileIO.fromFile(new File("example.json"))
val source = FileIO.fromPath(Paths.get("example.json"))
HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, source))
}
}

View file

@ -15,7 +15,7 @@ import org.scalatest._
import org.scalatest.concurrent._
import scala.concurrent._
import scala.concurrent.duration._
import java.io.File
import java.nio.file.Paths
class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFutures {
implicit val patience = PatienceConfig(5.seconds)
@ -46,7 +46,7 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture
val result: Future[IOResult] =
factorials
.map(num => ByteString(s"$num\n"))
.runWith(FileIO.toFile(new File("factorials.txt")))
.runWith(FileIO.toPath(Paths.get("factorials.txt")))
//#transform-source
//#use-transformed-sink
@ -71,7 +71,7 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture
def lineSink(filename: String): Sink[String, Future[IOResult]] =
Flow[String]
.map(s => ByteString(s + "\n"))
.toMat(FileIO.toFile(new File(filename)))(Keep.right)
.toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
//#transform-sink
}

View file

@ -31,9 +31,11 @@ object TwitterStreamQuickstartDocSpec {
val akka = Hashtag("#akka")
//#model
// format: OFF
//#tweet-source
val tweets: Source[Tweet, NotUsed]
//#tweet-source
// format: ON
= Source(
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::

View file

@ -3,10 +3,10 @@
*/
package docs.stream.io
import java.io.File
import java.nio.file.{ Files, Paths }
import akka.stream._
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import akka.stream.scaladsl.{ FileIO, Sink }
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.util.ByteString
@ -22,9 +22,9 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
// silence sysout
def println(s: String) = ()
val file = File.createTempFile(getClass.getName, ".tmp")
val file = Files.createTempFile(getClass.getName, ".tmp")
override def afterTermination() = file.delete()
override def afterTermination() = Files.delete(file)
{
//#file-source
@ -35,7 +35,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
{
//#file-source
val file = new File("example.csv")
val file = Paths.get("example.csv")
//#file-source
}
@ -46,7 +46,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
//#file-source
val foreach: Future[IOResult] = FileIO.fromFile(file)
val foreach: Future[IOResult] = FileIO.fromPath(file)
.to(Sink.ignore)
.run()
//#file-source
@ -54,7 +54,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
"configure dispatcher in code" in {
//#custom-dispatcher-code
FileIO.fromFile(file)
FileIO.fromPath(file)
.withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"))
//#custom-dispatcher-code
}

View file

@ -5,6 +5,7 @@
package akka.http.javadsl.model;
import java.io.File;
import java.nio.file.Path;
import akka.http.impl.util.JavaAccessors;
import akka.http.scaladsl.model.HttpEntity;
@ -42,14 +43,30 @@ public final class HttpEntities {
return HttpEntity$.MODULE$.apply((akka.http.scaladsl.model.ContentType) contentType, bytes);
}
/**
* @deprecated Will be removed in Akka 3.x, use {@link #create(ContentType, Path)} instead.
*/
@Deprecated
public static UniversalEntity create(ContentType contentType, File file) {
return JavaAccessors.HttpEntity(contentType, file);
}
public static UniversalEntity create(ContentType contentType, Path file) {
return JavaAccessors.HttpEntity(contentType, file);
}
/**
* @deprecated Will be removed in Akka 3.x, use {@link #create(ContentType, Path, int)} instead.
*/
@Deprecated
public static UniversalEntity create(ContentType contentType, File file, int chunkSize) {
return HttpEntity$.MODULE$.apply((akka.http.scaladsl.model.ContentType) contentType, file, chunkSize);
}
public static UniversalEntity create(ContentType contentType, Path file, int chunkSize) {
return HttpEntity$.MODULE$.fromPath((akka.http.scaladsl.model.ContentType) contentType, file, chunkSize);
}
public static HttpEntity.Default create(ContentType contentType, long contentLength, Source<ByteString, Object> data) {
return new akka.http.scaladsl.model.HttpEntity.Default((akka.http.scaladsl.model.ContentType) contentType, contentLength, data.asScala());
}

View file

@ -5,12 +5,17 @@
package akka.http.javadsl.model;
import akka.util.ByteString;
import java.io.File;
import java.nio.file.Path;
import java.util.Optional;
/**
* The base type for an Http message (request or response).
*
* INTERNAL API: this trait will be changed in binary-incompatible ways for classes that are derived from it!
* Do not implement this interface outside the Akka code base!
*
* Binary compatibility is only maintained for callers of this traits interface.
*/
public interface HttpMessage {
/**
@ -103,9 +108,17 @@ public interface HttpMessage {
/**
* Returns a copy of Self message with a new entity.
*
* @deprecated Will be removed in Akka 3.x, use {@link #withEntity(ContentType, Path)} instead.
*/
@Deprecated
Self withEntity(ContentType type, File file);
/**
* Returns a copy of Self message with a new entity.
*/
Self withEntity(ContentType type, Path file);
/**
* Returns a copy of Self message with a new entity.
*/

View file

@ -5,6 +5,7 @@
package akka.http.impl.util
import java.io.File
import java.nio.file.Path
import JavaMapping.Implicits._
import akka.http.javadsl.model._
@ -28,4 +29,8 @@ object JavaAccessors {
/** INTERNAL API */
def HttpEntity(contentType: ContentType, file: File): UniversalEntity =
model.HttpEntity(contentType.asScala, file)
/** INTERNAL API */
def HttpEntity(contentType: ContentType, file: Path): UniversalEntity =
model.HttpEntity.fromPath(contentType.asScala, file)
}

View file

@ -10,12 +10,13 @@ import akka.http.impl.model.JavaInitialization
import language.implicitConversions
import java.io.File
import java.nio.file.{ Path, Files }
import java.lang.{ Iterable JIterable}
import scala.util.control.NonFatal
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.collection.immutable
import akka.util.{Unsafe, ByteString}
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream._
@ -242,11 +243,22 @@ object HttpEntity {
*
* If the given `chunkSize` is -1 the default chunk size is used.
*/
def apply(contentType: ContentType, file: File, chunkSize: Int = -1): UniversalEntity = {
val fileLength = file.length
@deprecated("Use `fromPath` instead", "2.4.5")
def apply(contentType: ContentType, file: File, chunkSize: Int = -1): UniversalEntity =
fromPath(contentType, file.toPath, chunkSize)
/**
* Returns either the empty entity, if the given file is empty, or a [[HttpEntity.Default]] entity
* consisting of a stream of [[akka.util.ByteString]] instances each containing `chunkSize` bytes
* (except for the final ByteString, which simply contains the remaining bytes).
*
* If the given `chunkSize` is -1 the default chunk size is used.
*/
def fromPath(contentType: ContentType, file: Path, chunkSize: Int = -1): UniversalEntity = {
val fileLength = Files.size(file)
if (fileLength > 0)
HttpEntity.Default(contentType, fileLength,
if (chunkSize > 0) FileIO.fromFile(file, chunkSize) else FileIO.fromFile(file))
if (chunkSize > 0) FileIO.fromPath(file, chunkSize) else FileIO.fromPath(file))
else empty(contentType)
}

View file

@ -4,6 +4,8 @@
package akka.http.scaladsl.model
import java.io.File
import java.nio.file.Path
import java.lang.{ Iterable JIterable }
import java.util.Optional
@ -107,7 +109,10 @@ sealed trait HttpMessage extends jm.HttpMessage {
withEntity(HttpEntity(contentType.asInstanceOf[ContentType.NonBinary], string))
def withEntity(contentType: jm.ContentType, bytes: Array[Byte]): Self = withEntity(HttpEntity(contentType.asInstanceOf[ContentType], bytes))
def withEntity(contentType: jm.ContentType, bytes: ByteString): Self = withEntity(HttpEntity(contentType.asInstanceOf[ContentType], bytes))
def withEntity(contentType: jm.ContentType, file: java.io.File): Self = withEntity(HttpEntity(contentType.asInstanceOf[ContentType], file))
@deprecated("Use withEntity(ContentType, Path) instead", "2.4.5")
def withEntity(contentType: jm.ContentType, file: File): Self = withEntity(HttpEntity(contentType.asInstanceOf[ContentType], file))
def withEntity(contentType: jm.ContentType, file: Path): Self = withEntity(HttpEntity.fromPath(contentType.asInstanceOf[ContentType], file))
import collection.JavaConverters._
/** Java API */

View file

@ -3,8 +3,8 @@
*/
package akka.http.scaladsl.model
import java.io.File
import java.nio.file.Path
import java.util.Optional
import akka.http.impl.util.Util
import scala.concurrent.duration.FiniteDuration
@ -344,8 +344,18 @@ object Multipart {
* To create an instance with several parts or for multiple files, use
* `FormData(BodyPart.fromFile("field1", ...), BodyPart.fromFile("field2", ...)`
*/
@deprecated("Use `fromPath` instead", "2.4.5")
def fromFile(name: String, contentType: ContentType, file: File, chunkSize: Int = -1): Multipart.FormData =
Multipart.FormData(Source.single(Multipart.FormData.BodyPart.fromFile(name, contentType, file, chunkSize)))
fromPath(name, contentType, file.toPath, chunkSize)
/**
* Creates a FormData instance that contains a single part backed by the given file.
*
* To create an instance with several parts or for multiple files, use
* `FormData(BodyPart.fromPath("field1", ...), BodyPart.fromPath("field2", ...)`
*/
def fromPath(name: String, contentType: ContentType, file: Path, chunkSize: Int = -1): Multipart.FormData =
Multipart.FormData(Source.single(Multipart.FormData.BodyPart.fromPath(name, contentType, file, chunkSize)))
/**
* Strict [[FormData]].
@ -432,8 +442,15 @@ object Multipart {
/**
* Creates a BodyPart backed by a File that will be streamed using a FileSource.
*/
@deprecated("Use `fromPath` instead", since = "2.4.5")
def fromFile(name: String, contentType: ContentType, file: File, chunkSize: Int = -1): BodyPart =
BodyPart(name, HttpEntity(contentType, file, chunkSize), Map("filename" -> file.getName))
fromPath(name, contentType, file.toPath, chunkSize)
/**
* Creates a BodyPart backed by a file that will be streamed using a FileSource.
*/
def fromPath(name: String, contentType: ContentType, file: Path, chunkSize: Int = -1): BodyPart =
BodyPart(name, HttpEntity.fromPath(contentType, file, chunkSize), Map("filename" -> file.getFileName.toString))
def unapply(value: BodyPart): Option[(String, BodyPartEntity, Map[String, String], immutable.Seq[HttpHeader])] =
Some((value.name, value.entity, value.additionalDispositionParams, value.additionalHeaders))

View file

@ -3,11 +3,10 @@
*/
package akka.stream.tck
import java.io.{ File, FileWriter }
import java.nio.file.Files
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.FileIO
import akka.stream.scaladsl.{ Sink }
import akka.stream.scaladsl.{ Sink, FileIO }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.testkit.{ EventFilter, TestEvent }
@ -28,21 +27,22 @@ class FilePublisherTest extends AkkaPublisherVerification[ByteString] {
}
val file = {
val f = File.createTempFile("file-source-tck", ".tmp")
val f = Files.createTempFile("file-source-tck", ".tmp")
val chunk = "x" * ChunkSize
val fw = new FileWriter(f)
val fw = Files.newBufferedWriter(f)
for (i 1 to Elements) fw.append(chunk)
fw.close()
f
}
def createPublisher(elements: Long): Publisher[ByteString] =
FileIO.fromFile(file, chunkSize = 512)
FileIO.fromPath(file, chunkSize = 512)
.take(elements)
.runWith(Sink.asPublisher(false))
@AfterClass
def after = file.delete()
def after() = Files.delete(file)
override def maxElementsFromPublisher(): Long = Elements
}

View file

@ -3,8 +3,7 @@
*/
package akka.stream.io
import java.io.File
import java.nio.file.StandardOpenOption
import java.nio.file.{ Files, Path, StandardOpenOption }
import akka.actor.ActorSystem
import akka.stream.impl.ActorMaterializerImpl
import akka.stream.impl.StreamSupervisor
@ -15,8 +14,7 @@ import akka.stream.testkit.Utils._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.ActorAttributes
import akka.util.ByteString
import akka.util.Timeout
import akka.util.{ ByteString, Timeout }
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
@ -40,11 +38,11 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
val TestByteStrings = TestLines.map(ByteString(_))
"SynchronousFile Sink" must {
"FileSink" must {
"write lines to a file" in assertAllStagesStopped {
targetFile { f
val completion = Source(TestByteStrings)
.runWith(FileIO.toFile(f))
.runWith(FileIO.toPath(f))
val result = Await.result(completion, 3.seconds)
result.count should equal(6006)
@ -55,7 +53,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
"create new file if not exists" in assertAllStagesStopped {
targetFile({ f
val completion = Source(TestByteStrings)
.runWith(FileIO.toFile(f))
.runWith(FileIO.toPath(f))
val result = Await.result(completion, 3.seconds)
result.count should equal(6006)
@ -68,7 +66,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
def write(lines: List[String]) =
Source(lines)
.map(ByteString(_))
.runWith(FileIO.toFile(f))
.runWith(FileIO.toPath(f))
val completion1 = write(TestLines)
Await.result(completion1, 3.seconds)
@ -87,7 +85,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
def write(lines: List[String] = TestLines) =
Source(lines)
.map(ByteString(_))
.runWith(FileIO.toFile(f, Set(StandardOpenOption.APPEND)))
.runWith(FileIO.toPath(f, Set(StandardOpenOption.APPEND)))
val completion1 = write()
val result1 = Await.result(completion1, 3.seconds)
@ -96,8 +94,8 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
val completion2 = write(lastWrite)
val result2 = Await.result(completion2, 3.seconds)
f.length() should ===(result1.count + result2.count)
checkFileContents(f, TestLines.mkString("") + lastWrite.mkString("") + "\n")
Files.size(f) should ===(result1.count + result2.count)
checkFileContents(f, TestLines.mkString("") + lastWrite.mkString(""))
}
}
@ -106,7 +104,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
try {
Source.fromIterator(() Iterator.continually(TestByteStrings.head)).runWith(FileIO.toFile(f))(materializer)
Source.fromIterator(() Iterator.continually(TestByteStrings.head)).runWith(FileIO.toPath(f))(materializer)
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
@ -125,7 +123,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
try {
Source.fromIterator(() Iterator.continually(TestByteStrings.head))
.to(FileIO.toFile(f))
.to(FileIO.toPath(f))
.withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher"))
.run()(materializer)
@ -138,17 +136,15 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
private def targetFile(block: File Unit, create: Boolean = true) {
val targetFile = File.createTempFile("synchronous-file-sink", ".tmp")
if (!create) targetFile.delete()
try block(targetFile) finally targetFile.delete()
private def targetFile(block: Path Unit, create: Boolean = true) {
val targetFile = Files.createTempFile("synchronous-file-sink", ".tmp")
if (!create) Files.delete(targetFile)
try block(targetFile) finally Files.delete(targetFile)
}
def checkFileContents(f: File, contents: String): Unit = {
val s = scala.io.Source.fromFile(f)
val out = s.getLines().mkString("\n") + "\n"
s.close()
out should ===(contents)
def checkFileContents(f: Path, contents: String): Unit = {
val out = Files.readAllBytes(f)
new String(out) should ===(contents)
}
}

View file

@ -3,7 +3,8 @@
*/
package akka.stream.io
import java.io.File
import java.nio.file.Files
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Random
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
@ -23,9 +24,6 @@ import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.testkit.AkkaSpec
import java.io.OutputStreamWriter
import java.io.FileOutputStream
import java.nio.charset.StandardCharsets.UTF_8
object FileSourceSpec {
final case class Settings(chunkSize: Int, readAhead: Int)
@ -46,23 +44,23 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
val testFile = {
val f = File.createTempFile("file-source-spec", ".tmp")
new OutputStreamWriter(new FileOutputStream(f), UTF_8).append(TestText).close()
val f = Files.createTempFile("file-source-spec", ".tmp")
Files.newBufferedWriter(f, UTF_8).append(TestText).close()
f
}
val notExistingFile = {
// this way we make sure it doesn't accidentally exist
val f = File.createTempFile("not-existing-file", ".tmp")
f.delete()
val f = Files.createTempFile("not-existing-file", ".tmp")
Files.delete(f)
f
}
val LinesCount = 2000 + new Random().nextInt(300)
val manyLines = {
val f = File.createTempFile(s"file-source-spec-lines_$LinesCount", "tmp")
val w = new OutputStreamWriter(new FileOutputStream(f), UTF_8)
val f = Files.createTempFile(s"file-source-spec-lines_$LinesCount", "tmp")
val w = Files.newBufferedWriter(f, UTF_8)
(1 to LinesCount).foreach { l
w.append("a" * l).append("\n")
}
@ -70,12 +68,12 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
f
}
"File Source" must {
"FileSource" must {
"read contents from a file" in assertAllStagesStopped {
val chunkSize = 512
val bufferAttributes = Attributes.inputBuffer(1, 2)
val p = FileIO.fromFile(testFile, chunkSize)
val p = FileIO.fromPath(testFile, chunkSize)
.withAttributes(bufferAttributes)
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[ByteString]()
@ -112,7 +110,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
val demandAllButOneChunks = TestText.length / chunkSize - 1
val p = FileIO.fromFile(testFile, chunkSize)
val p = FileIO.fromPath(testFile, chunkSize)
.withAttributes(bufferAttributes)
.runWith(Sink.asPublisher(false))
@ -141,7 +139,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"onError whent trying to read from file which does not exist" in assertAllStagesStopped {
val p = FileIO.fromFile(notExistingFile).runWith(Sink.asPublisher(false))
val p = FileIO.fromPath(notExistingFile).runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[ByteString]()
p.subscribe(c)
@ -157,7 +155,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
import settings._
s"count lines in real file (chunkSize = $chunkSize, readAhead = $readAhead)" in {
val s = FileIO.fromFile(manyLines, chunkSize = chunkSize)
val s = FileIO.fromPath(manyLines, chunkSize = chunkSize)
.withAttributes(Attributes.inputBuffer(readAhead, readAhead))
val f = s.runWith(Sink.fold(0) { case (acc, l) acc + l.utf8String.count(_ == '\n') })
@ -171,7 +169,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
try {
val p = FileIO.fromFile(manyLines).runWith(TestSink.probe)(materializer)
val p = FileIO.fromPath(manyLines).runWith(TestSink.probe)(materializer)
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
@ -187,7 +185,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
implicit val timeout = Timeout(500.millis)
try {
val p = FileIO.fromFile(manyLines)
val p = FileIO.fromPath(manyLines)
.withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher"))
.runWith(TestSink.probe)(materializer)
@ -198,7 +196,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
"not signal onComplete more than once" in {
FileIO.fromFile(testFile, 2 * TestText.length)
FileIO.fromPath(testFile, 2 * TestText.length)
.runWith(TestSink.probe)
.requestNext(ByteString(TestText, UTF_8.name))
.expectComplete()
@ -207,8 +205,8 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
override def afterTermination(): Unit = {
testFile.delete()
manyLines.delete()
Files.delete(testFile)
Files.delete(manyLines)
}
}

View file

@ -3,9 +3,9 @@
*/
package akka.stream.impl.io
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.Path
import akka.Done
import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props }
@ -20,7 +20,7 @@ import scala.util.control.NonFatal
/** INTERNAL API */
private[akka] object FilePublisher {
def props(f: File, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = {
def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = {
require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")
require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)")
require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)")
@ -35,7 +35,7 @@ private[akka] object FilePublisher {
}
/** INTERNAL API */
private[akka] final class FilePublisher(f: File, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int)
private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int)
extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging {
import FilePublisher._
@ -49,7 +49,7 @@ private[akka] final class FilePublisher(f: File, completionPromise: Promise[IORe
override def preStart() = {
try {
chan = FileChannel.open(f.toPath, FilePublisher.Read)
chan = FileChannel.open(f, FilePublisher.Read)
} catch {
case ex: Exception
onErrorThenStop(ex)

View file

@ -3,9 +3,8 @@
*/
package akka.stream.impl.io
import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import java.nio.file.{ Path, StandardOpenOption }
import akka.Done
import akka.actor.{ Deploy, ActorLogging, Props }
@ -19,14 +18,14 @@ import scala.util.{ Failure, Success }
/** INTERNAL API */
private[akka] object FileSubscriber {
def props(f: File, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) = {
def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) = {
require(bufSize > 0, "buffer size must be > 0")
Props(classOf[FileSubscriber], f, completionPromise, bufSize, openOptions).withDeploy(Deploy.local)
}
}
/** INTERNAL API */
private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption])
private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption])
extends akka.stream.actor.ActorSubscriber
with ActorLogging {
@ -37,7 +36,7 @@ private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult]
private var bytesWritten: Long = 0
override def preStart(): Unit = try {
chan = FileChannel.open(f.toPath, openOptions.asJava)
chan = FileChannel.open(f, openOptions.asJava)
super.preStart()
} catch {
@ -57,7 +56,7 @@ private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult]
}
case ActorSubscriberMessage.OnError(ex)
log.error(ex, "Tearing down FileSink({}) due to upstream error", f.getAbsolutePath)
log.error(ex, "Tearing down FileSink({}) due to upstream error", f)
closeAndComplete(IOResult(bytesWritten, Failure(ex)))
context.stop(self)

View file

@ -3,8 +3,8 @@
*/
package akka.stream.impl.io
import java.io.{ File, OutputStream }
import java.nio.file.StandardOpenOption
import java.io.OutputStream
import java.nio.file.{ Path, StandardOpenOption }
import akka.stream.IOResult
import akka.stream.impl.SinkModule
import akka.stream.impl.StreamLayout.Module
@ -16,10 +16,10 @@ import scala.concurrent.{ Future, Promise }
/**
* INTERNAL API
* Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file
* Creates simple synchronous Sink which writes all incoming elements to the given file
* (creating it before hand if necessary).
*/
private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
extends SinkModule[ByteString, Future[IOResult]](shape) {
override protected def label: String = s"FileSink($f, $options)"
@ -45,8 +45,7 @@ private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], va
/**
* INTERNAL API
* Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file
* (creating it before hand if necessary).
* Creates simple synchronous Sink which writes all incoming elements to the output stream.
*/
private[akka] final class OutputStreamSink(createOutput: () OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean)
extends SinkModule[ByteString, Future[IOResult]](shape) {

View file

@ -3,7 +3,8 @@
*/
package akka.stream.impl.io
import java.io.{ File, InputStream }
import java.io.InputStream
import java.nio.file.Path
import akka.stream._
import akka.stream.ActorAttributes.Dispatcher
@ -17,9 +18,9 @@ import scala.concurrent.{ Future, Promise }
/**
* INTERNAL API
* Creates simple synchronous (Java 6 compatible) Source backed by the given file.
* Creates simple synchronous Source backed by the given file.
*/
private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
extends SourceModule[ByteString, Future[IOResult]](shape) {
require(chunkSize > 0, "chunkSize must be greater than 0")
override def create(context: MaterializationContext) = {

View file

@ -4,7 +4,7 @@
package akka.stream.javadsl
import java.io.File
import java.nio.file.StandardOpenOption
import java.nio.file.{ Path, StandardOpenOption }
import java.util
import akka.stream.{ scaladsl, javadsl }
import akka.stream.IOResult
@ -20,7 +20,7 @@ object FileIO {
/**
* Creates a Sink that writes incoming [[ByteString]] elements to the given file.
* Overwrites existing files, if you want to append to an existing file use [[#file(File, util.Set[StandardOpenOption])]].
* Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[StandardOpenOption])]].
*
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
@ -30,11 +30,26 @@ object FileIO {
*
* @param f The file to write to
*/
def toFile(f: File): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toFile(f).toCompletionStage())
@deprecated("Use `toPath` instead.", "2.4.5")
def toFile(f: File): javadsl.Sink[ByteString, CompletionStage[IOResult]] = toPath(f.toPath)
/**
* Creates a Sink that writes incoming [[ByteString]] elements to the given file
* Creates a Sink that writes incoming [[ByteString]] elements to the given file.
* Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[StandardOpenOption])]].
*
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the stream's completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param f The file to write to
*/
def toPath(f: Path): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toPath(f).toCompletionStage())
/**
* Creates a Sink that writes incoming [[ByteString]] elements to the given file.
*
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the stream's completion,
* and a possible exception if IO operation was not completed successfully.
@@ -45,11 +60,27 @@ object FileIO {
* @param f The file to write to
* @param options File open options
*/
@deprecated("Use `toPath` instead.", "2.4.5")
def toFile(f: File, options: util.Set[StandardOpenOption]): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toFile(f, options.asScala.toSet).toCompletionStage())
toPath(f.toPath)
/**
* Creates a Source from a Files contents.
* Creates a Sink that writes incoming [[ByteString]] elements to the given file.
*
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the stream's completion,
* and a possible exception if IO operation was not completed successfully.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param f The file to write to
* @param options File open options
*/
def toPath(f: Path, options: util.Set[StandardOpenOption]): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet).toCompletionStage())
/**
* Creates a Source from a file's contents.
* Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes,
* except the last element, which will be up to 8192 in size.
*
@@ -59,10 +90,24 @@ object FileIO {
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*/
def fromFile(f: File): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromFile(f, 8192)
@deprecated("Use `fromPath` instead.", "2.4.5")
def fromFile(f: File): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromPath(f.toPath)
/**
* Creates a synchronous (Java 6 compatible) Source from a Files contents.
* Creates a Source from a file's contents.
* Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes,
* except the last element, which will be up to 8192 in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*/
def fromPath(f: Path): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromPath(f, 8192)
/**
* Creates a synchronous Source from a file's contents.
* Emitted elements are `chunkSize` sized [[ByteString]] elements,
* except the last element, which will be up to `chunkSize` in size.
*
@@ -72,7 +117,21 @@ object FileIO {
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*/
@deprecated("Use `fromPath` instead.", "2.4.5")
def fromFile(f: File, chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] =
new Source(scaladsl.FileIO.fromFile(f, chunkSize).toCompletionStage())
fromPath(f.toPath, chunkSize)
/**
* Creates a synchronous Source from a file's contents.
* Emitted elements are `chunkSize` sized [[ByteString]] elements,
* except the last element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*/
def fromPath(f: Path, chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] =
new Source(scaladsl.FileIO.fromPath(f, chunkSize).toCompletionStage())
}

View file

@@ -4,7 +4,7 @@
package akka.stream.scaladsl
import java.io.File
import java.nio.file.StandardOpenOption
import java.nio.file.{ Path, StandardOpenOption }
import java.nio.file.StandardOpenOption._
import akka.stream.impl.Stages.DefaultAttributes
@@ -23,7 +23,7 @@ object FileIO {
import Source.{ shape ⇒ sourceShape }
/**
* Creates a Source from a Files contents.
* Creates a Source from a file's contents.
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
* except the final element, which will be up to `chunkSize` in size.
*
@@ -33,10 +33,28 @@ object FileIO {
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*
* @param f the File to read from
* @param f the file to read from
* @param chunkSize the size of each read operation, defaults to 8192
*/
@deprecated("Use `fromPath` instead", "2.4.5")
def fromFile(f: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
fromPath(f.toPath, chunkSize)
/**
* Creates a Source from a file's contents.
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
* except the final element, which will be up to `chunkSize` in size.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*
* @param f the file to read from
* @param chunkSize the size of each read operation, defaults to 8192
*/
def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, sourceShape("FileSource")))
/**
@@ -48,9 +66,25 @@ object FileIO {
* This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*
* @param f the File to write to
* @param f the file to write to
* @param options File open options, defaults to Set(WRITE, CREATE)
*/
@deprecated("Use `toPath` instead", "2.4.5")
def toFile(f: File, options: Set[StandardOpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] =
toPath(f.toPath, options)
/**
* Creates a Sink which writes incoming [[ByteString]] elements to the given file. Overwrites existing files by default.
*
* Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the stream's completion,
* and a possible exception if IO operation was not completed successfully.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*
* @param f the file to write to
* @param options File open options, defaults to Set(WRITE, CREATE)
*/
def toPath(f: Path, options: Set[StandardOpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] =
new Sink(new FileSink(f, options, DefaultAttributes.fileSink, sinkShape("FileSink")))
}

View file

@@ -746,7 +746,11 @@ object MiMa extends AutoPlugin {
"2.4.4" -> Seq(
// #20342 HttpEntity scaladsl overrides
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.withoutSizeLimit"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.withSizeLimit")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.withSizeLimit"),
// #20293 Use JDK7 NIO Path instead of File
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpMessage#MessageTransformations.withEntity"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpMessage.withEntity")
)
)
}