diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 6acafaeba8..c913007edb 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -258,8 +258,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") writer.expectMsg(CommandFailed(secondWrite)) // reject even empty writes - writer.send(connectionActor, Write.Empty) - writer.expectMsg(CommandFailed(Write.Empty)) + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) // there will be immediately more space in the send buffer because // some data will have been sent by now, so we assume we can write diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 17995acb94..ab11280f04 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -105,16 +105,30 @@ object Tcp extends ExtensionKey[TcpExt] { */ case class Write(data: ByteString, ack: Any) extends WriteCommand object Write { - val Empty: Write = Write(ByteString.empty, NoAck) + /** + * The empty Write doesn't write anything and isn't acknowledged. + * It will, however, be denied and sent back with `CommandFailed` if the + * connection isn't currently ready to send any data (because another WriteCommand + * is still pending). + */ + val empty: Write = Write(ByteString.empty, NoAck) + + /** + * Create a new unacknowledged Write command with the given data. + */ def apply(data: ByteString): Write = - if (data.isEmpty) Empty else Write(data, NoAck) + if (data.isEmpty) empty else Write(data, NoAck) } /** * Write `count` bytes starting at `position` from file at `filePath` to the connection. - * When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. + * When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. The + * count must be > 0. */ - case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand + case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand { + require(position >= 0, "WriteFile.position must be >= 0") + require(count > 0, "WriteFile.count must be > 0") + } case object StopReading extends Command case object ResumeReading extends Command diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index d308581bd7..a93c3e35b3 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -305,20 +305,23 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, try innerWrite(this) catch { case e: IOException ⇒ handleError(handler, e) } } - def hasData = buffer.remaining() > 0 || remainingData.size > 0 + def hasData = buffer.hasRemaining || remainingData.nonEmpty def consume(writtenBytes: Int): PendingBufferWrite = - if (buffer.remaining() == 0) { + if (buffer.hasRemaining) this + else { buffer.clear() val copied = remainingData.copyToBuffer(buffer) buffer.flip() copy(remainingData = remainingData.drop(copied)) - } else this + } } + private[io] case class PendingWriteFile( commander: ActorRef, write: WriteFile, fileChannel: FileChannel, alreadyWritten: Long) extends PendingWrite { + def doWrite(handler: ActorRef): PendingWrite = { tcp.fileIoDispatcher.execute(writeFileRunnable(this)) this