diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 97e3e2fde1..1f3c308b3c 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -390,28 +390,19 @@ class TcpConnectionSpec extends AkkaSpec(""" connectionHandler.expectNoMessage(100.millis) connectionActor ! ResumeReading - interestCallReceiver.expectMsg(OP_READ) - selector.send(connectionActor, ChannelReadable) connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(ts) - interestCallReceiver.expectNoMessage(100.millis) connectionHandler.expectNoMessage(100.millis) connectionActor ! ResumeReading - interestCallReceiver.expectMsg(OP_READ) - selector.send(connectionActor, ChannelReadable) connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(us) - // make sure that after reading all pending data we don't yet register for reading more data - interestCallReceiver.expectNoMessage(100.millis) connectionHandler.expectNoMessage(100.millis) val vs = "v" * (maxBufferSize / 2) serverSideChannel.write(ByteBuffer.wrap(vs.getBytes("ASCII"))) connectionActor ! ResumeReading - interestCallReceiver.expectMsg(OP_READ) - selector.send(connectionActor, ChannelReadable) connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(vs) } finally shutdown(system) diff --git a/akka-actor/src/main/mima-filters/2.6.15.backwards.excludes/30354-improve-TcpConnection.excludes b/akka-actor/src/main/mima-filters/2.6.15.backwards.excludes/30354-improve-TcpConnection.excludes new file mode 100644 index 0000000000..d2b0590f41 --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.6.15.backwards.excludes/30354-improve-TcpConnection.excludes @@ -0,0 +1,2 @@ +# internal actor class +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.resumeReading") \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 371d9cd63e..38d5491733 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -76,7 +76,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha // if we are in push mode or already have resumed reading in pullMode while waiting for Register // then register OP_READ interest - if (!pullMode || (/*pullMode && */ !readingSuspended)) resumeReading(info) + if (!pullMode || (/*pullMode && */ !readingSuspended)) resumeReading(info, None) context.setReceiveTimeout(Duration.Undefined) context.become(connected(info)) @@ -101,7 +101,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha def connected(info: ConnectionInfo): Receive = handleWriteMessages(info).orElse { case SuspendReading => suspendReading(info) - case ResumeReading => resumeReading(info) + case ResumeReading => resumeReading(info, None) case ChannelReadable => doRead(info, None) case cmd: CloseCommand => handleClose(info, Some(sender()), cmd.event) } @@ -119,7 +119,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Receive = { case SuspendReading => suspendReading(info) - case ResumeReading => resumeReading(info) + case ResumeReading => resumeReading(info, closeCommander) case ChannelReadable => doRead(info, closeCommander) case ChannelWritable => @@ -141,7 +141,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha /** connection is closed on our side and we're waiting from confirmation from the other side */ def closing(info: ConnectionInfo, closeCommander: Option[ActorRef]): Receive = { case SuspendReading => suspendReading(info) - case ResumeReading => resumeReading(info) + case ResumeReading => resumeReading(info, closeCommander) case ChannelReadable => doRead(info, closeCommander) case Close => doCloseConnection(info.handler, closeCommander, Close.event) case Abort => handleClose(info, Some(sender()), Aborted) @@ -240,9 +240,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha readingSuspended = true info.registration.disableInterest(OP_READ) } - def resumeReading(info: ConnectionInfo): Unit = { + def resumeReading(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit = { readingSuspended = false - info.registration.enableInterest(OP_READ) + doRead(info, closeCommander) } /** @@ -267,8 +267,15 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha readBytes match { case `maxBufferSpace` => if (pullMode) MoreDataWaiting else innerRead(buffer, remainingLimit - maxBufferSpace) - case x if x >= 0 => AllRead - case -1 => EndOfStream + case x if x >= 0 => + if (!pullMode || x == 0) + // if (pullMode) we reach here by probing from resumeReading, + // otherwise we have just exhausted the network receive buffer. + // In any case, we now want to be notified about more data being available + info.registration.enableInterest(OP_READ) + + AllRead + case -1 => EndOfStream case _ => throw new IllegalStateException("Unexpected value returned from read: " + readBytes) } @@ -276,8 +283,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha val buffer = bufferPool.acquire() try innerRead(buffer, ReceivedMessageSizeLimit) match { - case AllRead => - if (!pullMode) info.registration.enableInterest(OP_READ) + case AllRead => // nothing to do case MoreDataWaiting => if (!pullMode) self ! ChannelReadable case EndOfStream if channel.socket.isOutputShutdown =>