Merge pull request #1321 from spray/wip-IO-WriteFile-rebased

Tcp: introduce WriteFile to support zero-copy writing of files, fixes #2896
This commit is contained in:
Roland Kuhn 2013-04-17 04:39:41 -07:00
commit cdf717e855
4 changed files with 208 additions and 59 deletions

View file

@ -4,8 +4,8 @@
package akka.io
import java.io.IOException
import java.net.{ ConnectException, InetSocketAddress, SocketException }
import java.io.{ FileOutputStream, File, IOException }
import java.net.{ URLClassLoader, ConnectException, InetSocketAddress, SocketException }
import java.nio.ByteBuffer
import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel }
import java.nio.channels.spi.SelectorProvider
@ -190,6 +190,29 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
writer.expectMsg(Ack)
}
"write file to network" in withEstablishedConnection() { setup
import setup._
// hacky: we need a file for testing purposes, so try to get the biggest one from our own classpath
val testFile =
classOf[TcpConnectionSpec].getClassLoader.asInstanceOf[URLClassLoader]
.getURLs
.filter(_.getProtocol == "file")
.map(url new File(url.toURI))
.filter(_.exists)
.sortBy(-_.length)
.head
// maximum of 100 MB
val size = math.min(testFile.length(), 100000000).toInt
object Ack
val writer = TestProbe()
writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack))
pullFromServerSide(size, 1000000)
writer.expectMsg(Ack)
}
/*
* Disabled on Windows: http://support.microsoft.com/kb/214397
*
@ -235,8 +258,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
writer.expectMsg(CommandFailed(secondWrite))
// reject even empty writes
writer.send(connectionActor, Write.Empty)
writer.expectMsg(CommandFailed(Write.Empty))
writer.send(connectionActor, Write.empty)
writer.expectMsg(CommandFailed(Write.empty))
// there will be immediately more space in the send buffer because
// some data will have been sent by now, so we assume we can write
@ -607,7 +630,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
*/
@tailrec final def pullFromServerSide(remaining: Int, remainingTries: Int = 1000): Unit =
if (remainingTries <= 0)
throw new AssertionError("Pulling took too many loops")
throw new AssertionError("Pulling took too many loops, remaining data: " + remaining)
else if (remaining > 0) {
if (selector.msgAvailable) {
selector.expectMsg(WriteInterest)

View file

@ -483,6 +483,17 @@ akka {
# Fully qualified config path which holds the dispatcher configuration
# for the selector management actors
management-dispatcher = "akka.actor.default-dispatcher"
# Fully qualified config path which holds the dispatcher configuration
# on which file IO tasks are scheduled
file-io-dispatcher = "akka.actor.default-dispatcher"
# The maximum number of bytes (or "unlimited") to transfer in one batch when using
# `WriteFile` command which uses `FileChannel.transferTo` to pipe files to a TCP socket.
# On some OS like Linux `FileChannel.transferTo` may block for a long time when network
# IO is faster than file IO. Decreasing the value may improve fairness while increasing
# may improve throughput.
file-io-transferTo-limit = 512 KiB
}
udp {

View file

@ -92,19 +92,42 @@ object Tcp extends ExtensionKey[TcpExt] {
case class NoAck(token: Any)
object NoAck extends NoAck(null)
sealed trait WriteCommand extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def ack: Any
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
/**
* Write data to the TCP connection. If no ack is needed use the special
* `NoAck` object.
*/
case class Write(data: ByteString, ack: Any) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
case class Write(data: ByteString, ack: Any) extends WriteCommand
object Write {
val Empty: Write = Write(ByteString.empty, NoAck)
/**
* The empty Write doesn't write anything and isn't acknowledged.
* It will, however, be denied and sent back with `CommandFailed` if the
* connection isn't currently ready to send any data (because another WriteCommand
* is still pending).
*/
val empty: Write = Write(ByteString.empty, NoAck)
/**
* Create a new unacknowledged Write command with the given data.
*/
def apply(data: ByteString): Write =
if (data.isEmpty) Empty else Write(data, NoAck)
if (data.isEmpty) empty else Write(data, NoAck)
}
/**
* Write `count` bytes starting at `position` from file at `filePath` to the connection.
* When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. The
* count must be > 0.
*/
case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand {
require(position >= 0, "WriteFile.position must be >= 0")
require(count > 0, "WriteFile.count must be > 0")
}
case object StopReading extends Command
@ -164,6 +187,11 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
case x getIntBytes("received-message-size-limit")
}
val ManagementDispatcher = getString("management-dispatcher")
val FileIODispatcher = getString("file-io-dispatcher")
val TransferToLimit = getString("file-io-transferTo-limit") match {
case "unlimited" Int.MaxValue
case _ getIntBytes("file-io-transferTo-limit")
}
require(NrOfSelectors > 0, "nr-of-selectors must be > 0")
require(MaxChannels == -1 || MaxChannels > 0, "max-channels must be > 0 or 'unlimited'")
@ -187,6 +215,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
}
val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize)
val fileIoDispatcher = system.dispatchers.lookup(Settings.FileIODispatcher)
}
object TcpSO extends SoJavaFactories {

View file

@ -5,8 +5,8 @@
package akka.io
import java.net.InetSocketAddress
import java.io.IOException
import java.nio.channels.SocketChannel
import java.io.{ FileInputStream, IOException }
import java.nio.channels.{ FileChannel, SocketChannel }
import java.nio.ByteBuffer
import scala.annotation.tailrec
import scala.collection.immutable
@ -92,6 +92,9 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
doWrite(handler)
if (!writePending) // writing is now finished
handleClose(handler, closeCommander, closedEvent)
case SendBufferFull(remaining) pendingWrite = remaining; selector ! WriteInterest
case WriteFileFinished pendingWrite = null; handleClose(handler, closeCommander, closedEvent)
case WriteFileFailed(e) handleError(handler, e) // rethrow exception from dispatcher task
case Abort handleClose(handler, Some(sender), Aborted)
}
@ -107,7 +110,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
def handleWriteMessages(handler: ActorRef): Receive = {
case ChannelWritable if (writePending) doWrite(handler)
case write: Write if writePending
case write: WriteCommand if writePending
if (TraceLogging) log.debug("Dropping write because queue is full")
sender ! write.failureMessage
@ -115,9 +118,13 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
if (write.wantsAck)
sender ! write.ack
case write: Write
case write: WriteCommand
pendingWrite = createWrite(write)
doWrite(handler)
case SendBufferFull(remaining) pendingWrite = remaining; selector ! WriteInterest
case WriteFileFinished pendingWrite = null
case WriteFileFailed(e) handleError(handler, e) // rethrow exception from dispatcher task
}
// AUXILIARIES and IMPLEMENTATION
@ -173,32 +180,8 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
} finally bufferPool.release(buffer)
}
final def doWrite(handler: ActorRef): Unit = {
@tailrec def innerWrite(): Unit = {
val toWrite = pendingWrite.buffer.remaining()
require(toWrite != 0)
val writtenBytes = channel.write(pendingWrite.buffer)
if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes)
pendingWrite = pendingWrite.consume(writtenBytes)
if (pendingWrite.hasData)
if (writtenBytes == toWrite) innerWrite() // wrote complete buffer, try again now
else selector ! WriteInterest // try again later
else { // everything written
if (pendingWrite.wantsAck)
pendingWrite.commander ! pendingWrite.ack
val buffer = pendingWrite.buffer
pendingWrite = null
bufferPool.release(buffer)
}
}
try innerWrite()
catch { case e: IOException handleError(handler, e) }
}
def doWrite(handler: ActorRef): Unit =
pendingWrite = pendingWrite.doWrite(handler)
def closeReason =
if (channel.socket.isOutputShutdown) ConfirmedClosed
@ -238,7 +221,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
context.stop(self)
}
def handleError(handler: ActorRef, exception: IOException): Unit = {
def handleError(handler: ActorRef, exception: IOException): Nothing = {
closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception)))
throw exception
@ -267,8 +250,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
if (channel.isOpen)
abort()
if (writePending)
bufferPool.release(pendingWrite.buffer)
if (writePending) pendingWrite.release()
if (closedMessage != null) {
val interestedInClose =
@ -282,29 +264,112 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
override def postRestart(reason: Throwable): Unit =
throw new IllegalStateException("Restarting not supported for connection actors.")
private[io] case class PendingWrite(
/** Create a pending write from a WriteCommand */
private[io] def createWrite(write: WriteCommand): PendingWrite = write match {
case write: Write
val buffer = bufferPool.acquire()
try {
val copied = write.data.copyToBuffer(buffer)
buffer.flip()
PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer)
} catch {
case NonFatal(e)
bufferPool.release(buffer)
throw e
}
case write: WriteFile
PendingWriteFile(sender, write, new FileInputStream(write.filePath).getChannel, 0L)
}
private[io] case class PendingBufferWrite(
commander: ActorRef,
ack: Any,
remainingData: ByteString,
buffer: ByteBuffer) {
buffer: ByteBuffer) extends PendingWrite {
def consume(writtenBytes: Int): PendingWrite =
if (buffer.remaining() == 0) {
def release(): Unit = bufferPool.release(buffer)
def doWrite(handler: ActorRef): PendingWrite = {
@tailrec def innerWrite(pendingWrite: PendingBufferWrite): PendingWrite = {
val toWrite = pendingWrite.buffer.remaining()
require(toWrite != 0)
val writtenBytes = channel.write(pendingWrite.buffer)
if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes)
val nextWrite = pendingWrite.consume(writtenBytes)
if (pendingWrite.hasData)
if (writtenBytes == toWrite) innerWrite(nextWrite) // wrote complete buffer, try again now
else {
selector ! WriteInterest
nextWrite
} // try again later
else { // everything written
if (pendingWrite.wantsAck)
pendingWrite.commander ! pendingWrite.ack
pendingWrite.release()
null
}
}
try innerWrite(this)
catch { case e: IOException handleError(handler, e) }
}
def hasData = buffer.hasRemaining || remainingData.nonEmpty
def consume(writtenBytes: Int): PendingBufferWrite =
if (buffer.hasRemaining) this
else {
buffer.clear()
val copied = remainingData.copyToBuffer(buffer)
buffer.flip()
copy(remainingData = remainingData.drop(copied))
} else this
def hasData = buffer.remaining() > 0 || remainingData.size > 0
def wantsAck = !ack.isInstanceOf[NoAck]
}
def createWrite(write: Write): PendingWrite = {
val buffer = bufferPool.acquire()
val copied = write.data.copyToBuffer(buffer)
buffer.flip()
}
PendingWrite(sender, write.ack, write.data.drop(copied), buffer)
private[io] case class PendingWriteFile(
commander: ActorRef,
write: WriteFile,
fileChannel: FileChannel,
alreadyWritten: Long) extends PendingWrite {
def doWrite(handler: ActorRef): PendingWrite = {
tcp.fileIoDispatcher.execute(writeFileRunnable(this))
this
}
def ack: Any = write.ack
/** Release any open resources */
def release() { fileChannel.close() }
def updatedWrite(nowWritten: Long): PendingWriteFile = {
require(nowWritten < write.count)
copy(alreadyWritten = nowWritten)
}
def remainingBytes = write.count - alreadyWritten
def currentPosition = write.position + alreadyWritten
}
private[io] def writeFileRunnable(pendingWrite: PendingWriteFile): Runnable =
new Runnable {
def run(): Unit = try {
import pendingWrite._
val toWrite = math.min(remainingBytes, tcp.Settings.TransferToLimit)
val writtenBytes = fileChannel.transferTo(currentPosition, toWrite, channel)
if (writtenBytes < remainingBytes) self ! SendBufferFull(pendingWrite.updatedWrite(alreadyWritten + writtenBytes))
else { // finished
if (wantsAck) commander ! write.ack
self ! WriteFileFinished
pendingWrite.release()
}
} catch {
case e: IOException self ! WriteFileFailed(e)
}
}
}
@ -324,4 +389,25 @@ private[io] object TcpConnection {
case class CloseInformation(
notificationsTo: Set[ActorRef],
closedEvent: Event)
// INTERNAL MESSAGES
/** Informs actor that no writing was possible but there is still work remaining */
case class SendBufferFull(remainingWrite: PendingWrite)
/** Informs actor that a pending file write has finished */
case object WriteFileFinished
/** Informs actor that a pending WriteFile failed */
case class WriteFileFailed(e: IOException)
/** Abstraction over pending writes */
trait PendingWrite {
def commander: ActorRef
def ack: Any
def wantsAck = !ack.isInstanceOf[NoAck]
def doWrite(handler: ActorRef): PendingWrite
/** Release any open resources */
def release(): Unit
}
}