pekko/akka-actor/src/main/scala/akka/io/TcpConnection.scala

/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import java.net.InetSocketAddress
import java.io.{ FileInputStream, IOException }
import java.nio.channels.{ FileChannel, SocketChannel }
import java.nio.ByteBuffer
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal
import scala.concurrent.duration._
import akka.actor._
import akka.util.ByteString
import akka.io.Inet.SocketOption
import akka.io.Tcp._
import akka.io.SelectionHandler._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
/**
* Base class for TcpIncomingConnection and TcpOutgoingConnection.
*
* INTERNAL API
*/
private[io] abstract class TcpConnection(
val channel: SocketChannel,
val tcp: TcpExt)
extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import tcp.Settings._
import tcp.bufferPool
import TcpConnection._
var pendingWrite: PendingWrite = null
// Needed to send the ConnectionClosed message in the postStop handler.
var closedMessage: CloseInformation = null
private[this] var peerClosed = false
private[this] var keepOpenOnPeerClosed = false
def writePending = pendingWrite ne null
def selector = context.parent
// STATES
/** connection established, waiting for registration from user handler */
def waitingForRegistration(commander: ActorRef): Receive = {
case Register(handler, keepOpenOnPeerClosed, useResumeWriting) ⇒
// up to this point we've been watching the commander,
// but since registration is now complete we only need to watch the handler from here on
if (handler != commander) {
context.unwatch(commander)
context.watch(handler)
}
if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
this.keepOpenOnPeerClosed = keepOpenOnPeerClosed
this.useResumeWriting = useResumeWriting
doRead(handler, None) // immediately try reading
context.setReceiveTimeout(Duration.Undefined)
context.become(connected(handler))
case cmd: CloseCommand ⇒
handleClose(commander, Some(sender), cmd.event)
case ReceiveTimeout ⇒
// after sending `Register` the user should watch this actor to make sure
// it didn't die because of the timeout
log.warning("Configured registration timeout of [{}] expired, stopping", RegisterTimeout)
context.stop(self)
}
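/*
 * A minimal usage sketch of the registration handshake above (hedged:
 * `EchoHandler` is a hypothetical user actor, not part of this file). The
 * commander that received `Connected` must send `Register` before any data
 * flows:
 *
 *   class EchoHandler(connection: ActorRef) extends Actor {
 *     connection ! Register(self) // completes the handshake, starts reading
 *     def receive = {
 *       case Received(data)      ⇒ connection ! Write(data) // echo back
 *       case _: ConnectionClosed ⇒ context.stop(self)
 *     }
 *   }
 */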
/** normal connected state */
def connected(handler: ActorRef): Receive = handleWriteMessages(handler) orElse {
case SuspendReading ⇒ selector ! DisableReadInterest
case ResumeReading ⇒ selector ! ReadInterest
case ChannelReadable ⇒ doRead(handler, None)
case cmd: CloseCommand ⇒ handleClose(handler, Some(sender), cmd.event)
}
/** the peer sent EOF first, but we may still want to send */
def peerSentEOF(handler: ActorRef): Receive = handleWriteMessages(handler) orElse {
case cmd: CloseCommand ⇒ handleClose(handler, Some(sender), cmd.event)
}
/** connection is closing but a write has to be finished first */
def closingWithPendingWrite(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Receive = {
case SuspendReading ⇒ selector ! DisableReadInterest
case ResumeReading ⇒ selector ! ReadInterest
case ChannelReadable ⇒ doRead(handler, closeCommander)
case ChannelWritable ⇒
doWrite(handler)
if (!writePending) // writing is now finished
handleClose(handler, closeCommander, closedEvent)
case SendBufferFull(remaining) ⇒ { pendingWrite = remaining; selector ! WriteInterest }
case WriteFileFinished ⇒ { pendingWrite = null; handleClose(handler, closeCommander, closedEvent) }
case WriteFileFailed(e) ⇒ handleError(handler, e) // rethrow exception from dispatcher task
case Abort ⇒ handleClose(handler, Some(sender), Aborted)
}
/** connection is closed on our side and we're waiting for confirmation from the other side */
def closing(handler: ActorRef, closeCommander: Option[ActorRef]): Receive = {
case SuspendReading ⇒ selector ! DisableReadInterest
case ResumeReading ⇒ selector ! ReadInterest
case ChannelReadable ⇒ doRead(handler, closeCommander)
case Abort ⇒ handleClose(handler, Some(sender), Aborted)
}
private[this] var useResumeWriting = false
private[this] var writingSuspended = false
private[this] var interestedInResume: Option[ActorRef] = None
def handleWriteMessages(handler: ActorRef): Receive = {
case ChannelWritable ⇒
if (writePending) {
doWrite(handler)
if (!writePending && interestedInResume.nonEmpty) {
interestedInResume.get ! WritingResumed
interestedInResume = None
}
}
case write: WriteCommand ⇒
if (writingSuspended) {
if (TraceLogging) log.debug("Dropping write because writing is suspended")
sender ! write.failureMessage
} else if (writePending) {
if (TraceLogging) log.debug("Dropping write because queue is full")
sender ! write.failureMessage
if (useResumeWriting) writingSuspended = true
} else write match {
case Write(data, ack) if data.isEmpty ⇒
if (ack != NoAck) sender ! ack
case _ ⇒
pendingWrite = createWrite(write)
doWrite(handler)
}
case ResumeWriting ⇒
/*
* If more than one actor sends Writes then the first to send this
* message might resume too early for the second, letting a Write
* from the second go through although writing has not been resumed
* for it yet; there is nothing we can do about this short of requiring
* all writers to register themselves and keeping track of them, which
* sounds bad.
*
* Thus it is documented that useResumeWriting is incompatible with
* multiple writers. But we fail as gracefully as we can.
*/
writingSuspended = false
if (writePending) {
if (interestedInResume.isEmpty) interestedInResume = Some(sender)
else sender ! CommandFailed(ResumeWriting)
} else sender ! WritingResumed
case SendBufferFull(remaining) ⇒ { pendingWrite = remaining; selector ! WriteInterest }
case WriteFileFinished ⇒ pendingWrite = null
case WriteFileFailed(e) ⇒ handleError(handler, e) // rethrow exception from dispatcher task
}
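/*
 * A minimal sketch of the NACK-based backpressure protocol implemented above
 * (hedged: `connection` and the single-message buffer are hypothetical, not
 * part of this file). With `useResumeWriting = true` a dropped write is
 * answered with its failureMessage and should be retried after WritingResumed:
 *
 *   case CommandFailed(w: Write) ⇒ // queue was full, write was dropped
 *     connection ! ResumeWriting  // ask to be woken up when writing works again
 *     context.become(buffering(w), discardOld = false)
 *   // inside `buffering(w)`:
 *   case WritingResumed ⇒ connection ! w; context.unbecome()
 */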
// AUXILIARIES and IMPLEMENTATION
/** used in subclasses to start the common machinery above once a channel is connected */
def completeConnect(commander: ActorRef, options: immutable.Traversable[SocketOption]): Unit = {
// Turn off Nagle's algorithm by default
channel.socket.setTcpNoDelay(true)
options.foreach(_.afterConnect(channel.socket))
commander ! Connected(
channel.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress],
channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])
context.setReceiveTimeout(RegisterTimeout)
context.become(waitingForRegistration(commander))
}
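/**
 * Reads from the channel into a pooled direct buffer and forwards each chunk
 * to the handler as a `Received` message. At most `ReceivedMessageSizeLimit`
 * bytes are read per invocation; if more data is waiting, a `ChannelReadable`
 * is re-sent to self so other connections on the selector are not starved.
 */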
def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = {
@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
if (remainingLimit > 0) {
// never read more than the configured limit
buffer.clear()
val maxBufferSpace = math.min(DirectBufferSize, remainingLimit)
buffer.limit(maxBufferSpace)
val readBytes = channel.read(buffer)
buffer.flip()
if (TraceLogging) log.debug("Read [{}] bytes.", readBytes)
if (readBytes > 0) handler ! Received(ByteString(buffer))
readBytes match {
case `maxBufferSpace` ⇒ innerRead(buffer, remainingLimit - maxBufferSpace)
case x if x >= 0 ⇒ AllRead
case -1 ⇒ EndOfStream
case _ ⇒
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
}
} else MoreDataWaiting
val buffer = bufferPool.acquire()
try innerRead(buffer, ReceivedMessageSizeLimit) match {
case AllRead ⇒ selector ! ReadInterest
case MoreDataWaiting ⇒ self ! ChannelReadable
case EndOfStream if channel.socket.isOutputShutdown ⇒
if (TraceLogging) log.debug("Read returned end-of-stream, our side already closed")
doCloseConnection(handler, closeCommander, ConfirmedClosed)
case EndOfStream ⇒
if (TraceLogging) log.debug("Read returned end-of-stream, our side not yet closed")
handleClose(handler, closeCommander, PeerClosed)
} catch {
case e: IOException ⇒ handleError(handler, e)
} finally bufferPool.release(buffer)
}
def doWrite(handler: ActorRef): Unit =
pendingWrite = pendingWrite.doWrite(handler)
def closeReason =
if (channel.socket.isOutputShutdown) ConfirmedClosed
else PeerClosed
def handleClose(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Unit = closedEvent match {
case Aborted ⇒
if (TraceLogging) log.debug("Got Abort command. RESETing connection.")
doCloseConnection(handler, closeCommander, closedEvent)
case PeerClosed if keepOpenOnPeerClosed ⇒
// report that peer closed the connection
handler ! PeerClosed
// used to check if peer already closed its side later
peerClosed = true
context.become(peerSentEOF(handler))
case _ if writePending ⇒ // finish writing first
if (TraceLogging) log.debug("Got Close command but write is still pending.")
context.become(closingWithPendingWrite(handler, closeCommander, closedEvent))
case ConfirmedClosed ⇒ // shutdown output and wait for confirmation
if (TraceLogging) log.debug("Got ConfirmedClose command, sending FIN.")
channel.socket.shutdownOutput()
if (peerClosed) // if peer closed first, the socket is now fully closed
doCloseConnection(handler, closeCommander, closedEvent)
else context.become(closing(handler, closeCommander))
case _ ⇒ // close now
if (TraceLogging) log.debug("Got Close command, closing connection.")
doCloseConnection(handler, closeCommander, closedEvent)
}
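/*
 * A minimal sketch of the graceful close handshake from the user side
 * (hedged: `connection` is a hypothetical ActorRef for this actor, not part
 * of this file). `ConfirmedClose` sends FIN but keeps reading until the peer
 * closes its side too:
 *
 *   connection ! ConfirmedClose
 *   // later, in the handler:
 *   case ConfirmedClosed ⇒ context.stop(self) // peer answered with its own FIN
 */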
def doCloseConnection(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Unit = {
if (closedEvent == Aborted) abort()
else channel.close()
closedMessage = CloseInformation(Set(handler) ++ closeCommander, closedEvent)
context.stop(self)
}
def handleError(handler: ActorRef, exception: IOException): Nothing = {
closedMessage = CloseInformation(Set(handler), ErrorClosed(extractMsg(exception)))
throw exception
}
@tailrec private[this] def extractMsg(t: Throwable): String =
if (t == null) "unknown"
else {
t.getMessage match {
case null | "" extractMsg(t.getCause)
case msg msg
}
}
def abort(): Unit = {
try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST
catch {
case NonFatal(e) ⇒
// setSoLinger can fail due to http://bugs.sun.com/view_bug.do?bug_id=6799574
// (also affected: OS/X Java 1.6.0_37)
if (TraceLogging) log.debug("setSoLinger(true, 0) failed with [{}]", e)
}
channel.close()
}
override def postStop(): Unit = {
if (channel.isOpen)
abort()
if (writePending) pendingWrite.release()
if (closedMessage != null) {
val interestedInClose =
if (writePending) closedMessage.notificationsTo + pendingWrite.commander
else closedMessage.notificationsTo
interestedInClose.foreach(_ ! closedMessage.closedEvent)
}
}
override def postRestart(reason: Throwable): Unit =
throw new IllegalStateException("Restarting not supported for connection actors.")
/** Create a pending write from a WriteCommand */
private[io] def createWrite(write: WriteCommand): PendingWrite = write match {
case write: Write ⇒
val buffer = bufferPool.acquire()
try {
val copied = write.data.copyToBuffer(buffer)
buffer.flip()
PendingBufferWrite(sender, write.ack, write.data.drop(copied), buffer)
} catch {
case NonFatal(e) ⇒
bufferPool.release(buffer)
throw e
}
case write: WriteFile ⇒
PendingWriteFile(sender, write, new FileInputStream(write.filePath).getChannel, 0L)
}
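/**
 * A pending write of in-memory data: as much as fits is copied into a pooled
 * NIO buffer, the remainder is kept as `remainingData` and copied in by
 * `consume` once the buffer has been fully written to the channel.
 */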
private[io] case class PendingBufferWrite(
commander: ActorRef,
ack: Any,
remainingData: ByteString,
buffer: ByteBuffer) extends PendingWrite {
def release(): Unit = bufferPool.release(buffer)
def doWrite(handler: ActorRef): PendingWrite = {
@tailrec def innerWrite(pendingWrite: PendingBufferWrite): PendingWrite = {
val toWrite = pendingWrite.buffer.remaining()
require(toWrite != 0)
val writtenBytes = channel.write(pendingWrite.buffer)
if (TraceLogging) log.debug("Wrote [{}] bytes to channel", writtenBytes)
val nextWrite = pendingWrite.consume(writtenBytes)
if (pendingWrite.hasData)
if (writtenBytes == toWrite) innerWrite(nextWrite) // wrote complete buffer, try again now
else {
selector ! WriteInterest
nextWrite
} // try again later
else { // everything written
if (pendingWrite.wantsAck)
pendingWrite.commander ! pendingWrite.ack
pendingWrite.release()
null
}
}
try innerWrite(this)
catch { case e: IOException ⇒ handleError(handler, e) }
}
def hasData = buffer.hasRemaining || remainingData.nonEmpty
def consume(writtenBytes: Int): PendingBufferWrite =
if (buffer.hasRemaining) this
else {
buffer.clear()
val copied = remainingData.copyToBuffer(buffer)
buffer.flip()
copy(remainingData = remainingData.drop(copied))
}
}
private[io] case class PendingWriteFile(
commander: ActorRef,
write: WriteFile,
fileChannel: FileChannel,
alreadyWritten: Long) extends PendingWrite {
def doWrite(handler: ActorRef): PendingWrite = {
tcp.fileIoDispatcher.execute(writeFileRunnable(this))
this
}
def ack: Any = write.ack
/** Release any open resources */
def release() { fileChannel.close() }
def updatedWrite(nowWritten: Long): PendingWriteFile = {
require(nowWritten < write.count)
copy(alreadyWritten = nowWritten)
}
def remainingBytes = write.count - alreadyWritten
def currentPosition = write.position + alreadyWritten
}
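/**
 * Runs on the file-IO dispatcher and pushes the next chunk of a `WriteFile`
 * through `FileChannel.transferTo` (zero-copy where the OS supports it),
 * capped at `TransferToLimit` bytes per run; a partial transfer is
 * rescheduled via `SendBufferFull`.
 */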
private[io] def writeFileRunnable(pendingWrite: PendingWriteFile): Runnable =
new Runnable {
def run(): Unit = try {
import pendingWrite._
val toWrite = math.min(remainingBytes, tcp.Settings.TransferToLimit)
val writtenBytes = fileChannel.transferTo(currentPosition, toWrite, channel)
if (writtenBytes < remainingBytes) self ! SendBufferFull(pendingWrite.updatedWrite(alreadyWritten + writtenBytes))
else { // finished
if (wantsAck) commander ! write.ack
self ! WriteFileFinished
pendingWrite.release()
}
} catch {
case e: IOException ⇒ self ! WriteFileFailed(e)
}
}
}
/**
* INTERNAL API
*/
private[io] object TcpConnection {
sealed trait ReadResult
object EndOfStream extends ReadResult
object AllRead extends ReadResult
object MoreDataWaiting extends ReadResult
/**
* Used to transport information to the postStop method to notify
* interested parties about a connection close.
*/
case class CloseInformation(
notificationsTo: Set[ActorRef],
closedEvent: Event)
// INTERNAL MESSAGES
/** Informs actor that no writing was possible but there is still work remaining */
case class SendBufferFull(remainingWrite: PendingWrite)
/** Informs actor that a pending file write has finished */
case object WriteFileFinished
/** Informs actor that a pending WriteFile failed */
case class WriteFileFailed(e: IOException)
/** Abstraction over pending writes */
trait PendingWrite {
def commander: ActorRef
def ack: Any
def wantsAck = !ack.isInstanceOf[NoAck]
def doWrite(handler: ActorRef): PendingWrite
/** Release any open resources */
def release(): Unit
}
}