2015-04-16 02:24:01 +02:00
|
|
|
/**
 * Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
 */
|
2015-05-29 16:43:02 +02:00
|
|
|
package akka.stream.impl.io
|
2015-04-16 02:24:01 +02:00
|
|
|
|
2016-01-14 14:55:42 +01:00
|
|
|
import java.io.File
|
2015-04-16 02:24:01 +02:00
|
|
|
import java.nio.channels.FileChannel
|
2016-01-14 14:55:42 +01:00
|
|
|
import java.util.Collections
|
2015-04-16 02:24:01 +02:00
|
|
|
|
2015-05-29 16:43:02 +02:00
|
|
|
import akka.actor.{ Deploy, ActorLogging, Props }
|
2015-04-16 02:24:01 +02:00
|
|
|
import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy }
|
|
|
|
|
import akka.util.ByteString
|
|
|
|
|
|
|
|
|
|
import scala.concurrent.Promise
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Factory for the `FileSubscriber` actor plus the open-option sets it passes to
 * `FileChannel.open`.
 */
private[akka] object FileSubscriber {
  import java.nio.file.StandardOpenOption
  import java.nio.file.StandardOpenOption._

  /**
   * Creates locally-deployed `Props` for a `FileSubscriber`.
   *
   * @param f                 file to write to
   * @param completionPromise promise handed to the actor to report the outcome of the write
   * @param bufSize           buffer size passed to the actor; must be greater than zero
   * @param append            selects `Append` instead of `Write` when the actor opens the file
   * @throws IllegalArgumentException if `bufSize` is not positive
   */
  def props(f: File, completionPromise: Promise[Long], bufSize: Int, append: Boolean): Props = {
    require(bufSize > 0, "buffer size must be > 0")
    Props(classOf[FileSubscriber], f, completionPromise, bufSize, append).withDeploy(Deploy.local)
  }

  // CREATE is included in both sets: with WRITE or APPEND alone `FileChannel.open` throws
  // NoSuchFileException when the target does not exist yet. Existing files are opened
  // exactly as before (no truncation is requested).

  /** Options used when overwriting from the start of the file. */
  val Write: java.util.Set[StandardOpenOption] =
    Collections.unmodifiableSet(java.util.EnumSet.of(WRITE, CREATE))

  /** Options used when appending to the end of the file. */
  val Append: java.util.Set[StandardOpenOption] =
    Collections.unmodifiableSet(java.util.EnumSet.of(APPEND, CREATE))
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Subscriber actor that writes every received `ByteString` to the file `f` through a
 * `FileChannel` and reports the outcome through `bytesWrittenPromise`.
 *
 * The promise is completed with the number of bytes written once the actor has stopped and
 * the channel has been closed, or failed with the first exception raised while opening,
 * writing, syncing or closing the file. An upstream `OnError` is only logged; in that case
 * the promise is still completed with the number of bytes written so far.
 *
 * @param f                   file to write to
 * @param bytesWrittenPromise promise completed with the total byte count, or failed on an I/O error
 * @param bufSize             high watermark given to the `WatermarkRequestStrategy`
 * @param append              if `true` the file is opened with `FileSubscriber.Append`,
 *                            otherwise with `FileSubscriber.Write`
 */
private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long], bufSize: Int, append: Boolean)
  extends akka.stream.actor.ActorSubscriber
  with ActorLogging {

  override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize)

  // Remains null until preStart has opened the file successfully.
  private var chan: FileChannel = _

  // Running total of bytes handed to the channel; becomes the promise's success value.
  private var bytesWritten: Long = 0

  /** Opens the target file; on failure the promise is failed and the subscription cancelled. */
  override def preStart(): Unit = try {
    val openOptions = if (append) FileSubscriber.Append else FileSubscriber.Write
    chan = FileChannel.open(f.toPath, openOptions)
    super.preStart()
  } catch {
    case ex: Exception ⇒
      // tryFailure: never throw IllegalStateException if the promise is already completed
      bytesWrittenPromise.tryFailure(ex)
      cancel()
  }

  def receive = {
    case ActorSubscriberMessage.OnNext(bytes: ByteString) ⇒
      try {
        val buffer = bytes.asByteBuffer
        // A single write call is not guaranteed to consume the whole buffer, so keep
        // writing until nothing remains (always at least one call, as before).
        do {
          bytesWritten += chan.write(buffer)
        } while (buffer.hasRemaining)
      } catch {
        case ex: Exception ⇒
          bytesWrittenPromise.tryFailure(ex)
          cancel()
      }

    case ActorSubscriberMessage.OnError(cause) ⇒
      // The promise is deliberately left untouched here; postStop completes it with the
      // bytes written so far.
      log.error(cause, "Tearing down FileSink({}) due to upstream error", f.getAbsolutePath)
      context.stop(self)

    case ActorSubscriberMessage.OnComplete ⇒
      try {
        // flush content and metadata to the storage device before reporting completion
        chan.force(true)
      } catch {
        case ex: Exception ⇒
          bytesWrittenPromise.tryFailure(ex)
      }
      context.stop(self)
  }

  /**
   * Closes the channel and then completes the promise.
   *
   * The channel is closed first so that whoever observes the completed promise can rely on
   * the file no longer being held open by this actor. A failure to close is reported through
   * the promise (unless it was already completed) instead of escaping from postStop.
   */
  override def postStop(): Unit = {
    try {
      if (chan ne null) chan.close()
      bytesWrittenPromise.trySuccess(bytesWritten)
    } catch {
      case ex: Exception ⇒
        bytesWrittenPromise.tryFailure(ex)
    } finally {
      super.postStop()
    }
  }
}
|