+act #3586 #3807 Implement mandatory read throttling

This commit is contained in:
Endre Sándor Varga 2014-01-07 15:50:36 +01:00
parent 537840bd2a
commit 487083a9c3
13 changed files with 467 additions and 74 deletions

View file

@ -134,7 +134,7 @@ class TcpConnectionSpec extends AkkaSpec("""
}
}
"receive data directly when the connection is established" in new UnacceptedConnectionTest {
"receive data directly when the connection is established" in new UnacceptedConnectionTest() {
run {
val serverSideChannel = acceptServerSideConnection(localServerChannel)
@ -355,6 +355,29 @@ class TcpConnectionSpec extends AkkaSpec("""
}
}
"respect pull mode" in new EstablishedConnectionTest(pullMode = true) {
run {
serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII")))
connectionHandler.expectNoMsg(100.millis)
connectionActor ! ResumeReading
interestCallReceiver.expectMsg(OP_READ)
selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be("testdata")
// have two packets in flight before the selector notices
serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII")))
serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII")))
connectionHandler.expectNoMsg(100.millis)
connectionActor ! ResumeReading
interestCallReceiver.expectMsg(OP_READ)
selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be("testdata2testdata3")
}
}
"close the connection and reply with `Closed` upon reception of a `Close` command" in
new EstablishedConnectionTest() with SmallRcvBuffer {
run {
@ -804,8 +827,9 @@ class TcpConnectionSpec extends AkkaSpec("""
def createConnectionActor(serverAddress: InetSocketAddress = serverAddress,
options: immutable.Seq[SocketOption] = Nil,
timeout: Option[FiniteDuration] = None): TestActorRef[TcpOutgoingConnection] = {
val ref = createConnectionActorWithoutRegistration(serverAddress, options, timeout)
timeout: Option[FiniteDuration] = None,
pullMode: Boolean = false): TestActorRef[TcpOutgoingConnection] = {
val ref = createConnectionActorWithoutRegistration(serverAddress, options, timeout, pullMode)
ref ! newChannelRegistration
ref
}
@ -818,9 +842,11 @@ class TcpConnectionSpec extends AkkaSpec("""
def createConnectionActorWithoutRegistration(serverAddress: InetSocketAddress = serverAddress,
options: immutable.Seq[SocketOption] = Nil,
timeout: Option[FiniteDuration] = None): TestActorRef[TcpOutgoingConnection] =
timeout: Option[FiniteDuration] = None,
pullMode: Boolean = false): TestActorRef[TcpOutgoingConnection] =
TestActorRef(
new TcpOutgoingConnection(Tcp(system), this, userHandler.ref, Connect(serverAddress, options = options, timeout = timeout)) {
new TcpOutgoingConnection(Tcp(system), this, userHandler.ref,
Connect(serverAddress, options = options, timeout = timeout, pullMode = pullMode)) {
override def postRestart(reason: Throwable): Unit = context.stop(self) // ensure we never restart
})
}
@ -829,9 +855,9 @@ class TcpConnectionSpec extends AkkaSpec("""
override def setServerSocketOptions(): Unit = localServerChannel.socket.setReceiveBufferSize(1024)
}
abstract class UnacceptedConnectionTest extends LocalServerTest {
abstract class UnacceptedConnectionTest(pullMode: Boolean = false) extends LocalServerTest {
// lazy init since potential exceptions should not be triggered in the constructor but during execution of `run`
private[io] lazy val connectionActor = createConnectionActor(serverAddress)
private[io] lazy val connectionActor = createConnectionActor(serverAddress, pullMode = pullMode)
// calling .underlyingActor ensures that the actor is actually created at this point
lazy val clientSideChannel = connectionActor.underlyingActor.channel
@ -842,8 +868,11 @@ class TcpConnectionSpec extends AkkaSpec("""
}
}
abstract class EstablishedConnectionTest(keepOpenOnPeerClosed: Boolean = false, useResumeWriting: Boolean = true)
extends UnacceptedConnectionTest {
abstract class EstablishedConnectionTest(
keepOpenOnPeerClosed: Boolean = false,
useResumeWriting: Boolean = true,
pullMode: Boolean = false)
extends UnacceptedConnectionTest(pullMode) {
// lazy init since potential exceptions should not be triggered in the constructor but during execution of `run`
lazy val serverSideChannel = acceptServerSideConnection(localServerChannel)
@ -863,7 +892,7 @@ class TcpConnectionSpec extends AkkaSpec("""
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting))
interestCallReceiver.expectMsg(OP_READ)
if (!pullMode) interestCallReceiver.expectMsg(OP_READ)
clientSelectionKey // trigger initialization
serverSelectionKey // trigger initialization

View file

@ -22,9 +22,9 @@ class TcpListenerSpec extends AkkaSpec("""
"A TcpListener" must {
"register its ServerSocketChannel with its selector" in new TestSetup
"register its ServerSocketChannel with its selector" in new TestSetup(pullMode = false)
"let the Bind commander know when binding is completed" in new TestSetup {
"let the Bind commander know when binding is completed" in new TestSetup(pullMode = false) {
listener ! new ChannelRegistration {
def disableInterest(op: Int) = ()
def enableInterest(op: Int) = ()
@ -32,7 +32,7 @@ class TcpListenerSpec extends AkkaSpec("""
bindCommander.expectMsgType[Bound]
}
"accept acceptable connections and register them with its parent" in new TestSetup {
"accept acceptable connections and register them with its parent" in new TestSetup(pullMode = false) {
bindListener()
attemptConnectionToEndpoint()
@ -52,7 +52,7 @@ class TcpListenerSpec extends AkkaSpec("""
expectWorkerForCommand
}
"continue to accept connections after a previous accept" in new TestSetup {
"continue to accept connections after a previous accept" in new TestSetup(pullMode = false) {
bindListener()
attemptConnectionToEndpoint()
@ -68,7 +68,41 @@ class TcpListenerSpec extends AkkaSpec("""
interestCallReceiver.expectMsg(OP_ACCEPT)
}
"react to Unbind commands by replying with Unbound and stopping itself" in new TestSetup {
"not accept connections after a previous accept until read is reenabled" in new TestSetup(pullMode = true) {
bindListener()
attemptConnectionToEndpoint()
expectNoMsg(100.millis)
listener ! ResumeAccepting(batchSize = 1)
listener ! ChannelAcceptable
expectWorkerForCommand
selectorRouter.expectNoMsg(100.millis)
interestCallReceiver.expectMsg(OP_ACCEPT)
// No more accepts are allowed now
interestCallReceiver.expectNoMsg(100.millis)
listener ! ResumeAccepting(batchSize = 2)
interestCallReceiver.expectMsg(OP_ACCEPT)
attemptConnectionToEndpoint()
listener ! ChannelAcceptable
expectWorkerForCommand
selectorRouter.expectNoMsg(100.millis)
// There is still one token remaining, accepting
interestCallReceiver.expectMsg(OP_ACCEPT)
attemptConnectionToEndpoint()
listener ! ChannelAcceptable
expectWorkerForCommand
selectorRouter.expectNoMsg(100.millis)
// Tokens are depleted now
interestCallReceiver.expectNoMsg(100.millis)
}
"react to Unbind commands by replying with Unbound and stopping itself" in new TestSetup(pullMode = false) {
bindListener()
val unbindCommander = TestProbe()
@ -78,7 +112,7 @@ class TcpListenerSpec extends AkkaSpec("""
parent.expectTerminated(listener)
}
"drop an incoming connection if it cannot be registered with a selector" in new TestSetup {
"drop an incoming connection if it cannot be registered with a selector" in new TestSetup(pullMode = false) {
bindListener()
attemptConnectionToEndpoint()
@ -95,7 +129,7 @@ class TcpListenerSpec extends AkkaSpec("""
val counter = Iterator.from(0)
class TestSetup {
class TestSetup(pullMode: Boolean) {
val handler = TestProbe()
val handlerRef = handler.ref
val bindCommander = TestProbe()
@ -106,9 +140,9 @@ class TcpListenerSpec extends AkkaSpec("""
var registerCallReceiver = TestProbe()
var interestCallReceiver = TestProbe()
private val parentRef = TestActorRef(new ListenerParent)
private val parentRef = TestActorRef(new ListenerParent(pullMode))
registerCallReceiver.expectMsg(OP_ACCEPT)
registerCallReceiver.expectMsg(if (pullMode) 0 else OP_ACCEPT)
def bindListener() {
listener ! new ChannelRegistration {
@ -130,10 +164,10 @@ class TcpListenerSpec extends AkkaSpec("""
chan
}
private class ListenerParent extends Actor with ChannelRegistry {
private class ListenerParent(pullMode: Boolean) extends Actor with ChannelRegistry {
val listener = context.actorOf(
props = Props(classOf[TcpListener], selectorRouter.ref, Tcp(system), this, bindCommander.ref,
Bind(handler.ref, endpoint, 100, Nil)).withDeploy(Deploy.local),
Bind(handler.ref, endpoint, 100, Nil, pullMode)).withDeploy(Deploy.local),
name = "test-listener-" + counter.next())
parent.watch(listener)
def receive: Receive = {

View file

@ -113,7 +113,8 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
case class Connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil,
timeout: Option[FiniteDuration] = None) extends Command
timeout: Option[FiniteDuration] = None,
pullMode: Boolean = false) extends Command
/**
* The Bind message is send to the TCP manager actor, which is obtained via
@ -137,7 +138,8 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
case class Bind(handler: ActorRef,
localAddress: InetSocketAddress,
backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil) extends Command
options: immutable.Traversable[SocketOption] = Nil,
pullMode: Boolean = false) extends Command
/**
* This message must be sent to a TCP connection actor after receiving the
@ -392,6 +394,13 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
*/
case object ResumeReading extends Command
/**
* This message enables the accepting of the next connection if read throttling is enabled
* for connection actors.
* @param batchSize The number of connections to accept before waiting for the next resume command
*/
case class ResumeAccepting(batchSize: Int) extends Command
/// EVENTS
/**
* Common interface for all events generated by the TCP layer actors.
@ -608,35 +617,19 @@ object TcpMessage {
* @param localAddress optionally specifies a specific address to bind to
* @param options Please refer to [[TcpSO]] for a list of all supported options.
* @param timeout is the desired connection timeout, `null` means "no timeout"
* @param pullMode enables pull based reading from the connection
*/
def connect(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
options: JIterable[SocketOption],
timeout: FiniteDuration): Command = Connect(remoteAddress, Option(localAddress), options, Option(timeout))
timeout: FiniteDuration,
pullMode: Boolean): Command = Connect(remoteAddress, Option(localAddress), options, Option(timeout), pullMode)
/**
* Connect to the given `remoteAddress` with an optional `localAddress` to bind to given the specified Socket Options
*/
def connect(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(remoteAddress, Option(localAddress), options, None)
/**
* Connect to the given `remoteAddress` without binding to a local address.
*/
def connect(remoteAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(remoteAddress, None, options, None)
/**
* Connect to the given `remoteAddress` without binding to a local address and without
* specifying options.
*/
def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil, None)
/**
* Connect to the given `remoteAddress` with a connection `timeout` without binding to a local address and without
* specifying options.
*/
def connect(remoteAddress: InetSocketAddress, timeout: FiniteDuration): Command = Connect(remoteAddress, None, Nil, Option(timeout))
def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil, None, pullMode = false)
/**
* The Bind message is send to the TCP manager actor, which is obtained via
@ -656,11 +649,15 @@ object TcpMessage {
* kernel will hold for this port before refusing connections.
*
* @param options Please refer to [[TcpSO]] for a list of all supported options.
*
* @param pullMode enables pull based accepting and of connections and pull
* based reading from the accepted connections.
*/
def bind(handler: ActorRef,
endpoint: InetSocketAddress,
backlog: Int,
options: JIterable[SocketOption]): Command = Bind(handler, endpoint, backlog, options)
options: JIterable[SocketOption],
pullMode: Boolean): Command = Bind(handler, endpoint, backlog, options, pullMode)
/**
* Open a listening socket without specifying options.
*/
@ -789,6 +786,13 @@ object TcpMessage {
*/
def resumeReading: Command = ResumeReading
/**
* This message enables the accepting of the next connection if pull reading is enabled
* for connection actors.
* @param batchSize The number of connections to accept before waiting for the next resume command
*/
def resumeAccepting(batchSize: Int): Command = ResumeAccepting(batchSize)
implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
akka.japi.Util.immutableSeq(coll)
}

View file

@ -25,7 +25,7 @@ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
*
* INTERNAL API
*/
private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketChannel)
private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketChannel, val pullMode: Boolean)
extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import tcp.Settings._
@ -35,7 +35,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
private[this] var pendingWrite: PendingWrite = EmptyPendingWrite
private[this] var peerClosed = false
private[this] var writingSuspended = false
private[this] var readingSuspended = false
private[this] var readingSuspended = pullMode
private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop
@ -55,7 +55,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting)
doRead(info, None) // immediately try reading
if (!pullMode) doRead(info, None) // immediately try reading
context.setReceiveTimeout(Duration.Undefined)
context.become(connected(info))
@ -215,8 +215,10 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
val buffer = bufferPool.acquire()
try innerRead(buffer, ReceivedMessageSizeLimit) match {
case AllRead info.registration.enableInterest(OP_READ)
case MoreDataWaiting self ! ChannelReadable
case AllRead
if (!pullMode) info.registration.enableInterest(OP_READ)
case MoreDataWaiting
if (!pullMode) self ! ChannelReadable
case EndOfStream if channel.socket.isOutputShutdown
if (TraceLogging) log.debug("Read returned end-of-stream, our side already closed")
doCloseConnection(info.handler, closeCommander, ConfirmedClosed)

View file

@ -19,8 +19,9 @@ private[io] class TcpIncomingConnection(_tcp: TcpExt,
_channel: SocketChannel,
registry: ChannelRegistry,
bindHandler: ActorRef,
options: immutable.Traversable[SocketOption])
extends TcpConnection(_tcp, _channel) {
options: immutable.Traversable[SocketOption],
readThrottling: Boolean)
extends TcpConnection(_tcp, _channel, readThrottling) {
context.watch(bindHandler) // sign death pact

View file

@ -44,6 +44,8 @@ private[io] class TcpListener(selectorRouter: ActorRef,
val channel = ServerSocketChannel.open
channel.configureBlocking(false)
var acceptLimit = if (bind.pullMode) 0 else BatchAcceptLimit
val localAddress =
try {
val socket = channel.socket
@ -53,7 +55,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
case isa: InetSocketAddress isa
case x throw new IllegalArgumentException(s"bound to unknown SocketAddress [$x]")
}
channelRegistry.register(channel, SelectionKey.OP_ACCEPT)
channelRegistry.register(channel, if (bind.pullMode) 0 else SelectionKey.OP_ACCEPT)
log.debug("Successfully bound to {}", ret)
ret
} catch {
@ -73,7 +75,12 @@ private[io] class TcpListener(selectorRouter: ActorRef,
def bound(registration: ChannelRegistration): Receive = {
case ChannelAcceptable
acceptAllPending(registration, BatchAcceptLimit)
acceptLimit = acceptAllPending(registration, acceptLimit)
if (acceptLimit > 0) registration.enableInterest(SelectionKey.OP_ACCEPT)
case ResumeAccepting(batchSize)
acceptLimit = batchSize
registration.enableInterest(SelectionKey.OP_ACCEPT)
case FailedRegisterIncoming(socketChannel)
log.warning("Could not register incoming connection since selector capacity limit is reached, closing connection")
@ -90,7 +97,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
context.stop(self)
}
@tailrec final def acceptAllPending(registration: ChannelRegistration, limit: Int): Unit = {
@tailrec final def acceptAllPending(registration: ChannelRegistration, limit: Int): Int = {
val socketChannel =
if (limit > 0) {
try channel.accept()
@ -102,10 +109,10 @@ private[io] class TcpListener(selectorRouter: ActorRef,
log.debug("New connection accepted")
socketChannel.configureBlocking(false)
def props(registry: ChannelRegistry) =
Props(classOf[TcpIncomingConnection], tcp, socketChannel, registry, bind.handler, bind.options)
Props(classOf[TcpIncomingConnection], tcp, socketChannel, registry, bind.handler, bind.options, bind.pullMode)
selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, props)
acceptAllPending(registration, limit - 1)
} else registration.enableInterest(SelectionKey.OP_ACCEPT)
} else if (bind.pullMode) limit else BatchAcceptLimit
}
override def postStop() {

View file

@ -24,7 +24,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
channelRegistry: ChannelRegistry,
commander: ActorRef,
connect: Connect)
extends TcpConnection(_tcp, SocketChannel.open().configureBlocking(false).asInstanceOf[SocketChannel]) {
extends TcpConnection(_tcp, SocketChannel.open().configureBlocking(false).asInstanceOf[SocketChannel], connect.pullMode) {
import connect._

View file

@ -44,7 +44,7 @@ public class IODocTest {
1234);
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
options.add(TcpSO.keepAlive(true));
tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options), getSelf());
tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options, null, false), getSelf());
//#connect-with-options
} else
//#connected
@ -80,7 +80,7 @@ public class IODocTest {
1234);
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
options.add(TcpSO.reuseAddress(true));
tcp.tell(TcpMessage.bind(handler, localAddr, 10, options), getSelf());
tcp.tell(TcpMessage.bind(handler, localAddr, 10, options, false), getSelf());
//#bind
}
}

View file

@ -0,0 +1,93 @@
package docs.io;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.io.Inet;
import akka.io.Tcp;
import akka.io.TcpMessage;
import akka.util.ByteString;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
public class JavaReadBackPressure {
static public class Listener extends UntypedActor {
ActorRef tcp;
ActorRef listener;
@Override
//#pull-accepting
public void onReceive(Object message) throws Exception {
if (message instanceof Tcp.Bound) {
listener = getSender();
// Accept connections one by one
listener.tell(TcpMessage.resumeAccepting(1), getSelf());
} else if (message instanceof Tcp.Connected) {
ActorRef handler = getContext().actorOf(Props.create(PullEcho.class, getSender()));
getSender().tell(TcpMessage.register(handler), getSelf());
// Resume accepting connections
listener.tell(TcpMessage.resumeAccepting(1), getSelf());
}
}
//#pull-accepting
@Override
public void preStart() throws Exception {
//#pull-mode-bind
tcp = Tcp.get(getContext().system()).manager();
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
tcp.tell(
TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100, options, true),
getSelf()
);
//#pull-mode-bind
}
private void demonstrateConnect() {
//#pull-mode-connect
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
tcp.tell(
TcpMessage.connect(new InetSocketAddress("localhost", 3000), null, options, null, true),
getSelf()
);
//#pull-mode-connect
}
}
static public class Ack implements Tcp.Event {
}
static public class PullEcho extends UntypedActor {
final ActorRef connection;
public PullEcho(ActorRef connection) {
this.connection = connection;
}
//#pull-reading-echo
@Override
public void preStart() throws Exception {
connection.tell(TcpMessage.resumeReading(), getSelf());
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Tcp.Received) {
ByteString data = ((Tcp.Received) message).data();
connection.tell(TcpMessage.write(data, new Ack()), getSelf());
} else if (message instanceof Ack) {
connection.tell(TcpMessage.resumeReading(), getSelf());
}
}
//#pull-reading-echo
}
}

View file

@ -160,8 +160,9 @@ Throttling Reads and Writes
The basic model of the TCP connection actor is that it has no internal
buffering (i.e. it can only process one write at a time, meaning it can buffer
one write until it has been passed on to the O/S kernel in full). Congestion
needs to be handled at the user level, for which there are three modes of
operation:
needs to be handled at the user level, for both writes and reads.
For back-pressuring writes there are three modes of operation
* *ACK-based:* every :class:`Write` command carries an arbitrary object, and if
this object is not ``Tcp.NoAck`` then it will be returned to the sender of
@ -186,18 +187,32 @@ operation:
:class:`WritingResumed` signal then every message is delivered exactly once
to the network socket.
These models (with the exception of the second which is rather specialised) are
These write models (with the exception of the second which is rather specialised) are
demonstrated in complete examples below. The full and contiguous source is
available `on github <@github@/akka-docs/rst/java/code/docs/io/japi>`_.
For back-pressuring reads there are two modes of operation
* *Push-reading:* in this mode the connection actor sends the registered reader actor
incoming data as soon as available as :class:`Received` events. Whenever the reader actor
wants to signal back-pressure to the remote TCP endpoint it can send a :class:`SuspendReading`
message to the connection actor to indicate that it wants to suspend the
reception of new data. No :class:`Received` events will arrive until a corresponding
:class:`ResumeReading` is sent indicating that the receiver actor is ready again.
* *Pull-reading:* after sending a :class:`Received` event the connection
actor automatically suspends accepting data from the socket until the reader actor signals
with a :class:`ResumeReading` message that it is ready to process more input data. Hence
new data is "pulled" from the connection by sending :class:`ResumeReading` messages.
.. note::
It should be obvious that all these flow control schemes only work between
one writer and one connection actor; as soon as multiple actors send write
one writer/reader and one connection actor; as soon as multiple actors send write
commands to a single connection no consistent result can be achieved.
ACK-Based Back-Pressure
-----------------------
ACK-Based Write Back-Pressure
-----------------------------
For proper function of the following example it is important to configure the
connection to remain half-open when the remote side closed its writing end:
@ -236,7 +251,7 @@ the remote side from writing, filling up its write buffer, until finally the
writer on the other side cannot push any data into the socket anymore. This is
how end-to-end back-pressure is realized across a TCP connection.
NACK-Based Back-Pressure with Write Suspending
NACK-Based Write Back-Pressure with Suspending
----------------------------------------------
.. includecode:: code/docs/io/japi/EchoHandler.java#echo-handler
@ -270,3 +285,56 @@ behavior to await the :class:`WritingResumed` event and start over.
The helper functions are very similar to the ACK-based case:
.. includecode:: code/docs/io/japi/EchoHandler.java#helpers
Read Back-Pressure with Pull Mode
---------------------------------
When using push based reading, data coming from the socket is sent to the actor as soon
as it is available. In the case of the previous Echo server example
this meant that we needed to maintain a buffer of incoming data to keep it around
since the rate of writing might be slower than the rate of the arrival of new data.
With the Pull mode this buffer can be completely eliminated as the following snippet
demonstrates:
.. includecode:: code/docs/io/JavaReadBackPressure.java#pull-reading-echo
The idea here is that reading is not resumed until the previous write has been
completely acknowledged by the connection actor. Every pull mode connection
actor starts from suspended state. To start the flow of data we send a
``ResumeReading`` in the ``preStart`` method to tell the connection actor that
we are ready to receive the first chunk of data. Since we only resume reading when
the previous data chunk has been completely written there is no need for maintaining
a buffer.
To enable pull reading on an outbound connection the ``pullMode`` parameter of
the :class:`Connect` should be set to ``true``:
.. includecode:: code/docs/io/JavaReadBackPressure.java#pull-mode-connect
Pull Mode Reading for Inbound Connections
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The previous section demonstrated how to enable pull reading mode for outbound
connections but it is possible to create a listener actor with this mode of reading
by setting the ``pullMode`` parameter of the :class:`Bind` command to ``true``:
.. includecode:: code/docs/io/JavaReadBackPressure.java#pull-mode-bind
One of the effects of this setting is that all connections accepted by this listener
actor will use pull mode reading.
Another effect of this setting is that in addition of setting all inbound connections to
pull mode, accepting connections becomes pull based, too. This means that after handling
one (or more) :class:`Connected` events the listener actor has to be resumed by sending
it a :class:`ResumeAccepting` message.
Listener actors with pull mode start suspended so to start accepting connections
a :class:`ResumeAccepting` command has to be sent to the listener actor after binding was successful:
.. includecode:: code/docs/io/JavaReadBackPressure.java#pull-accepting
As shown in the example after handling an incoming connection we need to resume accepting again.
The :class:`ResumeAccepting` message accepts a ``batchSize`` parameter that specifies how
many new connections are accepted before a next :class:`ResumeAccepting` message
is needed to resume handling of new connections.

View file

@ -56,7 +56,7 @@ class EchoManager(handlerClass: Class[_]) extends Actor with ActorLogging {
case Bound(localAddress) =>
log.info("listening on port {}", localAddress.getPort)
case CommandFailed(Bind(_, local, _, _)) =>
case CommandFailed(Bind(_, local, _, _, _)) =>
log.warning(s"cannot bind to [$local]")
context stop self

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io
import akka.actor.{ ActorRef, ActorLogging, Props, Actor, ActorSystem }
import akka.io.Tcp._
import akka.io.{ Tcp, IO }
import java.net.InetSocketAddress
import akka.testkit.{ ImplicitSender, TestProbe, AkkaSpec }
import akka.util.ByteString
object PullReadingExample {
class Listener(monitor: ActorRef) extends Actor {
import context.system
override def preStart: Unit =
//#pull-mode-bind
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0), pullMode = true)
//#pull-mode-bind
def receive = {
//#pull-accepting
case Bound(localAddress) =>
// Accept connections one by one
sender ! ResumeAccepting(batchSize = 1)
context.become(listening(sender))
//#pull-accepting
monitor ! localAddress
}
//#pull-accepting-cont
def listening(listener: ActorRef): Receive = {
case Connected(remote, local) =>
val handler = context.actorOf(Props(classOf[PullEcho], sender))
sender ! Register(handler, keepOpenOnPeerClosed = true)
listener ! ResumeAccepting(batchSize = 1)
}
//#pull-accepting-cont
}
case object Ack extends Event
class PullEcho(connection: ActorRef) extends Actor {
//#pull-reading-echo
override def preStart: Unit = connection ! ResumeReading
def receive = {
case Received(data) => connection ! Write(data, Ack)
case Ack => connection ! ResumeReading
}
//#pull-reading-echo
}
}
class PullReadingSpec extends AkkaSpec with ImplicitSender {
"demonstrate pull reading" in {
val probe = TestProbe()
system.actorOf(Props(classOf[PullReadingExample.Listener], probe.ref), "server")
val listenAddress = probe.expectMsgType[InetSocketAddress]
//#pull-mode-connect
IO(Tcp) ! Connect(listenAddress, pullMode = true)
//#pull-mode-connect
expectMsgType[Connected]
val connection = lastSender
val client = TestProbe()
client.send(connection, Register(client.ref))
client.send(connection, Write(ByteString("hello")))
client.send(connection, ResumeReading)
client.expectMsg(Received(ByteString("hello")))
system.shutdown()
system.awaitTermination
}
}

View file

@ -161,8 +161,9 @@ Throttling Reads and Writes
The basic model of the TCP connection actor is that it has no internal
buffering (i.e. it can only process one write at a time, meaning it can buffer
one write until it has been passed on to the O/S kernel in full). Congestion
needs to be handled at the user level, for which there are three modes of
operation:
needs to be handled at the user level, for both writes and reads.
For back-pressuring writes there are three modes of operation
* *ACK-based:* every :class:`Write` command carries an arbitrary object, and if
this object is not ``Tcp.NoAck`` then it will be returned to the sender of
@ -187,18 +188,32 @@ operation:
:class:`WritingResumed` signal then every message is delivered exactly once
to the network socket.
These models (with the exception of the second which is rather specialised) are
These write back-pressure models (with the exception of the second which is rather specialised) are
demonstrated in complete examples below. The full and contiguous source is
available `on github <@github@/akka-docs/rst/scala/code/docs/io/EchoServer.scala>`_.
For back-pressuring reads there are two modes of operation
* *Push-reading:* in this mode the connection actor sends the registered reader actor
incoming data as soon as available as :class:`Received` events. Whenever the reader actor
wants to signal back-pressure to the remote TCP endpoint it can send a :class:`SuspendReading`
message to the connection actor to indicate that it wants to suspend the
reception of new data. No :class:`Received` events will arrive until a corresponding
:class:`ResumeReading` is sent indicating that the receiver actor is ready again.
* *Pull-reading:* after sending a :class:`Received` event the connection
actor automatically suspends accepting data from the socket until the reader actor signals
with a :class:`ResumeReading` message that it is ready to process more input data. Hence
new data is "pulled" from the connection by sending :class:`ResumeReading` messages.
.. note::
It should be obvious that all these flow control schemes only work between
one writer and one connection actor; as soon as multiple actors send write
one writer/reader and one connection actor; as soon as multiple actors send write
commands to a single connection no consistent result can be achieved.
ACK-Based Back-Pressure
-----------------------
ACK-Based Write Back-Pressure
-----------------------------
For proper function of the following example it is important to configure the
connection to remain half-open when the remote side closed its writing end:
@ -237,7 +252,7 @@ the remote side from writing, filling up its write buffer, until finally the
writer on the other side cannot push any data into the socket anymore. This is
how end-to-end back-pressure is realized across a TCP connection.
NACK-Based Back-Pressure with Write Suspending
NACK-Based Write Back-Pressure with Suspending
----------------------------------------------
.. includecode:: code/docs/io/EchoServer.scala#echo-handler
@ -271,3 +286,60 @@ behavior to await the :class:`WritingResumed` event and start over.
The helper functions are very similar to the ACK-based case:
.. includecode:: code/docs/io/EchoServer.scala#helpers
Read Back-Pressure with Pull Mode
---------------------------------
When using push based reading, data coming from the socket is sent to the actor as soon
as it is available. In the case of the previous Echo server example
this meant that we needed to maintain a buffer of incoming data to keep it around
since the rate of writing might be slower than the rate of the arrival of new data.
With the Pull mode this buffer can be completely eliminated as the following snippet
demonstrates:
.. includecode:: code/docs/io/ReadBackPressure.scala#pull-reading-echo
The idea here is that reading is not resumed until the previous write has been
completely acknowledged by the connection actor. Every pull mode connection
actor starts from suspended state. To start the flow of data we send a
``ResumeReading`` in the ``preStart`` method to tell the connection actor that
we are ready to receive the first chunk of data. Since we only resume reading when
the previous data chunk has been completely written there is no need for maintaining
a buffer.
To enable pull reading on an outbound connection the ``pullMode`` parameter of
the :class:`Connect` should be set to ``true``:
.. includecode:: code/docs/io/ReadBackPressure.scala#pull-mode-connect
Pull Mode Reading for Inbound Connections
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The previous section demonstrated how to enable pull reading mode for outbound
connections but it is possible to create a listener actor with this mode of reading
by setting the ``pullMode`` parameter of the :class:`Bind` command to ``true``:
.. includecode:: code/docs/io/ReadBackPressure.scala#pull-mode-bind
One of the effects of this setting is that all connections accepted by this listener
actor will use pull mode reading.
Another effect of this setting is that in addition of setting all inbound connections to
pull mode, accepting connections becomes pull based, too. This means that after handling
one (or more) :class:`Connected` events the listener actor has to be resumed by sending
it a :class:`ResumeAccepting` message.
Listener actors with pull mode start suspended so to start accepting connections
a :class:`ResumeAccepting` command has to be sent to the listener actor after binding was successful:
.. includecode:: code/docs/io/ReadBackPressure.scala#pull-accepting
After handling an incoming connection we need to resume accepting again:
.. includecode:: code/docs/io/ReadBackPressure.scala#pull-accepting-cont
The :class:`ResumeAccepting` accepts a ``batchSize`` parameter that specifies how
many new connections are accepted before a next :class:`ResumeAccepting` message
is needed to resume handling of new connections.