Tcp: abstract writing into WriteCommand to make place for other kinds of writing

This commit is contained in:
Johannes Rudolph 2013-04-09 12:25:43 +02:00
parent 7d152c9ec2
commit 54d91c180a
2 changed files with 16 additions and 12 deletions

View file

@ -92,15 +92,18 @@ object Tcp extends ExtensionKey[TcpExt] {
case class NoAck(token: Any)
object NoAck extends NoAck(null)
sealed trait WriteCommand extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def ack: Any
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
/**
* Write data to the TCP connection. If no ack is needed use the special
* `NoAck` object.
*/
case class Write(data: ByteString, ack: Any) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
case class Write(data: ByteString, ack: Any) extends WriteCommand
object Write {
val Empty: Write = Write(ByteString.empty, NoAck)
def apply(data: ByteString): Write =

View file

@ -103,7 +103,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
def handleWriteMessages(handler: ActorRef): Receive = {
case ChannelWritable if (writePending) doWrite(handler)
case write: Write if writePending
case write: WriteCommand if writePending
if (TraceLogging) log.debug("Dropping write because queue is full")
sender ! write.failureMessage
@ -111,7 +111,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
if (write.wantsAck)
sender ! write.ack
case write: Write
case write: WriteCommand
pendingWrite = createWrite(write)
doWrite(handler)
}
@ -297,12 +297,13 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
copy(remainingData = remainingData.drop(copied))
} else this
}
def createWrite(write: Write): PendingWrite = {
val buffer = bufferPool.acquire()
val copied = write.data.copyToBuffer(buffer)
buffer.flip()
def createWrite(write: WriteCommand): PendingWrite = write match {
case write: Write
val buffer = bufferPool.acquire()
val copied = write.data.copyToBuffer(buffer)
buffer.flip()
PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer)
PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer)
}
}