From 54d91c180ae9d4a5ed264e5cbcb24bdc17257df0 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 9 Apr 2013 12:25:43 +0200 Subject: [PATCH] Tcp: abstract writing into WriteCommand to make place for other kinds of writing --- akka-actor/src/main/scala/akka/io/Tcp.scala | 13 ++++++++----- .../src/main/scala/akka/io/TcpConnection.scala | 15 ++++++++------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index b1a9bb62f2..490619f67f 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -92,15 +92,18 @@ object Tcp extends ExtensionKey[TcpExt] { case class NoAck(token: Any) object NoAck extends NoAck(null) + sealed trait WriteCommand extends Command { + require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") + + def ack: Any + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] + } + /** * Write data to the TCP connection. If no ack is needed use the special * `NoAck` object. */ - case class Write(data: ByteString, ack: Any) extends Command { - require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - - def wantsAck: Boolean = !ack.isInstanceOf[NoAck] - } + case class Write(data: ByteString, ack: Any) extends WriteCommand object Write { val Empty: Write = Write(ByteString.empty, NoAck) def apply(data: ByteString): Write = diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 98c2c0cf1f..d64cd71fec 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -103,7 +103,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, def handleWriteMessages(handler: ActorRef): Receive = { case ChannelWritable ⇒ if (writePending) doWrite(handler) - case write: Write if writePending ⇒ + case write: WriteCommand if writePending ⇒ if (TraceLogging) log.debug("Dropping write because queue is full") sender ! write.failureMessage @@ -111,7 +111,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, if (write.wantsAck) sender ! write.ack - case write: Write ⇒ + case write: WriteCommand ⇒ pendingWrite = createWrite(write) doWrite(handler) } @@ -297,12 +297,13 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, copy(remainingData = remainingData.drop(copied)) } else this } - def createWrite(write: Write): PendingWrite = { - val buffer = bufferPool.acquire() - val copied = write.data.copyToBuffer(buffer) - buffer.flip() + def createWrite(write: WriteCommand): PendingWrite = write match { + case write: Write ⇒ + val buffer = bufferPool.acquire() + val copied = write.data.copyToBuffer(buffer) + buffer.flip() - PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer) + PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer) } }