Merge pull request #1709 from spray/wip-3581-io-compound-write

!act #3581 Add Tcp.CompoundWrite, some cleanup
This commit is contained in:
Roland Kuhn 2013-10-01 03:35:14 -07:00
commit e05d30aeaa
7 changed files with 286 additions and 120 deletions

View file

@ -24,7 +24,8 @@ import akka.util.{ Helpers, ByteString }
import akka.TestUtils._
object TcpConnectionSpec {
case object Ack extends Event
case class Ack(i: Int) extends Event
object Ack extends Ack(0)
case class Registration(channel: SelectableChannel, initialOps: Int) extends NoSerializationVerificationNeeded
}
@ -152,10 +153,6 @@ class TcpConnectionSpec extends AkkaSpec("""
run {
val writer = TestProbe()
// directly acknowledge an empty write
writer.send(connectionActor, Write(ByteString.empty, Ack))
writer.expectMsg(Ack)
// reply to write commander with Ack
val ackedWrite = Write(ByteString("testdata"), Ack)
val buffer = ByteBuffer.allocate(100)
@ -174,8 +171,7 @@ class TcpConnectionSpec extends AkkaSpec("""
writer.expectNoMsg(500.millis)
pullFromServerSide(remaining = 10, into = buffer)
buffer.flip()
buffer.limit must be(10)
ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!")
ByteString(buffer).utf8String must be("morestuff!")
}
}
@ -227,6 +223,29 @@ class TcpConnectionSpec extends AkkaSpec("""
}
}
"write a CompoundWrite to the network and produce correct ACKs" in new EstablishedConnectionTest() {
run {
val writer = TestProbe()
val compoundWrite =
Write(ByteString("test1"), Ack(1)) +:
Write(ByteString("test2")) +:
Write(ByteString.empty, Ack(3)) +:
Write(ByteString("test4"), Ack(4))
// reply to write commander with Ack
val buffer = ByteBuffer.allocate(100)
serverSideChannel.read(buffer) must be(0)
writer.send(connectionActor, compoundWrite)
pullFromServerSide(remaining = 15, into = buffer)
buffer.flip()
ByteString(buffer).utf8String must be("test1test2test4")
writer.expectMsg(Ack(1))
writer.expectMsg(Ack(3))
writer.expectMsg(Ack(4))
}
}
/*
* Disabled on Windows: http://support.microsoft.com/kb/214397
*

View file

@ -10,6 +10,7 @@ import akka.io.Inet._
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.collection.immutable
import scala.collection.JavaConverters._
import akka.util.ByteString
import akka.util.Helpers.Requiring
import akka.actor._
@ -240,9 +241,56 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
object NoAck extends NoAck(null)
/**
* Common interface for all write commands, currently [[Write]] and [[WriteFile]].
* Common interface for all write commands, currently [[Write]], [[WriteFile]] and [[CompoundWrite]].
*/
sealed trait WriteCommand extends Command {
sealed abstract class WriteCommand extends Command {
/**
* Prepends this command with another `Write` or `WriteFile` to form
* a `CompoundWrite`.
*/
def +:(other: SimpleWriteCommand): CompoundWrite = CompoundWrite(other, this)
/**
* Prepends this command with a number of other writes.
* The first element of the given Iterable becomes the first sub write of a potentially
* created `CompoundWrite`.
*/
def ++:(writes: Iterable[WriteCommand]): WriteCommand =
writes.foldRight(this) {
case (a: SimpleWriteCommand, b) a +: b
case (a: CompoundWrite, b) a ++: b
}
/**
* Java API: prepends this command with another `Write` or `WriteFile` to form
* a `CompoundWrite`.
*/
def prepend(that: SimpleWriteCommand): CompoundWrite = that +: this
/**
* Java API: prepends this command with a number of other writes.
* The first element of the given Iterable becomes the first sub write of a potentially
* created `CompoundWrite`.
*/
def prepend(writes: JIterable[WriteCommand]): WriteCommand = writes.asScala ++: this
}
object WriteCommand {
/**
* Combines the given number of write commands into one atomic `WriteCommand`.
*/
def apply(writes: Iterable[WriteCommand]): WriteCommand = writes ++: Write.empty
/**
* Java API: combines the given number of write commands into one atomic `WriteCommand`.
*/
def create(writes: JIterable[WriteCommand]): WriteCommand = apply(writes.asScala)
}
/**
* Common supertype of [[Write]] and [[WriteFile]].
*/
sealed abstract class SimpleWriteCommand extends WriteCommand {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
/**
@ -255,6 +303,11 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
* equivalent to the [[#ack]] token not being a of type [[NoAck]].
*/
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
/**
* Java API: appends this command with another `WriteCommand` to form a `CompoundWrite`.
*/
def append(that: WriteCommand): CompoundWrite = this +: that
}
/**
@ -267,7 +320,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
* or have been sent!</b> Unfortunately there is no way to determine whether
* a particular write has been sent by the O/S.
*/
case class Write(data: ByteString, ack: Event) extends WriteCommand
case class Write(data: ByteString, ack: Event) extends SimpleWriteCommand
object Write {
/**
* The empty Write doesn't write anything and isn't acknowledged.
@ -294,11 +347,35 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
* or have been sent!</b> Unfortunately there is no way to determine whether
* a particular write has been sent by the O/S.
*/
case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends WriteCommand {
case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends SimpleWriteCommand {
require(position >= 0, "WriteFile.position must be >= 0")
require(count > 0, "WriteFile.count must be > 0")
}
/**
* A write command which aggregates two other write commands. Using this construct
* you can chain a number of [[Write]] and/or [[WriteFile]] commands together in a way
* that allows them to be handled as a single write which gets written out to the
* network as quickly as possible.
* If the sub commands contain `ack` requests they will be honored as soon as the
* respective write has been written completely.
*/
case class CompoundWrite(override val head: SimpleWriteCommand, tailCommand: WriteCommand) extends WriteCommand
with immutable.Iterable[SimpleWriteCommand] {
def iterator: Iterator[SimpleWriteCommand] =
new Iterator[SimpleWriteCommand] {
private[this] var current: WriteCommand = CompoundWrite.this
def hasNext: Boolean = current ne null
def next(): SimpleWriteCommand =
current match {
case null Iterator.empty.next()
case CompoundWrite(h, t) current = t; h
case x: SimpleWriteCommand current = null; x
}
}
}
/**
* When `useResumeWriting` is in effect as was indicated in the [[Register]] message
* then this command needs to be sent to the connection actor in order to re-enable

View file

@ -32,13 +32,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
import tcp.bufferPool
import TcpConnection._
private[this] var pendingWrite: PendingWrite = _
private[this] var pendingWrite: PendingWrite = EmptyPendingWrite
private[this] var peerClosed = false
private[this] var writingSuspended = false
private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop
def writePending = pendingWrite ne null
def writePending = pendingWrite ne EmptyPendingWrite
// STATES
@ -95,8 +95,12 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
doWrite(info)
if (!writePending) // writing is now finished
handleClose(info, closeCommander, closedEvent)
case SendBufferFull(remaining) { pendingWrite = remaining; info.registration.enableInterest(OP_WRITE) }
case WriteFileFinished { pendingWrite = null; handleClose(info, closeCommander, closedEvent) }
case UpdatePendingWrite(remaining)
pendingWrite = remaining
if (writePending) info.registration.enableInterest(OP_WRITE)
else handleClose(info, closeCommander, closedEvent)
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
case Abort handleClose(info, Some(sender), Aborted)
@ -130,13 +134,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
sender ! write.failureMessage
if (info.useResumeWriting) writingSuspended = true
} else write match {
case Write(data, ack) if data.isEmpty
if (write.wantsAck) sender ! ack
case _
pendingWrite = createWrite(write)
doWrite(info)
} else {
pendingWrite = PendingWrite(sender, write)
if (writePending) doWrite(info)
}
case ResumeWriting
@ -156,8 +156,10 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
else sender ! CommandFailed(ResumeWriting)
} else sender ! WritingResumed
case SendBufferFull(remaining) { pendingWrite = remaining; info.registration.enableInterest(OP_WRITE) }
case WriteFileFinished pendingWrite = null
case UpdatePendingWrite(remaining)
pendingWrite = remaining
if (writePending) info.registration.enableInterest(OP_WRITE)
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
}
@ -301,109 +303,103 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
override def postRestart(reason: Throwable): Unit =
throw new IllegalStateException("Restarting not supported for connection actors.")
/** Create a pending write from a WriteCommand */
private[io] def createWrite(write: WriteCommand): PendingWrite = write match {
case write: Write
def PendingWrite(commander: ActorRef, write: WriteCommand): PendingWrite = {
@tailrec def create(head: WriteCommand, tail: WriteCommand = Write.empty): PendingWrite =
head match {
case Write.empty if (tail eq Write.empty) EmptyPendingWrite else create(tail)
case Write(data, ack) if data.nonEmpty PendingBufferWrite(commander, data, ack, tail)
case WriteFile(path, offset, count, ack) PendingWriteFile(commander, path, offset, count, ack, tail)
case CompoundWrite(h, t) create(h, t)
case x @ Write(_, ack) // empty write with either an ACK or a non-standard NoACK
if (x.wantsAck) commander ! ack
create(tail)
}
create(write)
}
def PendingBufferWrite(commander: ActorRef, data: ByteString, ack: Event, tail: WriteCommand): PendingBufferWrite = {
val buffer = bufferPool.acquire()
try {
val copied = write.data.copyToBuffer(buffer)
val copied = data.copyToBuffer(buffer)
buffer.flip()
PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer)
new PendingBufferWrite(commander, data.drop(copied), ack, buffer, tail)
} catch {
case NonFatal(e)
bufferPool.release(buffer)
throw e
}
case write: WriteFile
PendingWriteFile(sender, write, new FileInputStream(write.filePath).getChannel, 0L)
}
private[io] case class PendingBufferWrite(
commander: ActorRef,
ack: Any,
class PendingBufferWrite(
val commander: ActorRef,
remainingData: ByteString,
buffer: ByteBuffer) extends PendingWrite {
def release(): Unit = bufferPool.release(buffer)
ack: Any,
buffer: ByteBuffer,
tail: WriteCommand) extends PendingWrite {
def doWrite(info: ConnectionInfo): PendingWrite = {
@tailrec def innerWrite(pendingWrite: PendingBufferWrite): PendingWrite = {
val toWrite = pendingWrite.buffer.remaining()
require(toWrite != 0)
val writtenBytes = channel.write(pendingWrite.buffer)
@tailrec def writeToChannel(data: ByteString): PendingWrite = {
val writtenBytes = channel.write(buffer) // at first we try to drain the remaining bytes from the buffer
if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes)
if (buffer.hasRemaining) {
// we weren't able to write all bytes from the buffer, so we need to try again later
if (data eq remainingData) this
else new PendingBufferWrite(commander, data, ack, buffer, tail) // copy with updated remainingData
val nextWrite = pendingWrite.consume(writtenBytes)
if (pendingWrite.hasData)
if (writtenBytes == toWrite) innerWrite(nextWrite) // wrote complete buffer, try again now
else {
info.registration.enableInterest(OP_WRITE)
nextWrite
} // try again later
else { // everything written
if (pendingWrite.wantsAck)
pendingWrite.commander ! pendingWrite.ack
pendingWrite.release()
null
}
}
try innerWrite(this)
catch { case e: IOException handleError(info.handler, e); this }
}
def hasData = buffer.hasRemaining || remainingData.nonEmpty
def consume(writtenBytes: Int): PendingBufferWrite =
if (buffer.hasRemaining) this
else {
} else if (data.nonEmpty) {
buffer.clear()
val copied = remainingData.copyToBuffer(buffer)
buffer.flip()
copy(remainingData = remainingData.drop(copied))
writeToChannel(remainingData drop copied)
} else {
if (!ack.isInstanceOf[NoAck]) commander ! ack
release()
PendingWrite(commander, tail)
}
}
try {
val next = writeToChannel(remainingData)
if (next ne EmptyPendingWrite) info.registration.enableInterest(OP_WRITE)
next
} catch { case e: IOException handleError(info.handler, e); this }
}
private[io] case class PendingWriteFile(
commander: ActorRef,
write: WriteFile,
def release(): Unit = bufferPool.release(buffer)
}
def PendingWriteFile(commander: ActorRef, filePath: String, offset: Long, count: Long, ack: Event,
tail: WriteCommand): PendingWriteFile =
new PendingWriteFile(commander, new FileInputStream(filePath).getChannel, offset, count, ack, tail)
class PendingWriteFile(
val commander: ActorRef,
fileChannel: FileChannel,
alreadyWritten: Long) extends PendingWrite {
offset: Long,
remaining: Long,
ack: Event,
tail: WriteCommand) extends PendingWrite with Runnable {
def doWrite(info: ConnectionInfo): PendingWrite = {
tcp.fileIoDispatcher.execute(writeFileRunnable(this))
tcp.fileIoDispatcher.execute(this)
this
}
def ack: Any = write.ack
def release(): Unit = fileChannel.close()
/** Release any open resources */
def release() { fileChannel.close() }
def run(): Unit =
try {
val toWrite = math.min(remaining, tcp.Settings.TransferToLimit)
val written = fileChannel.transferTo(offset, toWrite, channel)
def updatedWrite(nowWritten: Long): PendingWriteFile = {
require(nowWritten < write.count)
copy(alreadyWritten = nowWritten)
}
if (written < remaining) {
val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail)
self ! UpdatePendingWrite(updated)
def remainingBytes = write.count - alreadyWritten
def currentPosition = write.position + alreadyWritten
}
private[io] def writeFileRunnable(pendingWrite: PendingWriteFile): Runnable =
new Runnable {
def run(): Unit = try {
import pendingWrite._
val toWrite = math.min(remainingBytes, tcp.Settings.TransferToLimit)
val writtenBytes = fileChannel.transferTo(currentPosition, toWrite, channel)
if (writtenBytes < remainingBytes) self ! SendBufferFull(pendingWrite.updatedWrite(alreadyWritten + writtenBytes))
else { // finished
if (wantsAck) commander ! write.ack
self ! WriteFileFinished
pendingWrite.release()
} else {
if (!ack.isInstanceOf[NoAck]) commander ! ack
release()
self ! UpdatePendingWrite(PendingWrite(commander, tail))
}
} catch {
case e: IOException self ! WriteFileFailed(e)
@ -436,22 +432,18 @@ private[io] object TcpConnection {
// INTERNAL MESSAGES
/** Informs actor that no writing was possible but there is still work remaining */
case class SendBufferFull(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded
/** Informs actor that a pending file write has finished */
case object WriteFileFinished
/** Informs actor that a pending WriteFile failed */
case class UpdatePendingWrite(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded
case class WriteFileFailed(e: IOException)
/** Abstraction over pending writes */
trait PendingWrite {
sealed abstract class PendingWrite {
def commander: ActorRef
def ack: Any
def wantsAck = !ack.isInstanceOf[NoAck]
def doWrite(info: ConnectionInfo): PendingWrite
def release(): Unit // free any occupied resources
}
/** Release any open resources */
def release(): Unit
object EmptyPendingWrite extends PendingWrite {
def commander: ActorRef = throw new IllegalStateException
def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException
def release(): Unit = throw new IllegalStateException
}
}

View file

@ -125,6 +125,43 @@ it receives one of the above close commands.
All close notifications are sub-types of ``ConnectionClosed`` so listeners who do not need fine-grained close events
may handle all close events in the same way.
Writing to a connection
-----------------------
Once a connection has been established data can be sent to it from any actor in the form of a ``Tcp.WriteCommand``.
``Tcp.WriteCommand`` is an abstract class with three concrete implementations:
Tcp.Write
The simplest ``WriteCommand`` implementation which wraps a ``ByteString`` instance and an "ack" event.
A ``ByteString`` (as explained in :ref:`this section <ByteString>`) models one or more chunks of immutable
in-memory data with a maximum (total) size of 2 GB (2^31 bytes).
Tcp.WriteFile
If you want to send "raw" data from a file you can do so efficiently with the ``Tcp.WriteFile`` command.
This allows you do designate a (contiguous) chunk of on-disk bytes for sending across the connection without
the need to first load them into the JVM memory. As such ``Tcp.WriteFile`` can "hold" more than 2GB of data and
an "ack" event if required.
Tcp.CompoundWrite
Sometimes you might want to group (or interleave) several ``Tcp.Write`` and/or ``Tcp.WriteFile`` commands into
one atomic write command which gets written to the connection in one go. The ``Tcp.CompoundWrite`` allows you
to do just that and offers three benefits:
1. As explained in the following section the TCP connection actor can only handle one single write command at a time.
By combining several writes into one ``CompoundWrite`` you can have them be sent across the connection with
minimum overhead and without the need to spoon feed them to the connection actor via an *ACK-based* message
protocol.
2. Because a ``WriteCommand`` is atomic you can be sure that no other actor can "inject" other writes into your
series of writes if you combine them into one single ``CompoundWrite``. In scenarios where several actors write
to the same connection this can be an important feature which can be somewhat hard to achieve otherwise.
3. The "sub writes" of a ``CompoundWrite`` are regular ``Write`` or ``WriteFile`` commands that themselves can request
"ack" events. These ACKs are sent out as soon as the respective "sub write" has been completed. This allows you to
attach more than one ACK to a ``Write`` or ``WriteFile`` (by combining it with an empty write that itself requests
an ACK) or to have the connection actor acknowledge the progress of transmitting the ``CompoundWrite`` by sending
out intermediate ACKs at arbitrary points.
Throttling Reads and Writes
---------------------------

View file

@ -85,6 +85,8 @@ nacked messages it may need to keep a buffer of pending messages.
the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control
not error handling. In other words, data may still be lost, even if every write is acknowledged.
.. _ByteString:
ByteString
^^^^^^^^^^

View file

@ -126,6 +126,43 @@ it receives one of the above close commands.
All close notifications are sub-types of ``ConnectionClosed`` so listeners who do not need fine-grained close events
may handle all close events in the same way.
Writing to a connection
-----------------------
Once a connection has been established data can be sent to it from any actor in the form of a ``Tcp.WriteCommand``.
``Tcp.WriteCommand`` is an abstract class with three concrete implementations:
Tcp.Write
The simplest ``WriteCommand`` implementation which wraps a ``ByteString`` instance and an "ack" event.
A ``ByteString`` (as explained in :ref:`this section <ByteString>`) models one or more chunks of immutable
in-memory data with a maximum (total) size of 2 GB (2^31 bytes).
Tcp.WriteFile
If you want to send "raw" data from a file you can do so efficiently with the ``Tcp.WriteFile`` command.
This allows you do designate a (contiguous) chunk of on-disk bytes for sending across the connection without
the need to first load them into the JVM memory. As such ``Tcp.WriteFile`` can "hold" more than 2GB of data and
an "ack" event if required.
Tcp.CompoundWrite
Sometimes you might want to group (or interleave) several ``Tcp.Write`` and/or ``Tcp.WriteFile`` commands into
one atomic write command which gets written to the connection in one go. The ``Tcp.CompoundWrite`` allows you
to do just that and offers three benefits:
1. As explained in the following section the TCP connection actor can only handle one single write command at a time.
By combining several writes into one ``CompoundWrite`` you can have them be sent across the connection with
minimum overhead and without the need to spoon feed them to the connection actor via an *ACK-based* message
protocol.
2. Because a ``WriteCommand`` is atomic you can be sure that no other actor can "inject" other writes into your
series of writes if you combine them into one single ``CompoundWrite``. In scenarios where several actors write
to the same connection this can be an important feature which can be somewhat hard to achieve otherwise.
3. The "sub writes" of a ``CompoundWrite`` are regular ``Write`` or ``WriteFile`` commands that themselves can request
"ack" events. These ACKs are sent out as soon as the respective "sub write" has been completed. This allows you to
attach more than one ACK to a ``Write`` or ``WriteFile`` (by combining it with an empty write that itself requests
an ACK) or to have the connection actor acknowledge the progress of transmitting the ``CompoundWrite`` by sending
out intermediate ACKs at arbitrary points.
Throttling Reads and Writes
---------------------------

View file

@ -90,6 +90,8 @@ nacked messages it may need to keep a buffer of pending messages.
the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control
not error handling. In other words, data may still be lost, even if every write is acknowledged.
.. _ByteString:
ByteString
^^^^^^^^^^