From 487083a9c3f210ea56ef07f94ba6095f9d8aa418 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 7 Jan 2014 15:50:36 +0100 Subject: [PATCH] +act #3586 #3807 Implement mandatory read throttling --- .../scala/akka/io/TcpConnectionSpec.scala | 49 ++++++++-- .../test/scala/akka/io/TcpListenerSpec.scala | 56 ++++++++--- akka-actor/src/main/scala/akka/io/Tcp.scala | 50 +++++----- .../main/scala/akka/io/TcpConnection.scala | 12 ++- .../scala/akka/io/TcpIncomingConnection.scala | 5 +- .../src/main/scala/akka/io/TcpListener.scala | 17 +++- .../scala/akka/io/TcpOutgoingConnection.scala | 2 +- .../rst/java/code/docs/io/IODocTest.java | 4 +- .../code/docs/io/JavaReadBackPressure.java | 93 +++++++++++++++++++ akka-docs/rst/java/io-tcp.rst | 82 ++++++++++++++-- .../rst/scala/code/docs/io/EchoServer.scala | 2 +- .../scala/code/docs/io/ReadBackPressure.scala | 83 +++++++++++++++++ akka-docs/rst/scala/io-tcp.rst | 86 +++++++++++++++-- 13 files changed, 467 insertions(+), 74 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/io/JavaReadBackPressure.java create mode 100644 akka-docs/rst/scala/code/docs/io/ReadBackPressure.scala diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index c5e81c93d7..0d71e531fb 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -134,7 +134,7 @@ class TcpConnectionSpec extends AkkaSpec(""" } } - "receive data directly when the connection is established" in new UnacceptedConnectionTest { + "receive data directly when the connection is established" in new UnacceptedConnectionTest() { run { val serverSideChannel = acceptServerSideConnection(localServerChannel) @@ -355,6 +355,29 @@ class TcpConnectionSpec extends AkkaSpec(""" } } + "respect pull mode" in new EstablishedConnectionTest(pullMode = true) { + run { + serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII"))) + connectionHandler.expectNoMsg(100.millis) + + connectionActor ! ResumeReading + interestCallReceiver.expectMsg(OP_READ) + selector.send(connectionActor, ChannelReadable) + connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be("testdata") + + // have two packets in flight before the selector notices + serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII"))) + serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII"))) + + connectionHandler.expectNoMsg(100.millis) + + connectionActor ! ResumeReading + interestCallReceiver.expectMsg(OP_READ) + selector.send(connectionActor, ChannelReadable) + connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be("testdata2testdata3") + } + } + "close the connection and reply with `Closed` upon reception of a `Close` command" in new EstablishedConnectionTest() with SmallRcvBuffer { run { @@ -804,8 +827,9 @@ class TcpConnectionSpec extends AkkaSpec(""" def createConnectionActor(serverAddress: InetSocketAddress = serverAddress, options: immutable.Seq[SocketOption] = Nil, - timeout: Option[FiniteDuration] = None): TestActorRef[TcpOutgoingConnection] = { - val ref = createConnectionActorWithoutRegistration(serverAddress, options, timeout) + timeout: Option[FiniteDuration] = None, + pullMode: Boolean = false): TestActorRef[TcpOutgoingConnection] = { + val ref = createConnectionActorWithoutRegistration(serverAddress, options, timeout, pullMode) ref ! newChannelRegistration ref } @@ -818,9 +842,11 @@ class TcpConnectionSpec extends AkkaSpec(""" def createConnectionActorWithoutRegistration(serverAddress: InetSocketAddress = serverAddress, options: immutable.Seq[SocketOption] = Nil, - timeout: Option[FiniteDuration] = None): TestActorRef[TcpOutgoingConnection] = + timeout: Option[FiniteDuration] = None, + pullMode: Boolean = false): TestActorRef[TcpOutgoingConnection] = TestActorRef( - new TcpOutgoingConnection(Tcp(system), this, userHandler.ref, Connect(serverAddress, options = options, timeout = timeout)) { + new TcpOutgoingConnection(Tcp(system), this, userHandler.ref, + Connect(serverAddress, options = options, timeout = timeout, pullMode = pullMode)) { override def postRestart(reason: Throwable): Unit = context.stop(self) // ensure we never restart }) } @@ -829,9 +855,9 @@ class TcpConnectionSpec extends AkkaSpec(""" override def setServerSocketOptions(): Unit = localServerChannel.socket.setReceiveBufferSize(1024) } - abstract class UnacceptedConnectionTest extends LocalServerTest { + abstract class UnacceptedConnectionTest(pullMode: Boolean = false) extends LocalServerTest { // lazy init since potential exceptions should not be triggered in the constructor but during execution of `run` - private[io] lazy val connectionActor = createConnectionActor(serverAddress) + private[io] lazy val connectionActor = createConnectionActor(serverAddress, pullMode = pullMode) // calling .underlyingActor ensures that the actor is actually created at this point lazy val clientSideChannel = connectionActor.underlyingActor.channel @@ -842,8 +868,11 @@ class TcpConnectionSpec extends AkkaSpec(""" } } - abstract class EstablishedConnectionTest(keepOpenOnPeerClosed: Boolean = false, useResumeWriting: Boolean = true) - extends UnacceptedConnectionTest { + abstract class EstablishedConnectionTest( + keepOpenOnPeerClosed: Boolean = false, + useResumeWriting: Boolean = true, + pullMode: Boolean = false) + extends UnacceptedConnectionTest(pullMode) { // lazy init since potential exceptions should not be triggered in the constructor but during execution of `run` lazy val serverSideChannel = acceptServerSideConnection(localServerChannel) @@ -863,7 +892,7 @@ class TcpConnectionSpec extends AkkaSpec(""" userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting)) - interestCallReceiver.expectMsg(OP_READ) + if (!pullMode) interestCallReceiver.expectMsg(OP_READ) clientSelectionKey // trigger initialization serverSelectionKey // trigger initialization diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 2f6c901f6f..098b239a49 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -22,9 +22,9 @@ class TcpListenerSpec extends AkkaSpec(""" "A TcpListener" must { - "register its ServerSocketChannel with its selector" in new TestSetup + "register its ServerSocketChannel with its selector" in new TestSetup(pullMode = false) - "let the Bind commander know when binding is completed" in new TestSetup { + "let the Bind commander know when binding is completed" in new TestSetup(pullMode = false) { listener ! new ChannelRegistration { def disableInterest(op: Int) = () def enableInterest(op: Int) = () @@ -32,7 +32,7 @@ class TcpListenerSpec extends AkkaSpec(""" bindCommander.expectMsgType[Bound] } - "accept acceptable connections and register them with its parent" in new TestSetup { + "accept acceptable connections and register them with its parent" in new TestSetup(pullMode = false) { bindListener() attemptConnectionToEndpoint() @@ -52,7 +52,7 @@ class TcpListenerSpec extends AkkaSpec(""" expectWorkerForCommand } - "continue to accept connections after a previous accept" in new TestSetup { + "continue to accept connections after a previous accept" in new TestSetup(pullMode = false) { bindListener() attemptConnectionToEndpoint() @@ -68,7 +68,41 @@ class TcpListenerSpec extends AkkaSpec(""" interestCallReceiver.expectMsg(OP_ACCEPT) } - "react to Unbind commands by replying with Unbound and stopping itself" in new TestSetup { + "not accept connections after a previous accept until read is reenabled" in new TestSetup(pullMode = true) { + bindListener() + + attemptConnectionToEndpoint() + expectNoMsg(100.millis) + + listener ! ResumeAccepting(batchSize = 1) + listener ! ChannelAcceptable + expectWorkerForCommand + selectorRouter.expectNoMsg(100.millis) + interestCallReceiver.expectMsg(OP_ACCEPT) + + // No more accepts are allowed now + interestCallReceiver.expectNoMsg(100.millis) + + listener ! ResumeAccepting(batchSize = 2) + interestCallReceiver.expectMsg(OP_ACCEPT) + + attemptConnectionToEndpoint() + listener ! ChannelAcceptable + expectWorkerForCommand + selectorRouter.expectNoMsg(100.millis) + // There is still one token remaining, accepting + interestCallReceiver.expectMsg(OP_ACCEPT) + + attemptConnectionToEndpoint() + listener ! ChannelAcceptable + expectWorkerForCommand + selectorRouter.expectNoMsg(100.millis) + + // Tokens are depleted now + interestCallReceiver.expectNoMsg(100.millis) + } + + "react to Unbind commands by replying with Unbound and stopping itself" in new TestSetup(pullMode = false) { bindListener() val unbindCommander = TestProbe() @@ -78,7 +112,7 @@ class TcpListenerSpec extends AkkaSpec(""" parent.expectTerminated(listener) } - "drop an incoming connection if it cannot be registered with a selector" in new TestSetup { + "drop an incoming connection if it cannot be registered with a selector" in new TestSetup(pullMode = false) { bindListener() attemptConnectionToEndpoint() @@ -95,7 +129,7 @@ class TcpListenerSpec extends AkkaSpec(""" val counter = Iterator.from(0) - class TestSetup { + class TestSetup(pullMode: Boolean) { val handler = TestProbe() val handlerRef = handler.ref val bindCommander = TestProbe() @@ -106,9 +140,9 @@ class TcpListenerSpec extends AkkaSpec(""" var registerCallReceiver = TestProbe() var interestCallReceiver = TestProbe() - private val parentRef = TestActorRef(new ListenerParent) + private val parentRef = TestActorRef(new ListenerParent(pullMode)) - registerCallReceiver.expectMsg(OP_ACCEPT) + registerCallReceiver.expectMsg(if (pullMode) 0 else OP_ACCEPT) def bindListener() { listener ! new ChannelRegistration { @@ -130,10 +164,10 @@ class TcpListenerSpec extends AkkaSpec(""" chan } - private class ListenerParent extends Actor with ChannelRegistry { + private class ListenerParent(pullMode: Boolean) extends Actor with ChannelRegistry { val listener = context.actorOf( props = Props(classOf[TcpListener], selectorRouter.ref, Tcp(system), this, bindCommander.ref, - Bind(handler.ref, endpoint, 100, Nil)).withDeploy(Deploy.local), + Bind(handler.ref, endpoint, 100, Nil, pullMode)).withDeploy(Deploy.local), name = "test-listener-" + counter.next()) parent.watch(listener) def receive: Receive = { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 3154936994..de8a2b5e2b 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -113,7 +113,8 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { case class Connect(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil, - timeout: Option[FiniteDuration] = None) extends Command + timeout: Option[FiniteDuration] = None, + pullMode: Boolean = false) extends Command /** * The Bind message is send to the TCP manager actor, which is obtained via @@ -137,7 +138,8 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { case class Bind(handler: ActorRef, localAddress: InetSocketAddress, backlog: Int = 100, - options: immutable.Traversable[SocketOption] = Nil) extends Command + options: immutable.Traversable[SocketOption] = Nil, + pullMode: Boolean = false) extends Command /** * This message must be sent to a TCP connection actor after receiving the @@ -392,6 +394,13 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { */ case object ResumeReading extends Command + /** + * This message enables the accepting of the next connection if read throttling is enabled + * for connection actors. + * @param batchSize The number of connections to accept before waiting for the next resume command + */ + case class ResumeAccepting(batchSize: Int) extends Command + /// EVENTS /** * Common interface for all events generated by the TCP layer actors. @@ -608,35 +617,19 @@ object TcpMessage { * @param localAddress optionally specifies a specific address to bind to * @param options Please refer to [[TcpSO]] for a list of all supported options. * @param timeout is the desired connection timeout, `null` means "no timeout" + * @param pullMode enables pull based reading from the connection */ def connect(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress, options: JIterable[SocketOption], - timeout: FiniteDuration): Command = Connect(remoteAddress, Option(localAddress), options, Option(timeout)) + timeout: FiniteDuration, + pullMode: Boolean): Command = Connect(remoteAddress, Option(localAddress), options, Option(timeout), pullMode) - /** - * Connect to the given `remoteAddress` with an optional `localAddress` to bind to given the specified Socket Options - */ - def connect(remoteAddress: InetSocketAddress, - localAddress: InetSocketAddress, - options: JIterable[SocketOption]): Command = Connect(remoteAddress, Option(localAddress), options, None) - - /** - * Connect to the given `remoteAddress` without binding to a local address. - */ - def connect(remoteAddress: InetSocketAddress, - options: JIterable[SocketOption]): Command = Connect(remoteAddress, None, options, None) /** * Connect to the given `remoteAddress` without binding to a local address and without * specifying options. */ - def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil, None) - - /** - * Connect to the given `remoteAddress` with a connection `timeout` without binding to a local address and without - * specifying options. - */ - def connect(remoteAddress: InetSocketAddress, timeout: FiniteDuration): Command = Connect(remoteAddress, None, Nil, Option(timeout)) + def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil, None, pullMode = false) /** * The Bind message is send to the TCP manager actor, which is obtained via @@ -656,11 +649,15 @@ object TcpMessage { * kernel will hold for this port before refusing connections. * * @param options Please refer to [[TcpSO]] for a list of all supported options. + * + * @param pullMode enables pull based accepting and of connections and pull + * based reading from the accepted connections. */ def bind(handler: ActorRef, endpoint: InetSocketAddress, backlog: Int, - options: JIterable[SocketOption]): Command = Bind(handler, endpoint, backlog, options) + options: JIterable[SocketOption], + pullMode: Boolean): Command = Bind(handler, endpoint, backlog, options, pullMode) /** * Open a listening socket without specifying options. */ @@ -789,6 +786,13 @@ object TcpMessage { */ def resumeReading: Command = ResumeReading + /** + * This message enables the accepting of the next connection if pull reading is enabled + * for connection actors. + * @param batchSize The number of connections to accept before waiting for the next resume command + */ + def resumeAccepting(batchSize: Int): Command = ResumeAccepting(batchSize) + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { akka.japi.Util.immutableSeq(coll) } diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 7c5e0d8f80..6bce5e0dd2 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -25,7 +25,7 @@ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } * * INTERNAL API */ -private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketChannel) +private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketChannel, val pullMode: Boolean) extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import tcp.Settings._ @@ -35,7 +35,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha private[this] var pendingWrite: PendingWrite = EmptyPendingWrite private[this] var peerClosed = false private[this] var writingSuspended = false - private[this] var readingSuspended = false + private[this] var readingSuspended = pullMode private[this] var interestedInResume: Option[ActorRef] = None var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop @@ -55,7 +55,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha if (TraceLogging) log.debug("[{}] registered as connection handler", handler) val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting) - doRead(info, None) // immediately try reading + if (!pullMode) doRead(info, None) // immediately try reading context.setReceiveTimeout(Duration.Undefined) context.become(connected(info)) @@ -215,8 +215,10 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha val buffer = bufferPool.acquire() try innerRead(buffer, ReceivedMessageSizeLimit) match { - case AllRead ⇒ info.registration.enableInterest(OP_READ) - case MoreDataWaiting ⇒ self ! ChannelReadable + case AllRead ⇒ + if (!pullMode) info.registration.enableInterest(OP_READ) + case MoreDataWaiting ⇒ + if (!pullMode) self ! ChannelReadable case EndOfStream if channel.socket.isOutputShutdown ⇒ if (TraceLogging) log.debug("Read returned end-of-stream, our side already closed") doCloseConnection(info.handler, closeCommander, ConfirmedClosed) diff --git a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala index a63163327a..c4470fe908 100644 --- a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala @@ -19,8 +19,9 @@ private[io] class TcpIncomingConnection(_tcp: TcpExt, _channel: SocketChannel, registry: ChannelRegistry, bindHandler: ActorRef, - options: immutable.Traversable[SocketOption]) - extends TcpConnection(_tcp, _channel) { + options: immutable.Traversable[SocketOption], + readThrottling: Boolean) + extends TcpConnection(_tcp, _channel, readThrottling) { context.watch(bindHandler) // sign death pact diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index c18bd8a310..a4b32c6f3d 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -44,6 +44,8 @@ private[io] class TcpListener(selectorRouter: ActorRef, val channel = ServerSocketChannel.open channel.configureBlocking(false) + var acceptLimit = if (bind.pullMode) 0 else BatchAcceptLimit + val localAddress = try { val socket = channel.socket @@ -53,7 +55,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, case isa: InetSocketAddress ⇒ isa case x ⇒ throw new IllegalArgumentException(s"bound to unknown SocketAddress [$x]") } - channelRegistry.register(channel, SelectionKey.OP_ACCEPT) + channelRegistry.register(channel, if (bind.pullMode) 0 else SelectionKey.OP_ACCEPT) log.debug("Successfully bound to {}", ret) ret } catch { @@ -73,7 +75,12 @@ private[io] class TcpListener(selectorRouter: ActorRef, def bound(registration: ChannelRegistration): Receive = { case ChannelAcceptable ⇒ - acceptAllPending(registration, BatchAcceptLimit) + acceptLimit = acceptAllPending(registration, acceptLimit) + if (acceptLimit > 0) registration.enableInterest(SelectionKey.OP_ACCEPT) + + case ResumeAccepting(batchSize) ⇒ + acceptLimit = batchSize + registration.enableInterest(SelectionKey.OP_ACCEPT) case FailedRegisterIncoming(socketChannel) ⇒ log.warning("Could not register incoming connection since selector capacity limit is reached, closing connection") @@ -90,7 +97,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, context.stop(self) } - @tailrec final def acceptAllPending(registration: ChannelRegistration, limit: Int): Unit = { + @tailrec final def acceptAllPending(registration: ChannelRegistration, limit: Int): Int = { val socketChannel = if (limit > 0) { try channel.accept() @@ -102,10 +109,10 @@ private[io] class TcpListener(selectorRouter: ActorRef, log.debug("New connection accepted") socketChannel.configureBlocking(false) def props(registry: ChannelRegistry) = - Props(classOf[TcpIncomingConnection], tcp, socketChannel, registry, bind.handler, bind.options) + Props(classOf[TcpIncomingConnection], tcp, socketChannel, registry, bind.handler, bind.options, bind.pullMode) selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, props) acceptAllPending(registration, limit - 1) - } else registration.enableInterest(SelectionKey.OP_ACCEPT) + } else if (bind.pullMode) limit else BatchAcceptLimit } override def postStop() { diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 41bf35cb82..ba4140facc 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -24,7 +24,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, channelRegistry: ChannelRegistry, commander: ActorRef, connect: Connect) - extends TcpConnection(_tcp, SocketChannel.open().configureBlocking(false).asInstanceOf[SocketChannel]) { + extends TcpConnection(_tcp, SocketChannel.open().configureBlocking(false).asInstanceOf[SocketChannel], connect.pullMode) { import connect._ diff --git a/akka-docs/rst/java/code/docs/io/IODocTest.java b/akka-docs/rst/java/code/docs/io/IODocTest.java index a74852f644..bad60cf23f 100644 --- a/akka-docs/rst/java/code/docs/io/IODocTest.java +++ b/akka-docs/rst/java/code/docs/io/IODocTest.java @@ -44,7 +44,7 @@ public class IODocTest { 1234); final List options = new ArrayList(); options.add(TcpSO.keepAlive(true)); - tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options), getSelf()); + tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options, null, false), getSelf()); //#connect-with-options } else //#connected @@ -80,7 +80,7 @@ public class IODocTest { 1234); final List options = new ArrayList(); options.add(TcpSO.reuseAddress(true)); - tcp.tell(TcpMessage.bind(handler, localAddr, 10, options), getSelf()); + tcp.tell(TcpMessage.bind(handler, localAddr, 10, options, false), getSelf()); //#bind } } diff --git a/akka-docs/rst/java/code/docs/io/JavaReadBackPressure.java b/akka-docs/rst/java/code/docs/io/JavaReadBackPressure.java new file mode 100644 index 0000000000..0b69e72ba8 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/JavaReadBackPressure.java @@ -0,0 +1,93 @@ +package docs.io; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.io.Inet; +import akka.io.Tcp; +import akka.io.TcpMessage; +import akka.util.ByteString; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +public class JavaReadBackPressure { + + static public class Listener extends UntypedActor { + ActorRef tcp; + ActorRef listener; + + @Override + //#pull-accepting + public void onReceive(Object message) throws Exception { + if (message instanceof Tcp.Bound) { + listener = getSender(); + // Accept connections one by one + listener.tell(TcpMessage.resumeAccepting(1), getSelf()); + } else if (message instanceof Tcp.Connected) { + ActorRef handler = getContext().actorOf(Props.create(PullEcho.class, getSender())); + getSender().tell(TcpMessage.register(handler), getSelf()); + // Resume accepting connections + listener.tell(TcpMessage.resumeAccepting(1), getSelf()); + } + } + //#pull-accepting + + @Override + public void preStart() throws Exception { + //#pull-mode-bind + tcp = Tcp.get(getContext().system()).manager(); + final List options = new ArrayList(); + tcp.tell( + TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100, options, true), + getSelf() + ); + //#pull-mode-bind + } + + private void demonstrateConnect() { + //#pull-mode-connect + final List options = new ArrayList(); + tcp.tell( + TcpMessage.connect(new InetSocketAddress("localhost", 3000), null, options, null, true), + getSelf() + ); + //#pull-mode-connect + } + } + + static public class Ack implements Tcp.Event { + } + + static public class PullEcho extends UntypedActor { + final ActorRef connection; + + public PullEcho(ActorRef connection) { + this.connection = connection; + } + + //#pull-reading-echo + @Override + public void preStart() throws Exception { + connection.tell(TcpMessage.resumeReading(), getSelf()); + } + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof Tcp.Received) { + ByteString data = ((Tcp.Received) message).data(); + connection.tell(TcpMessage.write(data, new Ack()), getSelf()); + } else if (message instanceof Ack) { + connection.tell(TcpMessage.resumeReading(), getSelf()); + } + } + //#pull-reading-echo + + + } + +} diff --git a/akka-docs/rst/java/io-tcp.rst b/akka-docs/rst/java/io-tcp.rst index ce4a7cdee0..11b0f58606 100644 --- a/akka-docs/rst/java/io-tcp.rst +++ b/akka-docs/rst/java/io-tcp.rst @@ -160,8 +160,9 @@ Throttling Reads and Writes The basic model of the TCP connection actor is that it has no internal buffering (i.e. it can only process one write at a time, meaning it can buffer one write until it has been passed on to the O/S kernel in full). Congestion -needs to be handled at the user level, for which there are three modes of -operation: +needs to be handled at the user level, for both writes and reads. + +For back-pressuring writes there are three modes of operation * *ACK-based:* every :class:`Write` command carries an arbitrary object, and if this object is not ``Tcp.NoAck`` then it will be returned to the sender of @@ -186,18 +187,32 @@ operation: :class:`WritingResumed` signal then every message is delivered exactly once to the network socket. -These models (with the exception of the second which is rather specialised) are +These write models (with the exception of the second which is rather specialised) are demonstrated in complete examples below. The full and contiguous source is available `on github <@github@/akka-docs/rst/java/code/docs/io/japi>`_. +For back-pressuring reads there are two modes of operation + +* *Push-reading:* in this mode the connection actor sends the registered reader actor + incoming data as soon as available as :class:`Received` events. Whenever the reader actor + wants to signal back-pressure to the remote TCP endpoint it can send a :class:`SuspendReading` + message to the connection actor to indicate that it wants to suspend the + reception of new data. No :class:`Received` events will arrive until a corresponding + :class:`ResumeReading` is sent indicating that the receiver actor is ready again. + +* *Pull-reading:* after sending a :class:`Received` event the connection + actor automatically suspends accepting data from the socket until the reader actor signals + with a :class:`ResumeReading` message that it is ready to process more input data. Hence + new data is "pulled" from the connection by sending :class:`ResumeReading` messages. + .. note:: It should be obvious that all these flow control schemes only work between - one writer and one connection actor; as soon as multiple actors send write + one writer/reader and one connection actor; as soon as multiple actors send write commands to a single connection no consistent result can be achieved. -ACK-Based Back-Pressure ------------------------ +ACK-Based Write Back-Pressure +----------------------------- For proper function of the following example it is important to configure the connection to remain half-open when the remote side closed its writing end: @@ -236,7 +251,7 @@ the remote side from writing, filling up its write buffer, until finally the writer on the other side cannot push any data into the socket anymore. This is how end-to-end back-pressure is realized across a TCP connection. -NACK-Based Back-Pressure with Write Suspending +NACK-Based Write Back-Pressure with Suspending ---------------------------------------------- .. includecode:: code/docs/io/japi/EchoHandler.java#echo-handler @@ -270,3 +285,56 @@ behavior to await the :class:`WritingResumed` event and start over. The helper functions are very similar to the ACK-based case: .. includecode:: code/docs/io/japi/EchoHandler.java#helpers + +Read Back-Pressure with Pull Mode +--------------------------------- + +When using push based reading, data coming from the socket is sent to the actor as soon +as it is available. In the case of the previous Echo server example +this meant that we needed to maintain a buffer of incoming data to keep it around +since the rate of writing might be slower than the rate of the arrival of new data. + +With the Pull mode this buffer can be completely eliminated as the following snippet +demonstrates: + +.. includecode:: code/docs/io/JavaReadBackPressure.java#pull-reading-echo + +The idea here is that reading is not resumed until the previous write has been +completely acknowledged by the connection actor. Every pull mode connection +actor starts from suspended state. To start the flow of data we send a +``ResumeReading`` in the ``preStart`` method to tell the connection actor that +we are ready to receive the first chunk of data. Since we only resume reading when +the previous data chunk has been completely written there is no need for maintaining +a buffer. + +To enable pull reading on an outbound connection the ``pullMode`` parameter of +the :class:`Connect` should be set to ``true``: + +.. includecode:: code/docs/io/JavaReadBackPressure.java#pull-mode-connect + +Pull Mode Reading for Inbound Connections +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The previous section demonstrated how to enable pull reading mode for outbound +connections but it is possible to create a listener actor with this mode of reading +by setting the ``pullMode`` parameter of the :class:`Bind` command to ``true``: + +.. includecode:: code/docs/io/JavaReadBackPressure.java#pull-mode-bind + +One of the effects of this setting is that all connections accepted by this listener +actor will use pull mode reading. + +Another effect of this setting is that in addition of setting all inbound connections to +pull mode, accepting connections becomes pull based, too. This means that after handling +one (or more) :class:`Connected` events the listener actor has to be resumed by sending +it a :class:`ResumeAccepting` message. + +Listener actors with pull mode start suspended so to start accepting connections +a :class:`ResumeAccepting` command has to be sent to the listener actor after binding was successful: + +.. includecode:: code/docs/io/JavaReadBackPressure.java#pull-accepting + +As shown in the example after handling an incoming connection we need to resume accepting again. +The :class:`ResumeAccepting` message accepts a ``batchSize`` parameter that specifies how +many new connections are accepted before a next :class:`ResumeAccepting` message +is needed to resume handling of new connections. diff --git a/akka-docs/rst/scala/code/docs/io/EchoServer.scala b/akka-docs/rst/scala/code/docs/io/EchoServer.scala index 54082c4f26..bc4cbedbaa 100644 --- a/akka-docs/rst/scala/code/docs/io/EchoServer.scala +++ b/akka-docs/rst/scala/code/docs/io/EchoServer.scala @@ -56,7 +56,7 @@ class EchoManager(handlerClass: Class[_]) extends Actor with ActorLogging { case Bound(localAddress) => log.info("listening on port {}", localAddress.getPort) - case CommandFailed(Bind(_, local, _, _)) => + case CommandFailed(Bind(_, local, _, _, _)) => log.warning(s"cannot bind to [$local]") context stop self diff --git a/akka-docs/rst/scala/code/docs/io/ReadBackPressure.scala b/akka-docs/rst/scala/code/docs/io/ReadBackPressure.scala new file mode 100644 index 0000000000..484baaf689 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/io/ReadBackPressure.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package docs.io + +import akka.actor.{ ActorRef, ActorLogging, Props, Actor, ActorSystem } +import akka.io.Tcp._ +import akka.io.{ Tcp, IO } +import java.net.InetSocketAddress +import akka.testkit.{ ImplicitSender, TestProbe, AkkaSpec } +import akka.util.ByteString + +object PullReadingExample { + + class Listener(monitor: ActorRef) extends Actor { + + import context.system + + override def preStart: Unit = + //#pull-mode-bind + IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0), pullMode = true) + //#pull-mode-bind + + def receive = { + //#pull-accepting + case Bound(localAddress) => + // Accept connections one by one + sender ! ResumeAccepting(batchSize = 1) + context.become(listening(sender)) + //#pull-accepting + monitor ! localAddress + } + + //#pull-accepting-cont + def listening(listener: ActorRef): Receive = { + case Connected(remote, local) => + val handler = context.actorOf(Props(classOf[PullEcho], sender)) + sender ! Register(handler, keepOpenOnPeerClosed = true) + listener ! ResumeAccepting(batchSize = 1) + } + //#pull-accepting-cont + + } + + case object Ack extends Event + + class PullEcho(connection: ActorRef) extends Actor { + + //#pull-reading-echo + override def preStart: Unit = connection ! ResumeReading + + def receive = { + case Received(data) => connection ! Write(data, Ack) + case Ack => connection ! ResumeReading + } + //#pull-reading-echo + } + +} + +class PullReadingSpec extends AkkaSpec with ImplicitSender { + + "demonstrate pull reading" in { + val probe = TestProbe() + system.actorOf(Props(classOf[PullReadingExample.Listener], probe.ref), "server") + val listenAddress = probe.expectMsgType[InetSocketAddress] + + //#pull-mode-connect + IO(Tcp) ! Connect(listenAddress, pullMode = true) + //#pull-mode-connect + expectMsgType[Connected] + val connection = lastSender + + val client = TestProbe() + client.send(connection, Register(client.ref)) + client.send(connection, Write(ByteString("hello"))) + client.send(connection, ResumeReading) + client.expectMsg(Received(ByteString("hello"))) + + system.shutdown() + system.awaitTermination + } +} diff --git a/akka-docs/rst/scala/io-tcp.rst b/akka-docs/rst/scala/io-tcp.rst index 1b69cd11d3..0c5454c4f4 100644 --- a/akka-docs/rst/scala/io-tcp.rst +++ b/akka-docs/rst/scala/io-tcp.rst @@ -161,8 +161,9 @@ Throttling Reads and Writes The basic model of the TCP connection actor is that it has no internal buffering (i.e. it can only process one write at a time, meaning it can buffer one write until it has been passed on to the O/S kernel in full). Congestion -needs to be handled at the user level, for which there are three modes of -operation: +needs to be handled at the user level, for both writes and reads. + +For back-pressuring writes there are three modes of operation * *ACK-based:* every :class:`Write` command carries an arbitrary object, and if this object is not ``Tcp.NoAck`` then it will be returned to the sender of @@ -187,18 +188,32 @@ operation: :class:`WritingResumed` signal then every message is delivered exactly once to the network socket. -These models (with the exception of the second which is rather specialised) are +These write back-pressure models (with the exception of the second which is rather specialised) are demonstrated in complete examples below. The full and contiguous source is available `on github <@github@/akka-docs/rst/scala/code/docs/io/EchoServer.scala>`_. +For back-pressuring reads there are two modes of operation + +* *Push-reading:* in this mode the connection actor sends the registered reader actor + incoming data as soon as available as :class:`Received` events. Whenever the reader actor + wants to signal back-pressure to the remote TCP endpoint it can send a :class:`SuspendReading` + message to the connection actor to indicate that it wants to suspend the + reception of new data. No :class:`Received` events will arrive until a corresponding + :class:`ResumeReading` is sent indicating that the receiver actor is ready again. + +* *Pull-reading:* after sending a :class:`Received` event the connection + actor automatically suspends accepting data from the socket until the reader actor signals + with a :class:`ResumeReading` message that it is ready to process more input data. Hence + new data is "pulled" from the connection by sending :class:`ResumeReading` messages. + .. note:: It should be obvious that all these flow control schemes only work between - one writer and one connection actor; as soon as multiple actors send write + one writer/reader and one connection actor; as soon as multiple actors send write commands to a single connection no consistent result can be achieved. -ACK-Based Back-Pressure ------------------------ +ACK-Based Write Back-Pressure +----------------------------- For proper function of the following example it is important to configure the connection to remain half-open when the remote side closed its writing end: @@ -237,7 +252,7 @@ the remote side from writing, filling up its write buffer, until finally the writer on the other side cannot push any data into the socket anymore. This is how end-to-end back-pressure is realized across a TCP connection. -NACK-Based Back-Pressure with Write Suspending +NACK-Based Write Back-Pressure with Suspending ---------------------------------------------- .. includecode:: code/docs/io/EchoServer.scala#echo-handler @@ -271,3 +286,60 @@ behavior to await the :class:`WritingResumed` event and start over. The helper functions are very similar to the ACK-based case: .. includecode:: code/docs/io/EchoServer.scala#helpers + +Read Back-Pressure with Pull Mode +--------------------------------- + +When using push based reading, data coming from the socket is sent to the actor as soon +as it is available. In the case of the previous Echo server example +this meant that we needed to maintain a buffer of incoming data to keep it around +since the rate of writing might be slower than the rate of the arrival of new data. + +With the Pull mode this buffer can be completely eliminated as the following snippet +demonstrates: + +.. includecode:: code/docs/io/ReadBackPressure.scala#pull-reading-echo + +The idea here is that reading is not resumed until the previous write has been +completely acknowledged by the connection actor. Every pull mode connection +actor starts from suspended state. To start the flow of data we send a +``ResumeReading`` in the ``preStart`` method to tell the connection actor that +we are ready to receive the first chunk of data. Since we only resume reading when +the previous data chunk has been completely written there is no need for maintaining +a buffer. + +To enable pull reading on an outbound connection the ``pullMode`` parameter of +the :class:`Connect` should be set to ``true``: + +.. includecode:: code/docs/io/ReadBackPressure.scala#pull-mode-connect + +Pull Mode Reading for Inbound Connections +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The previous section demonstrated how to enable pull reading mode for outbound +connections but it is possible to create a listener actor with this mode of reading +by setting the ``pullMode`` parameter of the :class:`Bind` command to ``true``: + +.. includecode:: code/docs/io/ReadBackPressure.scala#pull-mode-bind + +One of the effects of this setting is that all connections accepted by this listener +actor will use pull mode reading. + +Another effect of this setting is that in addition of setting all inbound connections to +pull mode, accepting connections becomes pull based, too. This means that after handling +one (or more) :class:`Connected` events the listener actor has to be resumed by sending +it a :class:`ResumeAccepting` message. + +Listener actors with pull mode start suspended so to start accepting connections +a :class:`ResumeAccepting` command has to be sent to the listener actor after binding was successful: + +.. includecode:: code/docs/io/ReadBackPressure.scala#pull-accepting + +After handling an incoming connection we need to resume accepting again: + +.. includecode:: code/docs/io/ReadBackPressure.scala#pull-accepting-cont + +The :class:`ResumeAccepting` accepts a ``batchSize`` parameter that specifies how +many new connections are accepted before a next :class:`ResumeAccepting` message +is needed to resume handling of new connections. +