io: probe TCP socket when reading before registering interest (#30354)

Just asking once for more data is cheaper than instantly updating epoll.
This commit is contained in:
Johannes Rudolph 2021-07-22 10:32:18 +02:00 committed by GitHub
parent 70ba0a1af0
commit 82aa15e6ab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 19 deletions

View file

@ -390,28 +390,19 @@ class TcpConnectionSpec extends AkkaSpec("""
connectionHandler.expectNoMessage(100.millis) connectionHandler.expectNoMessage(100.millis)
connectionActor ! ResumeReading connectionActor ! ResumeReading
interestCallReceiver.expectMsg(OP_READ)
selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(ts) connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(ts)
interestCallReceiver.expectNoMessage(100.millis)
connectionHandler.expectNoMessage(100.millis) connectionHandler.expectNoMessage(100.millis)
connectionActor ! ResumeReading connectionActor ! ResumeReading
interestCallReceiver.expectMsg(OP_READ)
selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(us) connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(us)
// make sure that after reading all pending data we don't yet register for reading more data
interestCallReceiver.expectNoMessage(100.millis)
connectionHandler.expectNoMessage(100.millis) connectionHandler.expectNoMessage(100.millis)
val vs = "v" * (maxBufferSize / 2) val vs = "v" * (maxBufferSize / 2)
serverSideChannel.write(ByteBuffer.wrap(vs.getBytes("ASCII"))) serverSideChannel.write(ByteBuffer.wrap(vs.getBytes("ASCII")))
connectionActor ! ResumeReading connectionActor ! ResumeReading
interestCallReceiver.expectMsg(OP_READ)
selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(vs) connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(vs)
} finally shutdown(system) } finally shutdown(system)

View file

@ -0,0 +1,2 @@
# internal actor class
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.resumeReading")

View file

@ -76,7 +76,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
// if we are in push mode or already have resumed reading in pullMode while waiting for Register // if we are in push mode or already have resumed reading in pullMode while waiting for Register
// then register OP_READ interest // then register OP_READ interest
if (!pullMode || (/*pullMode && */ !readingSuspended)) resumeReading(info) if (!pullMode || (/*pullMode && */ !readingSuspended)) resumeReading(info, None)
context.setReceiveTimeout(Duration.Undefined) context.setReceiveTimeout(Duration.Undefined)
context.become(connected(info)) context.become(connected(info))
@ -101,7 +101,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
def connected(info: ConnectionInfo): Receive = def connected(info: ConnectionInfo): Receive =
handleWriteMessages(info).orElse { handleWriteMessages(info).orElse {
case SuspendReading => suspendReading(info) case SuspendReading => suspendReading(info)
case ResumeReading => resumeReading(info) case ResumeReading => resumeReading(info, None)
case ChannelReadable => doRead(info, None) case ChannelReadable => doRead(info, None)
case cmd: CloseCommand => handleClose(info, Some(sender()), cmd.event) case cmd: CloseCommand => handleClose(info, Some(sender()), cmd.event)
} }
@ -119,7 +119,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
closeCommander: Option[ActorRef], closeCommander: Option[ActorRef],
closedEvent: ConnectionClosed): Receive = { closedEvent: ConnectionClosed): Receive = {
case SuspendReading => suspendReading(info) case SuspendReading => suspendReading(info)
case ResumeReading => resumeReading(info) case ResumeReading => resumeReading(info, closeCommander)
case ChannelReadable => doRead(info, closeCommander) case ChannelReadable => doRead(info, closeCommander)
case ChannelWritable => case ChannelWritable =>
@ -141,7 +141,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
/** connection is closed on our side and we're waiting from confirmation from the other side */ /** connection is closed on our side and we're waiting from confirmation from the other side */
def closing(info: ConnectionInfo, closeCommander: Option[ActorRef]): Receive = { def closing(info: ConnectionInfo, closeCommander: Option[ActorRef]): Receive = {
case SuspendReading => suspendReading(info) case SuspendReading => suspendReading(info)
case ResumeReading => resumeReading(info) case ResumeReading => resumeReading(info, closeCommander)
case ChannelReadable => doRead(info, closeCommander) case ChannelReadable => doRead(info, closeCommander)
case Close => doCloseConnection(info.handler, closeCommander, Close.event) case Close => doCloseConnection(info.handler, closeCommander, Close.event)
case Abort => handleClose(info, Some(sender()), Aborted) case Abort => handleClose(info, Some(sender()), Aborted)
@ -240,9 +240,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
readingSuspended = true readingSuspended = true
info.registration.disableInterest(OP_READ) info.registration.disableInterest(OP_READ)
} }
def resumeReading(info: ConnectionInfo): Unit = { def resumeReading(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit = {
readingSuspended = false readingSuspended = false
info.registration.enableInterest(OP_READ) doRead(info, closeCommander)
} }
/** /**
@ -267,7 +267,14 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
readBytes match { readBytes match {
case `maxBufferSpace` => case `maxBufferSpace` =>
if (pullMode) MoreDataWaiting else innerRead(buffer, remainingLimit - maxBufferSpace) if (pullMode) MoreDataWaiting else innerRead(buffer, remainingLimit - maxBufferSpace)
case x if x >= 0 => AllRead case x if x >= 0 =>
if (!pullMode || x == 0)
// if (pullMode) we reach here by probing from resumeReading,
// otherwise we have just exhausted the network receive buffer.
// In any case, we now want to be notified about more data being available
info.registration.enableInterest(OP_READ)
AllRead
case -1 => EndOfStream case -1 => EndOfStream
case _ => case _ =>
throw new IllegalStateException("Unexpected value returned from read: " + readBytes) throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
@ -276,8 +283,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
val buffer = bufferPool.acquire() val buffer = bufferPool.acquire()
try innerRead(buffer, ReceivedMessageSizeLimit) match { try innerRead(buffer, ReceivedMessageSizeLimit) match {
case AllRead => case AllRead => // nothing to do
if (!pullMode) info.registration.enableInterest(OP_READ)
case MoreDataWaiting => case MoreDataWaiting =>
if (!pullMode) self ! ChannelReadable if (!pullMode) self ! ChannelReadable
case EndOfStream if channel.socket.isOutputShutdown => case EndOfStream if channel.socket.isOutputShutdown =>