Merge pull request #1909 from drewhk/wip-io-read-throtting-drewhk

+act #3586 #3807 Implement mandatory read throttling
This commit is contained in:
drewhk 2014-01-17 11:53:23 -08:00
commit 1db16428da
13 changed files with 467 additions and 74 deletions

View file

@ -25,7 +25,7 @@ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
*
* INTERNAL API
*/
private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketChannel)
private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketChannel, val pullMode: Boolean)
extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import tcp.Settings._
@ -35,7 +35,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
private[this] var pendingWrite: PendingWrite = EmptyPendingWrite
private[this] var peerClosed = false
private[this] var writingSuspended = false
private[this] var readingSuspended = false
private[this] var readingSuspended = pullMode
private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop
@ -55,7 +55,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting)
doRead(info, None) // immediately try reading
if (!pullMode) doRead(info, None) // immediately try reading
context.setReceiveTimeout(Duration.Undefined)
context.become(connected(info))
@ -215,8 +215,10 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
val buffer = bufferPool.acquire()
try innerRead(buffer, ReceivedMessageSizeLimit) match {
case AllRead info.registration.enableInterest(OP_READ)
case MoreDataWaiting self ! ChannelReadable
case AllRead
if (!pullMode) info.registration.enableInterest(OP_READ)
case MoreDataWaiting
if (!pullMode) self ! ChannelReadable
case EndOfStream if channel.socket.isOutputShutdown
if (TraceLogging) log.debug("Read returned end-of-stream, our side already closed")
doCloseConnection(info.handler, closeCommander, ConfirmedClosed)