=act #3680 fix SuspendReading being overruled by racy ChannelReadable

This commit is contained in:
Johannes Rudolph 2013-10-21 15:33:31 +02:00
parent d2644876e5
commit 3da7f4ee07
2 changed files with 66 additions and 42 deletions

View file

@ -333,12 +333,25 @@ class TcpConnectionSpec extends AkkaSpec("""
"respect StopReading and ResumeReading" in new EstablishedConnectionTest() { "respect StopReading and ResumeReading" in new EstablishedConnectionTest() {
run { run {
serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII")))
connectionHandler.send(connectionActor, SuspendReading) connectionHandler.send(connectionActor, SuspendReading)
// the selector interprets StopReading to deregister interest for reading // the selector interprets SuspendReading to deregister interest for reading
interestCallReceiver.expectMsg(-OP_READ) interestCallReceiver.expectMsg(-OP_READ)
// this simulates a race condition where ChannelReadable was already underway while
// processing SuspendReading
selector.send(connectionActor, ChannelReadable)
// this ChannelReadable should be properly ignored, even if data is already pending
interestCallReceiver.expectNoMsg(100.millis)
connectionHandler.expectNoMsg(100.millis)
connectionHandler.send(connectionActor, ResumeReading) connectionHandler.send(connectionActor, ResumeReading)
interestCallReceiver.expectMsg(OP_READ) interestCallReceiver.expectMsg(OP_READ)
// data should be received only after ResumeReading
expectReceivedString("testdata")
} }
} }

View file

@ -35,6 +35,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
private[this] var pendingWrite: PendingWrite = EmptyPendingWrite private[this] var pendingWrite: PendingWrite = EmptyPendingWrite
private[this] var peerClosed = false private[this] var peerClosed = false
private[this] var writingSuspended = false private[this] var writingSuspended = false
private[this] var readingSuspended = false
private[this] var interestedInResume: Option[ActorRef] = None private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop
@ -72,8 +73,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
/** normal connected state */ /** normal connected state */
def connected(info: ConnectionInfo): Receive = def connected(info: ConnectionInfo): Receive =
handleWriteMessages(info) orElse { handleWriteMessages(info) orElse {
case SuspendReading info.registration.disableInterest(OP_READ) case SuspendReading suspendReading(info)
case ResumeReading info.registration.enableInterest(OP_READ) case ResumeReading resumeReading(info)
case ChannelReadable doRead(info, None) case ChannelReadable doRead(info, None)
case cmd: CloseCommand handleClose(info, Some(sender), cmd.event) case cmd: CloseCommand handleClose(info, Some(sender), cmd.event)
} }
@ -87,8 +88,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
/** connection is closing but a write has to be finished first */ /** connection is closing but a write has to be finished first */
def closingWithPendingWrite(info: ConnectionInfo, closeCommander: Option[ActorRef], def closingWithPendingWrite(info: ConnectionInfo, closeCommander: Option[ActorRef],
closedEvent: ConnectionClosed): Receive = { closedEvent: ConnectionClosed): Receive = {
case SuspendReading info.registration.disableInterest(OP_READ) case SuspendReading suspendReading(info)
case ResumeReading info.registration.enableInterest(OP_READ) case ResumeReading resumeReading(info)
case ChannelReadable doRead(info, closeCommander) case ChannelReadable doRead(info, closeCommander)
case ChannelWritable case ChannelWritable
@ -108,8 +109,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
/** connection is closed on our side and we're waiting from confirmation from the other side */ /** connection is closed on our side and we're waiting from confirmation from the other side */
def closing(info: ConnectionInfo, closeCommander: Option[ActorRef]): Receive = { def closing(info: ConnectionInfo, closeCommander: Option[ActorRef]): Receive = {
case SuspendReading info.registration.disableInterest(OP_READ) case SuspendReading suspendReading(info)
case ResumeReading info.registration.enableInterest(OP_READ) case ResumeReading resumeReading(info)
case ChannelReadable doRead(info, closeCommander) case ChannelReadable doRead(info, closeCommander)
case Abort handleClose(info, Some(sender), Aborted) case Abort handleClose(info, Some(sender), Aborted)
} }
@ -180,42 +181,52 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
context.become(waitingForRegistration(registration, commander)) context.become(waitingForRegistration(registration, commander))
} }
def doRead(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit = { def suspendReading(info: ConnectionInfo): Unit = {
@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult = readingSuspended = true
if (remainingLimit > 0) { info.registration.disableInterest(OP_READ)
// never read more than the configured limit
buffer.clear()
val maxBufferSpace = math.min(DirectBufferSize, remainingLimit)
buffer.limit(maxBufferSpace)
val readBytes = channel.read(buffer)
buffer.flip()
if (TraceLogging) log.debug("Read [{}] bytes.", readBytes)
if (readBytes > 0) info.handler ! Received(ByteString(buffer))
readBytes match {
case `maxBufferSpace` innerRead(buffer, remainingLimit - maxBufferSpace)
case x if x >= 0 AllRead
case -1 EndOfStream
case _
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
}
} else MoreDataWaiting
val buffer = bufferPool.acquire()
try innerRead(buffer, ReceivedMessageSizeLimit) match {
case AllRead info.registration.enableInterest(OP_READ)
case MoreDataWaiting self ! ChannelReadable
case EndOfStream if channel.socket.isOutputShutdown
if (TraceLogging) log.debug("Read returned end-of-stream, our side already closed")
doCloseConnection(info.handler, closeCommander, ConfirmedClosed)
case EndOfStream
if (TraceLogging) log.debug("Read returned end-of-stream, our side not yet closed")
handleClose(info, closeCommander, PeerClosed)
} catch {
case e: IOException handleError(info.handler, e)
} finally bufferPool.release(buffer)
} }
def resumeReading(info: ConnectionInfo): Unit = {
readingSuspended = false
info.registration.enableInterest(OP_READ)
}
def doRead(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit =
if (!readingSuspended) {
@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
if (remainingLimit > 0) {
// never read more than the configured limit
buffer.clear()
val maxBufferSpace = math.min(DirectBufferSize, remainingLimit)
buffer.limit(maxBufferSpace)
val readBytes = channel.read(buffer)
buffer.flip()
if (TraceLogging) log.debug("Read [{}] bytes.", readBytes)
if (readBytes > 0) info.handler ! Received(ByteString(buffer))
readBytes match {
case `maxBufferSpace` innerRead(buffer, remainingLimit - maxBufferSpace)
case x if x >= 0 AllRead
case -1 EndOfStream
case _
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
}
} else MoreDataWaiting
val buffer = bufferPool.acquire()
try innerRead(buffer, ReceivedMessageSizeLimit) match {
case AllRead info.registration.enableInterest(OP_READ)
case MoreDataWaiting self ! ChannelReadable
case EndOfStream if channel.socket.isOutputShutdown
if (TraceLogging) log.debug("Read returned end-of-stream, our side already closed")
doCloseConnection(info.handler, closeCommander, ConfirmedClosed)
case EndOfStream
if (TraceLogging) log.debug("Read returned end-of-stream, our side not yet closed")
handleClose(info, closeCommander, PeerClosed)
} catch {
case e: IOException handleError(info.handler, e)
} finally bufferPool.release(buffer)
}
def doWrite(info: ConnectionInfo): Unit = pendingWrite = pendingWrite.doWrite(info) def doWrite(info: ConnectionInfo): Unit = pendingWrite = pendingWrite.doWrite(info)