!act #3581 Add Tcp.CompoundWrite, some cleanup

Moves `def ack: Event` and `def wantsAck: Boolean` from the `Tcp.WriteCommand` type down to the newly introduced `Tcp.CompactWriteCommand`, which breaks existing code depending on these.
Additionally `Tcp.WriteCommand` now has a few additional methods (`+:`, `++:`, prepend) and a companion object.
This commit is contained in:
Mathias 2013-09-05 16:40:06 +02:00
parent c55189f615
commit 85c771d731
3 changed files with 208 additions and 120 deletions

View file

@ -32,13 +32,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
import tcp.bufferPool
import TcpConnection._
private[this] var pendingWrite: PendingWrite = _
private[this] var pendingWrite: PendingWrite = EmptyPendingWrite
private[this] var peerClosed = false
private[this] var writingSuspended = false
private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop
def writePending = pendingWrite ne null
def writePending = pendingWrite ne EmptyPendingWrite
// STATES
@ -95,11 +95,15 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
doWrite(info)
if (!writePending) // writing is now finished
handleClose(info, closeCommander, closedEvent)
case SendBufferFull(remaining) { pendingWrite = remaining; info.registration.enableInterest(OP_WRITE) }
case WriteFileFinished { pendingWrite = null; handleClose(info, closeCommander, closedEvent) }
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
case Abort handleClose(info, Some(sender), Aborted)
case UpdatePendingWrite(remaining)
pendingWrite = remaining
if (writePending) info.registration.enableInterest(OP_WRITE)
else handleClose(info, closeCommander, closedEvent)
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
case Abort handleClose(info, Some(sender), Aborted)
}
/** connection is closed on our side and we're waiting from confirmation from the other side */
@ -130,13 +134,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
sender ! write.failureMessage
if (info.useResumeWriting) writingSuspended = true
} else write match {
case Write(data, ack) if data.isEmpty
if (write.wantsAck) sender ! ack
case _
pendingWrite = createWrite(write)
doWrite(info)
} else {
pendingWrite = PendingWrite(sender, write)
if (writePending) doWrite(info)
}
case ResumeWriting
@ -156,9 +156,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
else sender ! CommandFailed(ResumeWriting)
} else sender ! WritingResumed
case SendBufferFull(remaining) { pendingWrite = remaining; info.registration.enableInterest(OP_WRITE) }
case WriteFileFinished pendingWrite = null
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
case UpdatePendingWrite(remaining)
pendingWrite = remaining
if (writePending) info.registration.enableInterest(OP_WRITE)
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
}
// AUXILIARIES and IMPLEMENTATION
@ -301,114 +303,108 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
override def postRestart(reason: Throwable): Unit =
throw new IllegalStateException("Restarting not supported for connection actors.")
/** Create a pending write from a WriteCommand */
private[io] def createWrite(write: WriteCommand): PendingWrite = write match {
case write: Write
val buffer = bufferPool.acquire()
try {
val copied = write.data.copyToBuffer(buffer)
buffer.flip()
PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer)
} catch {
case NonFatal(e)
bufferPool.release(buffer)
throw e
def PendingWrite(commander: ActorRef, write: WriteCommand): PendingWrite = {
@tailrec def create(head: WriteCommand, tail: WriteCommand = Write.empty): PendingWrite =
head match {
case Write.empty if (tail eq Write.empty) EmptyPendingWrite else create(tail)
case Write(data, ack) if data.nonEmpty PendingBufferWrite(commander, data, ack, tail)
case WriteFile(path, offset, count, ack) PendingWriteFile(commander, path, offset, count, ack, tail)
case CompoundWrite(h, t) create(h, t)
case x @ Write(_, ack) // empty write with either an ACK or a non-standard NoACK
if (x.wantsAck) commander ! ack
create(tail)
}
case write: WriteFile
PendingWriteFile(sender, write, new FileInputStream(write.filePath).getChannel, 0L)
create(write)
}
private[io] case class PendingBufferWrite(
commander: ActorRef,
ack: Any,
remainingData: ByteString,
buffer: ByteBuffer) extends PendingWrite {
def PendingBufferWrite(commander: ActorRef, data: ByteString, ack: Event, tail: WriteCommand): PendingBufferWrite = {
val buffer = bufferPool.acquire()
try {
val copied = data.copyToBuffer(buffer)
buffer.flip()
new PendingBufferWrite(commander, data.drop(copied), ack, buffer, tail)
} catch {
case NonFatal(e)
bufferPool.release(buffer)
throw e
}
}
def release(): Unit = bufferPool.release(buffer)
class PendingBufferWrite(
val commander: ActorRef,
remainingData: ByteString,
ack: Any,
buffer: ByteBuffer,
tail: WriteCommand) extends PendingWrite {
def doWrite(info: ConnectionInfo): PendingWrite = {
@tailrec def innerWrite(pendingWrite: PendingBufferWrite): PendingWrite = {
val toWrite = pendingWrite.buffer.remaining()
require(toWrite != 0)
val writtenBytes = channel.write(pendingWrite.buffer)
@tailrec def writeToChannel(data: ByteString): PendingWrite = {
val writtenBytes = channel.write(buffer) // at first we try to drain the remaining bytes from the buffer
if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes)
if (buffer.hasRemaining) {
// we weren't able to write all bytes from the buffer, so we need to try again later
if (data eq remainingData) this
else new PendingBufferWrite(commander, data, ack, buffer, tail) // copy with updated remainingData
val nextWrite = pendingWrite.consume(writtenBytes)
} else if (data.nonEmpty) {
buffer.clear()
val copied = remainingData.copyToBuffer(buffer)
buffer.flip()
writeToChannel(remainingData drop copied)
if (pendingWrite.hasData)
if (writtenBytes == toWrite) innerWrite(nextWrite) // wrote complete buffer, try again now
else {
info.registration.enableInterest(OP_WRITE)
nextWrite
} // try again later
else { // everything written
if (pendingWrite.wantsAck)
pendingWrite.commander ! pendingWrite.ack
pendingWrite.release()
null
} else {
if (!ack.isInstanceOf[NoAck]) commander ! ack
release()
PendingWrite(commander, tail)
}
}
try innerWrite(this)
catch { case e: IOException handleError(info.handler, e); this }
try {
val next = writeToChannel(remainingData)
if (next ne EmptyPendingWrite) info.registration.enableInterest(OP_WRITE)
next
} catch { case e: IOException handleError(info.handler, e); this }
}
def hasData = buffer.hasRemaining || remainingData.nonEmpty
def consume(writtenBytes: Int): PendingBufferWrite =
if (buffer.hasRemaining) this
else {
buffer.clear()
val copied = remainingData.copyToBuffer(buffer)
buffer.flip()
copy(remainingData = remainingData.drop(copied))
}
def release(): Unit = bufferPool.release(buffer)
}
private[io] case class PendingWriteFile(
commander: ActorRef,
write: WriteFile,
def PendingWriteFile(commander: ActorRef, filePath: String, offset: Long, count: Long, ack: Event,
tail: WriteCommand): PendingWriteFile =
new PendingWriteFile(commander, new FileInputStream(filePath).getChannel, offset, count, ack, tail)
class PendingWriteFile(
val commander: ActorRef,
fileChannel: FileChannel,
alreadyWritten: Long) extends PendingWrite {
offset: Long,
remaining: Long,
ack: Event,
tail: WriteCommand) extends PendingWrite with Runnable {
def doWrite(info: ConnectionInfo): PendingWrite = {
tcp.fileIoDispatcher.execute(writeFileRunnable(this))
tcp.fileIoDispatcher.execute(this)
this
}
def ack: Any = write.ack
def release(): Unit = fileChannel.close()
/** Release any open resources */
def release() { fileChannel.close() }
def run(): Unit =
try {
val toWrite = math.min(remaining, tcp.Settings.TransferToLimit)
val written = fileChannel.transferTo(offset, toWrite, channel)
def updatedWrite(nowWritten: Long): PendingWriteFile = {
require(nowWritten < write.count)
copy(alreadyWritten = nowWritten)
}
if (written < remaining) {
val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail)
self ! UpdatePendingWrite(updated)
def remainingBytes = write.count - alreadyWritten
def currentPosition = write.position + alreadyWritten
}
private[io] def writeFileRunnable(pendingWrite: PendingWriteFile): Runnable =
new Runnable {
def run(): Unit = try {
import pendingWrite._
val toWrite = math.min(remainingBytes, tcp.Settings.TransferToLimit)
val writtenBytes = fileChannel.transferTo(currentPosition, toWrite, channel)
if (writtenBytes < remainingBytes) self ! SendBufferFull(pendingWrite.updatedWrite(alreadyWritten + writtenBytes))
else { // finished
if (wantsAck) commander ! write.ack
self ! WriteFileFinished
pendingWrite.release()
} else {
if (!ack.isInstanceOf[NoAck]) commander ! ack
release()
self ! UpdatePendingWrite(PendingWrite(commander, tail))
}
} catch {
case e: IOException self ! WriteFileFailed(e)
}
}
}
}
/**
@ -436,22 +432,18 @@ private[io] object TcpConnection {
// INTERNAL MESSAGES
/** Informs actor that no writing was possible but there is still work remaining */
case class SendBufferFull(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded
/** Informs actor that a pending file write has finished */
case object WriteFileFinished
/** Informs actor that a pending WriteFile failed */
case class UpdatePendingWrite(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded
case class WriteFileFailed(e: IOException)
/** Abstraction over pending writes */
trait PendingWrite {
sealed abstract class PendingWrite {
def commander: ActorRef
def ack: Any
def wantsAck = !ack.isInstanceOf[NoAck]
def doWrite(info: ConnectionInfo): PendingWrite
def release(): Unit // free any occupied resources
}
/** Release any open resources */
def release(): Unit
object EmptyPendingWrite extends PendingWrite {
def commander: ActorRef = throw new IllegalStateException
def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException
def release(): Unit = throw new IllegalStateException
}
}