Tcp: cosmetic changes and more checks

* whitespace
 * Write.Empty => Write.empty
 * check requirements
 * better 0-checks
This commit is contained in:
Johannes Rudolph 2013-04-10 14:56:51 +02:00
parent 15545641a6
commit c5d90a1c57
3 changed files with 26 additions and 9 deletions

View file

@ -258,8 +258,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
writer.expectMsg(CommandFailed(secondWrite)) writer.expectMsg(CommandFailed(secondWrite))
// reject even empty writes // reject even empty writes
writer.send(connectionActor, Write.Empty) writer.send(connectionActor, Write.empty)
writer.expectMsg(CommandFailed(Write.Empty)) writer.expectMsg(CommandFailed(Write.empty))
// there will be immediately more space in the send buffer because // there will be immediately more space in the send buffer because
// some data will have been sent by now, so we assume we can write // some data will have been sent by now, so we assume we can write

View file

@ -105,16 +105,30 @@ object Tcp extends ExtensionKey[TcpExt] {
*/ */
case class Write(data: ByteString, ack: Any) extends WriteCommand case class Write(data: ByteString, ack: Any) extends WriteCommand
object Write { object Write {
val Empty: Write = Write(ByteString.empty, NoAck) /**
* The empty Write doesn't write anything and isn't acknowledged.
* It will, however, be denied and sent back with `CommandFailed` if the
* connection isn't currently ready to send any data (because another WriteCommand
* is still pending).
*/
val empty: Write = Write(ByteString.empty, NoAck)
/**
* Create a new unacknowledged Write command with the given data.
*/
def apply(data: ByteString): Write = def apply(data: ByteString): Write =
if (data.isEmpty) Empty else Write(data, NoAck) if (data.isEmpty) empty else Write(data, NoAck)
} }
/** /**
* Write `count` bytes starting at `position` from file at `filePath` to the connection. * Write `count` bytes starting at `position` from file at `filePath` to the connection.
* When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. * When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. The
* count must be > 0.
*/ */
case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand {
require(position >= 0, "WriteFile.position must be >= 0")
require(count > 0, "WriteFile.count must be > 0")
}
case object StopReading extends Command case object StopReading extends Command
case object ResumeReading extends Command case object ResumeReading extends Command

View file

@ -305,20 +305,23 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
try innerWrite(this) try innerWrite(this)
catch { case e: IOException handleError(handler, e) } catch { case e: IOException handleError(handler, e) }
} }
def hasData = buffer.remaining() > 0 || remainingData.size > 0 def hasData = buffer.hasRemaining || remainingData.nonEmpty
def consume(writtenBytes: Int): PendingBufferWrite = def consume(writtenBytes: Int): PendingBufferWrite =
if (buffer.remaining() == 0) { if (buffer.hasRemaining) this
else {
buffer.clear() buffer.clear()
val copied = remainingData.copyToBuffer(buffer) val copied = remainingData.copyToBuffer(buffer)
buffer.flip() buffer.flip()
copy(remainingData = remainingData.drop(copied)) copy(remainingData = remainingData.drop(copied))
} else this }
} }
private[io] case class PendingWriteFile( private[io] case class PendingWriteFile(
commander: ActorRef, commander: ActorRef,
write: WriteFile, write: WriteFile,
fileChannel: FileChannel, fileChannel: FileChannel,
alreadyWritten: Long) extends PendingWrite { alreadyWritten: Long) extends PendingWrite {
def doWrite(handler: ActorRef): PendingWrite = { def doWrite(handler: ActorRef): PendingWrite = {
tcp.fileIoDispatcher.execute(writeFileRunnable(this)) tcp.fileIoDispatcher.execute(writeFileRunnable(this))
this this