diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 9560548293..b10df3320c 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -24,7 +24,8 @@ import akka.util.{ Helpers, ByteString } import akka.TestUtils._ object TcpConnectionSpec { - case object Ack extends Event + case class Ack(i: Int) extends Event + object Ack extends Ack(0) case class Registration(channel: SelectableChannel, initialOps: Int) extends NoSerializationVerificationNeeded } @@ -152,10 +153,6 @@ class TcpConnectionSpec extends AkkaSpec(""" run { val writer = TestProbe() - // directly acknowledge an empty write - writer.send(connectionActor, Write(ByteString.empty, Ack)) - writer.expectMsg(Ack) - // reply to write commander with Ack val ackedWrite = Write(ByteString("testdata"), Ack) val buffer = ByteBuffer.allocate(100) @@ -174,8 +171,7 @@ class TcpConnectionSpec extends AkkaSpec(""" writer.expectNoMsg(500.millis) pullFromServerSide(remaining = 10, into = buffer) buffer.flip() - buffer.limit must be(10) - ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!") + ByteString(buffer).utf8String must be("morestuff!") } } @@ -227,6 +223,29 @@ class TcpConnectionSpec extends AkkaSpec(""" } } + "write a CompoundWrite to the network and produce correct ACKs" in new EstablishedConnectionTest() { + run { + val writer = TestProbe() + val compoundWrite = + Write(ByteString("test1"), Ack(1)) +: + Write(ByteString("test2")) +: + Write(ByteString.empty, Ack(3)) +: + Write(ByteString("test4"), Ack(4)) + + // reply to write commander with Ack + val buffer = ByteBuffer.allocate(100) + serverSideChannel.read(buffer) must be(0) + writer.send(connectionActor, compoundWrite) + + pullFromServerSide(remaining = 15, into = buffer) + buffer.flip() + ByteString(buffer).utf8String must be("test1test2test4") + writer.expectMsg(Ack(1)) + writer.expectMsg(Ack(3)) + writer.expectMsg(Ack(4)) + } + } + /* * Disabled on Windows: http://support.microsoft.com/kb/214397 * diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index e6b0c6e1bd..9076155e55 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -10,6 +10,7 @@ import akka.io.Inet._ import com.typesafe.config.Config import scala.concurrent.duration._ import scala.collection.immutable +import scala.collection.JavaConverters._ import akka.util.ByteString import akka.util.Helpers.Requiring import akka.actor._ @@ -240,9 +241,56 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { object NoAck extends NoAck(null) /** - * Common interface for all write commands, currently [[Write]] and [[WriteFile]]. + * Common interface for all write commands, currently [[Write]], [[WriteFile]] and [[CompoundWrite]]. */ - sealed trait WriteCommand extends Command { + sealed abstract class WriteCommand extends Command { + /** + * Prepends this command with another `Write` or `WriteFile` to form + * a `CompoundWrite`. + */ + def +:(other: SimpleWriteCommand): CompoundWrite = CompoundWrite(other, this) + + /** + * Prepends this command with a number of other writes. + * The first element of the given Iterable becomes the first sub write of a potentially + * created `CompoundWrite`. + */ + def ++:(writes: Iterable[WriteCommand]): WriteCommand = + writes.foldRight(this) { + case (a: SimpleWriteCommand, b) ⇒ a +: b + case (a: CompoundWrite, b) ⇒ a ++: b + } + + /** + * Java API: prepends this command with another `Write` or `WriteFile` to form + * a `CompoundWrite`. + */ + def prepend(that: SimpleWriteCommand): CompoundWrite = that +: this + + /** + * Java API: prepends this command with a number of other writes. + * The first element of the given Iterable becomes the first sub write of a potentially + * created `CompoundWrite`. + */ + def prepend(writes: JIterable[WriteCommand]): WriteCommand = writes.asScala ++: this + } + + object WriteCommand { + /** + * Combines the given number of write commands into one atomic `WriteCommand`. + */ + def apply(writes: Iterable[WriteCommand]): WriteCommand = writes ++: Write.empty + + /** + * Java API: combines the given number of write commands into one atomic `WriteCommand`. + */ + def create(writes: JIterable[WriteCommand]): WriteCommand = apply(writes.asScala) + } + + /** + * Common supertype of [[Write]] and [[WriteFile]]. + */ + sealed abstract class SimpleWriteCommand extends WriteCommand { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") /** @@ -255,6 +303,11 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { * equivalent to the [[#ack]] token not being a of type [[NoAck]]. */ def wantsAck: Boolean = !ack.isInstanceOf[NoAck] + + /** + * Java API: appends this command with another `WriteCommand` to form a `CompoundWrite`. + */ + def append(that: WriteCommand): CompoundWrite = this +: that } /** @@ -267,7 +320,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { * or have been sent! Unfortunately there is no way to determine whether * a particular write has been sent by the O/S. */ - case class Write(data: ByteString, ack: Event) extends WriteCommand + case class Write(data: ByteString, ack: Event) extends SimpleWriteCommand object Write { /** * The empty Write doesn't write anything and isn't acknowledged. @@ -294,11 +347,35 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { * or have been sent! Unfortunately there is no way to determine whether * a particular write has been sent by the O/S. */ - case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends WriteCommand { + case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends SimpleWriteCommand { require(position >= 0, "WriteFile.position must be >= 0") require(count > 0, "WriteFile.count must be > 0") } + /** + * A write command which aggregates two other write commands. Using this construct + * you can chain a number of [[Write]] and/or [[WriteFile]] commands together in a way + * that allows them to be handled as a single write which gets written out to the + * network as quickly as possible. + * If the sub commands contain `ack` requests they will be honored as soon as the + * respective write has been written completely. + */ + case class CompoundWrite(override val head: SimpleWriteCommand, tailCommand: WriteCommand) extends WriteCommand + with immutable.Iterable[SimpleWriteCommand] { + + def iterator: Iterator[SimpleWriteCommand] = + new Iterator[SimpleWriteCommand] { + private[this] var current: WriteCommand = CompoundWrite.this + def hasNext: Boolean = current ne null + def next(): SimpleWriteCommand = + current match { + case null ⇒ Iterator.empty.next() + case CompoundWrite(h, t) ⇒ current = t; h + case x: SimpleWriteCommand ⇒ current = null; x + } + } + } + /** * When `useResumeWriting` is in effect as was indicated in the [[Register]] message * then this command needs to be sent to the connection actor in order to re-enable diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 839c9af554..6f032d96db 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -32,13 +32,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha import tcp.bufferPool import TcpConnection._ - private[this] var pendingWrite: PendingWrite = _ + private[this] var pendingWrite: PendingWrite = EmptyPendingWrite private[this] var peerClosed = false private[this] var writingSuspended = false private[this] var interestedInResume: Option[ActorRef] = None var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop - def writePending = pendingWrite ne null + def writePending = pendingWrite ne EmptyPendingWrite // STATES @@ -95,11 +95,15 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha doWrite(info) if (!writePending) // writing is now finished handleClose(info, closeCommander, closedEvent) - case SendBufferFull(remaining) ⇒ { pendingWrite = remaining; info.registration.enableInterest(OP_WRITE) } - case WriteFileFinished ⇒ { pendingWrite = null; handleClose(info, closeCommander, closedEvent) } - case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task - case Abort ⇒ handleClose(info, Some(sender), Aborted) + case UpdatePendingWrite(remaining) ⇒ + pendingWrite = remaining + if (writePending) info.registration.enableInterest(OP_WRITE) + else handleClose(info, closeCommander, closedEvent) + + case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task + + case Abort ⇒ handleClose(info, Some(sender), Aborted) } /** connection is closed on our side and we're waiting from confirmation from the other side */ @@ -130,13 +134,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha sender ! write.failureMessage if (info.useResumeWriting) writingSuspended = true - } else write match { - case Write(data, ack) if data.isEmpty ⇒ - if (write.wantsAck) sender ! ack - - case _ ⇒ - pendingWrite = createWrite(write) - doWrite(info) + } else { + pendingWrite = PendingWrite(sender, write) + if (writePending) doWrite(info) } case ResumeWriting ⇒ @@ -156,9 +156,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha else sender ! CommandFailed(ResumeWriting) } else sender ! WritingResumed - case SendBufferFull(remaining) ⇒ { pendingWrite = remaining; info.registration.enableInterest(OP_WRITE) } - case WriteFileFinished ⇒ pendingWrite = null - case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task + case UpdatePendingWrite(remaining) ⇒ + pendingWrite = remaining + if (writePending) info.registration.enableInterest(OP_WRITE) + + case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task } // AUXILIARIES and IMPLEMENTATION @@ -301,114 +303,108 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha override def postRestart(reason: Throwable): Unit = throw new IllegalStateException("Restarting not supported for connection actors.") - /** Create a pending write from a WriteCommand */ - private[io] def createWrite(write: WriteCommand): PendingWrite = write match { - case write: Write ⇒ - val buffer = bufferPool.acquire() - - try { - val copied = write.data.copyToBuffer(buffer) - buffer.flip() - - PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer) - } catch { - case NonFatal(e) ⇒ - bufferPool.release(buffer) - throw e + def PendingWrite(commander: ActorRef, write: WriteCommand): PendingWrite = { + @tailrec def create(head: WriteCommand, tail: WriteCommand = Write.empty): PendingWrite = + head match { + case Write.empty ⇒ if (tail eq Write.empty) EmptyPendingWrite else create(tail) + case Write(data, ack) if data.nonEmpty ⇒ PendingBufferWrite(commander, data, ack, tail) + case WriteFile(path, offset, count, ack) ⇒ PendingWriteFile(commander, path, offset, count, ack, tail) + case CompoundWrite(h, t) ⇒ create(h, t) + case x @ Write(_, ack) ⇒ // empty write with either an ACK or a non-standard NoACK + if (x.wantsAck) commander ! ack + create(tail) } - case write: WriteFile ⇒ - PendingWriteFile(sender, write, new FileInputStream(write.filePath).getChannel, 0L) + create(write) } - private[io] case class PendingBufferWrite( - commander: ActorRef, - ack: Any, - remainingData: ByteString, - buffer: ByteBuffer) extends PendingWrite { + def PendingBufferWrite(commander: ActorRef, data: ByteString, ack: Event, tail: WriteCommand): PendingBufferWrite = { + val buffer = bufferPool.acquire() + try { + val copied = data.copyToBuffer(buffer) + buffer.flip() + new PendingBufferWrite(commander, data.drop(copied), ack, buffer, tail) + } catch { + case NonFatal(e) ⇒ + bufferPool.release(buffer) + throw e + } + } - def release(): Unit = bufferPool.release(buffer) + class PendingBufferWrite( + val commander: ActorRef, + remainingData: ByteString, + ack: Any, + buffer: ByteBuffer, + tail: WriteCommand) extends PendingWrite { def doWrite(info: ConnectionInfo): PendingWrite = { - @tailrec def innerWrite(pendingWrite: PendingBufferWrite): PendingWrite = { - val toWrite = pendingWrite.buffer.remaining() - require(toWrite != 0) - val writtenBytes = channel.write(pendingWrite.buffer) + @tailrec def writeToChannel(data: ByteString): PendingWrite = { + val writtenBytes = channel.write(buffer) // at first we try to drain the remaining bytes from the buffer if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes) + if (buffer.hasRemaining) { + // we weren't able to write all bytes from the buffer, so we need to try again later + if (data eq remainingData) this + else new PendingBufferWrite(commander, data, ack, buffer, tail) // copy with updated remainingData - val nextWrite = pendingWrite.consume(writtenBytes) + } else if (data.nonEmpty) { + buffer.clear() + val copied = remainingData.copyToBuffer(buffer) + buffer.flip() + writeToChannel(remainingData drop copied) - if (pendingWrite.hasData) - if (writtenBytes == toWrite) innerWrite(nextWrite) // wrote complete buffer, try again now - else { - info.registration.enableInterest(OP_WRITE) - nextWrite - } // try again later - else { // everything written - if (pendingWrite.wantsAck) - pendingWrite.commander ! pendingWrite.ack - - pendingWrite.release() - null + } else { + if (!ack.isInstanceOf[NoAck]) commander ! ack + release() + PendingWrite(commander, tail) } } - - try innerWrite(this) - catch { case e: IOException ⇒ handleError(info.handler, e); this } + try { + val next = writeToChannel(remainingData) + if (next ne EmptyPendingWrite) info.registration.enableInterest(OP_WRITE) + next + } catch { case e: IOException ⇒ handleError(info.handler, e); this } } - def hasData = buffer.hasRemaining || remainingData.nonEmpty - def consume(writtenBytes: Int): PendingBufferWrite = - if (buffer.hasRemaining) this - else { - buffer.clear() - val copied = remainingData.copyToBuffer(buffer) - buffer.flip() - copy(remainingData = remainingData.drop(copied)) - } + + def release(): Unit = bufferPool.release(buffer) } - private[io] case class PendingWriteFile( - commander: ActorRef, - write: WriteFile, + def PendingWriteFile(commander: ActorRef, filePath: String, offset: Long, count: Long, ack: Event, + tail: WriteCommand): PendingWriteFile = + new PendingWriteFile(commander, new FileInputStream(filePath).getChannel, offset, count, ack, tail) + + class PendingWriteFile( + val commander: ActorRef, fileChannel: FileChannel, - alreadyWritten: Long) extends PendingWrite { + offset: Long, + remaining: Long, + ack: Event, + tail: WriteCommand) extends PendingWrite with Runnable { def doWrite(info: ConnectionInfo): PendingWrite = { - tcp.fileIoDispatcher.execute(writeFileRunnable(this)) + tcp.fileIoDispatcher.execute(this) this } - def ack: Any = write.ack + def release(): Unit = fileChannel.close() - /** Release any open resources */ - def release() { fileChannel.close() } + def run(): Unit = + try { + val toWrite = math.min(remaining, tcp.Settings.TransferToLimit) + val written = fileChannel.transferTo(offset, toWrite, channel) - def updatedWrite(nowWritten: Long): PendingWriteFile = { - require(nowWritten < write.count) - copy(alreadyWritten = nowWritten) - } + if (written < remaining) { + val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail) + self ! UpdatePendingWrite(updated) - def remainingBytes = write.count - alreadyWritten - def currentPosition = write.position + alreadyWritten - } - - private[io] def writeFileRunnable(pendingWrite: PendingWriteFile): Runnable = - new Runnable { - def run(): Unit = try { - import pendingWrite._ - val toWrite = math.min(remainingBytes, tcp.Settings.TransferToLimit) - val writtenBytes = fileChannel.transferTo(currentPosition, toWrite, channel) - - if (writtenBytes < remainingBytes) self ! SendBufferFull(pendingWrite.updatedWrite(alreadyWritten + writtenBytes)) - else { // finished - if (wantsAck) commander ! write.ack - self ! WriteFileFinished - - pendingWrite.release() + } else { + if (!ack.isInstanceOf[NoAck]) commander ! ack + release() + self ! UpdatePendingWrite(PendingWrite(commander, tail)) } } catch { case e: IOException ⇒ self ! WriteFileFailed(e) } - } + } } /** @@ -436,22 +432,18 @@ private[io] object TcpConnection { // INTERNAL MESSAGES - /** Informs actor that no writing was possible but there is still work remaining */ - case class SendBufferFull(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded - /** Informs actor that a pending file write has finished */ - case object WriteFileFinished - /** Informs actor that a pending WriteFile failed */ + case class UpdatePendingWrite(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded case class WriteFileFailed(e: IOException) - /** Abstraction over pending writes */ - trait PendingWrite { + sealed abstract class PendingWrite { def commander: ActorRef - def ack: Any - - def wantsAck = !ack.isInstanceOf[NoAck] def doWrite(info: ConnectionInfo): PendingWrite + def release(): Unit // free any occupied resources + } - /** Release any open resources */ - def release(): Unit + object EmptyPendingWrite extends PendingWrite { + def commander: ActorRef = throw new IllegalStateException + def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException + def release(): Unit = throw new IllegalStateException } } diff --git a/akka-docs/rst/java/io-tcp.rst b/akka-docs/rst/java/io-tcp.rst index 73164b21de..bc271c777a 100644 --- a/akka-docs/rst/java/io-tcp.rst +++ b/akka-docs/rst/java/io-tcp.rst @@ -125,6 +125,43 @@ it receives one of the above close commands. All close notifications are sub-types of ``ConnectionClosed`` so listeners who do not need fine-grained close events may handle all close events in the same way. +Writing to a connection +----------------------- + +Once a connection has been established data can be sent to it from any actor in the form of a ``Tcp.WriteCommand``. +``Tcp.WriteCommand`` is an abstract class with three concrete implementations: + +Tcp.Write + The simplest ``WriteCommand`` implementation which wraps a ``ByteString`` instance and an "ack" event. + A ``ByteString`` (as explained in :ref:`this section `) models one or more chunks of immutable + in-memory data with a maximum (total) size of 2 GB (2^31 bytes). + +Tcp.WriteFile + If you want to send "raw" data from a file you can do so efficiently with the ``Tcp.WriteFile`` command. + This allows you do designate a (contiguous) chunk of on-disk bytes for sending across the connection without + the need to first load them into the JVM memory. As such ``Tcp.WriteFile`` can "hold" more than 2GB of data and + an "ack" event if required. + +Tcp.CompoundWrite + Sometimes you might want to group (or interleave) several ``Tcp.Write`` and/or ``Tcp.WriteFile`` commands into + one atomic write command which gets written to the connection in one go. The ``Tcp.CompoundWrite`` allows you + to do just that and offers three benefits: + + 1. As explained in the following section the TCP connection actor can only handle one single write command at a time. + By combining several writes into one ``CompoundWrite`` you can have them be sent across the connection with + minimum overhead and without the need to spoon feed them to the connection actor via an *ACK-based* message + protocol. + + 2. Because a ``WriteCommand`` is atomic you can be sure that no other actor can "inject" other writes into your + series of writes if you combine them into one single ``CompoundWrite``. In scenarios where several actors write + to the same connection this can be an important feature which can be somewhat hard to achieve otherwise. + + 3. The "sub writes" of a ``CompoundWrite`` are regular ``Write`` or ``WriteFile`` commands that themselves can request + "ack" events. These ACKs are sent out as soon as the respective "sub write" has been completed. This allows you to + attach more than one ACK to a ``Write`` or ``WriteFile`` (by combining it with an empty write that itself requests + an ACK) or to have the connection actor acknowledge the progress of transmitting the ``CompoundWrite`` by sending + out intermediate ACKs at arbitrary points. + Throttling Reads and Writes --------------------------- diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index a6c4027a16..1cbb9fbe9d 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -85,6 +85,8 @@ nacked messages it may need to keep a buffer of pending messages. the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control not error handling. In other words, data may still be lost, even if every write is acknowledged. +.. _ByteString: + ByteString ^^^^^^^^^^ diff --git a/akka-docs/rst/scala/io-tcp.rst b/akka-docs/rst/scala/io-tcp.rst index c60b360f0d..c898dfb129 100644 --- a/akka-docs/rst/scala/io-tcp.rst +++ b/akka-docs/rst/scala/io-tcp.rst @@ -126,6 +126,43 @@ it receives one of the above close commands. All close notifications are sub-types of ``ConnectionClosed`` so listeners who do not need fine-grained close events may handle all close events in the same way. +Writing to a connection +----------------------- + +Once a connection has been established data can be sent to it from any actor in the form of a ``Tcp.WriteCommand``. +``Tcp.WriteCommand`` is an abstract class with three concrete implementations: + +Tcp.Write + The simplest ``WriteCommand`` implementation which wraps a ``ByteString`` instance and an "ack" event. + A ``ByteString`` (as explained in :ref:`this section `) models one or more chunks of immutable + in-memory data with a maximum (total) size of 2 GB (2^31 bytes). + +Tcp.WriteFile + If you want to send "raw" data from a file you can do so efficiently with the ``Tcp.WriteFile`` command. + This allows you do designate a (contiguous) chunk of on-disk bytes for sending across the connection without + the need to first load them into the JVM memory. As such ``Tcp.WriteFile`` can "hold" more than 2GB of data and + an "ack" event if required. + +Tcp.CompoundWrite + Sometimes you might want to group (or interleave) several ``Tcp.Write`` and/or ``Tcp.WriteFile`` commands into + one atomic write command which gets written to the connection in one go. The ``Tcp.CompoundWrite`` allows you + to do just that and offers three benefits: + + 1. As explained in the following section the TCP connection actor can only handle one single write command at a time. + By combining several writes into one ``CompoundWrite`` you can have them be sent across the connection with + minimum overhead and without the need to spoon feed them to the connection actor via an *ACK-based* message + protocol. + + 2. Because a ``WriteCommand`` is atomic you can be sure that no other actor can "inject" other writes into your + series of writes if you combine them into one single ``CompoundWrite``. In scenarios where several actors write + to the same connection this can be an important feature which can be somewhat hard to achieve otherwise. + + 3. The "sub writes" of a ``CompoundWrite`` are regular ``Write`` or ``WriteFile`` commands that themselves can request + "ack" events. These ACKs are sent out as soon as the respective "sub write" has been completed. This allows you to + attach more than one ACK to a ``Write`` or ``WriteFile`` (by combining it with an empty write that itself requests + an ACK) or to have the connection actor acknowledge the progress of transmitting the ``CompoundWrite`` by sending + out intermediate ACKs at arbitrary points. + Throttling Reads and Writes --------------------------- diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index ecb944c83a..88e1e64d10 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -90,6 +90,8 @@ nacked messages it may need to keep a buffer of pending messages. the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control not error handling. In other words, data may still be lost, even if every write is acknowledged. +.. _ByteString: + ByteString ^^^^^^^^^^