diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index c3c3b4b63d..cdfa9019f0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -33,9 +33,24 @@ class ActorDSLSpec extends AkkaSpec { "An Inbox" must { "function as implicit sender" in { + //#inbox implicit val i = inbox() echo ! "hello" i.receive() must be("hello") + //#inbox + } + + "support watch" in { + //#watch + val target = // some actor + //#watch + actor(new Act {}) + //#watch + val i = inbox() + i watch target + //#watch + target ! PoisonPill + i receive 1.second must be(Terminated(target)(true, false)) } "support queueing multiple queries" in { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index cdce29a1ac..e898131926 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -274,7 +274,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "respect StopReading and ResumeReading" in withEstablishedConnection() { setup ⇒ import setup._ - connectionHandler.send(connectionActor, StopReading) + connectionHandler.send(connectionActor, SuspendReading) // the selector interprets StopReading to deregister interest // for reading @@ -553,6 +553,135 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") deaths must be(Set(connectionHandler.ref, connectionActor)) } } + + "support ResumeWriting (backed up)" in withEstablishedConnection() { setup ⇒ + import setup._ + + val writer = TestProbe() + val write = writeCmd(NoAck) + + // fill up the write buffer until NACK + var written = 0 + while (!writer.msgAvailable) { + writer.send(connectionActor, write) + written += 1 + } + // dump the NACKs + writer.receiveWhile(1.second) { + case CommandFailed(write) ⇒ written -= 1 + } + writer.msgAvailable must be(false) + + // writes must fail now + writer.send(connectionActor, write) + writer.expectMsg(CommandFailed(write)) + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) + + // resuming must not immediately work (queue still full) + writer.send(connectionActor, ResumeWriting) + writer.expectNoMsg(1.second) + + // so drain the queue until it works again + while (!writer.msgAvailable) pullFromServerSide(TestSize) + writer.expectMsg(Duration.Zero, WritingResumed) + + // now write should work again + writer.send(connectionActor, writeCmd("works")) + writer.expectMsg("works") + } + + "support ResumeWriting (queue flushed)" in withEstablishedConnection() { setup ⇒ + import setup._ + + val writer = TestProbe() + val write = writeCmd(NoAck) + + // fill up the write buffer until NACK + var written = 0 + while (!writer.msgAvailable) { + writer.send(connectionActor, write) + written += 1 + } + // dump the NACKs + writer.receiveWhile(1.second) { + case CommandFailed(write) ⇒ written -= 1 + } + + // drain the queue until it works again + pullFromServerSide(TestSize * written) + + // writes must still fail + writer.send(connectionActor, write) + writer.expectMsg(CommandFailed(write)) + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) + + // resuming must work immediately + writer.send(connectionActor, ResumeWriting) + writer.expectMsg(1.second, WritingResumed) + + // now write should work again + writer.send(connectionActor, writeCmd("works")) + writer.expectMsg("works") + } + + "support useResumeWriting==false (backed up)" in withEstablishedConnection(useResumeWriting = false) { setup ⇒ + import setup._ + + val writer = TestProbe() + val write = writeCmd(NoAck) + + // fill up the write buffer until NACK + var written = 0 + while (!writer.msgAvailable) { + writer.send(connectionActor, write) + written += 1 + } + // dump the NACKs + writer.receiveWhile(1.second) { + case CommandFailed(write) ⇒ written -= 1 + } + writer.msgAvailable must be(false) + + // writes must fail now + writer.send(connectionActor, write) + writer.expectMsg(CommandFailed(write)) + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) + + // so drain the queue until it works again + pullFromServerSide(TestSize * written) + + // now write should work again + writer.send(connectionActor, writeCmd("works")) + writer.expectMsg("works") + } + + "support useResumeWriting==false (queue flushed)" in withEstablishedConnection(useResumeWriting = false) { setup ⇒ + import setup._ + + val writer = TestProbe() + val write = writeCmd(NoAck) + + // fill up the write buffer until NACK + var written = 0 + while (!writer.msgAvailable) { + writer.send(connectionActor, write) + written += 1 + } + // dump the NACKs + writer.receiveWhile(1.second) { + case CommandFailed(write) ⇒ written -= 1 + } + + // drain the queue until it works again + pullFromServerSide(TestSize * written) + + // now write should work again + writer.send(connectionActor, writeCmd("works")) + writer.expectMsg("works") + } } def acceptServerSideConnection(localServer: ServerSocketChannel): SocketChannel = { @@ -580,6 +709,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") selector: TestProbe, connectionActor: TestActorRef[TcpOutgoingConnection], clientSideChannel: SocketChannel) + case class RegisteredSetup( unregisteredSetup: UnacceptedSetup, connectionHandler: TestProbe, @@ -698,6 +828,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") def interestsDesc(interests: Int): String = interestsNames.filter(i ⇒ (i._1 & interests) != 0).map(_._2).mkString(", ") } + private[io] def withUnacceptedConnection( setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (), connectionActorCons: (ActorRef, ActorRef) ⇒ TestActorRef[TcpOutgoingConnection] = createConnectionActor())(body: UnacceptedSetup ⇒ Any): Unit = @@ -720,30 +851,33 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") clientSideChannel) } } + def withEstablishedConnection( setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (), clientSocketOptions: immutable.Seq[SocketOption] = Nil, - keepOpenOnPeerClosed: Boolean = false)(body: RegisteredSetup ⇒ Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup ⇒ - import unregisteredSetup._ + keepOpenOnPeerClosed: Boolean = false, + useResumeWriting: Boolean = true)(body: RegisteredSetup ⇒ Any): Unit = + withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup ⇒ + import unregisteredSetup._ - val serverSideChannel = acceptServerSideConnection(localServer) - serverSideChannel.configureBlocking(false) + val serverSideChannel = acceptServerSideConnection(localServer) + serverSideChannel.configureBlocking(false) - serverSideChannel must not be (null) - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) + serverSideChannel must not be (null) + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) - val connectionHandler = TestProbe() - userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed)) - selector.expectMsg(ReadInterest) + val connectionHandler = TestProbe() + userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting)) + selector.expectMsg(ReadInterest) - body { - RegisteredSetup( - unregisteredSetup, - connectionHandler, - serverSideChannel) + body { + RegisteredSetup( + unregisteredSetup, + connectionHandler, + serverSideChannel) + } } - } val TestSize = 10000 diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 216ce062e9..216e8302fb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -97,3 +97,46 @@ object ActorDSL extends dsl.Inbox with dsl.Creators { } } + +/** + * An Inbox is an actor-like object which is interrogated from the outside. + * It contains an actor whose reference can be passed to other actors as + * usual and it can watch other actors’ lifecycle. + */ +abstract class Inbox { + + /** + * Receive the next message from this Inbox. This call will return immediately + * if the internal actor previously received a message, or it will block for + * up to the specified duration to await reception of a message. If no message + * is received a [[TimeoutException]] will be raised. + */ + def receive(max: FiniteDuration): Any + + /** + * Have the internal actor watch the target actor. When the target actor + * terminates a [[Terminated]] message will be received. + */ + def watch(target: ActorRef): Unit + + /** + * Obtain a reference to the internal actor, which can then for example be + * registered with the event stream or whatever else you may want to do with + * an [[ActorRef]]. + */ + def getRef(): ActorRef + + /** + * Have the internal actor act as the sender of the given message which will + * be sent to the given target. This means that should the target actor reply + * then those replies will be received by this Inbox. + */ + def send(target: ActorRef, msg: AnyRef): Unit +} + +object Inbox { + /** + * Create a new Inbox within the given system. + */ + def create(system: ActorSystem): Inbox = ActorDSL.inbox()(system) +} diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala index 46a4f53af5..642d095961 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -43,6 +43,7 @@ trait Inbox { this: ActorDSL.type ⇒ private case class Select(deadline: Deadline, predicate: PartialFunction[Any, Any], client: ActorRef = null) extends Query { def withClient(c: ActorRef) = copy(client = c) } + private case class StartWatch(target: ActorRef) private case object Kick private implicit val deadlineOrder: Ordering[Query] = new Ordering[Query] { def compare(left: Query, right: Query): Int = left.deadline.time compare right.deadline.time @@ -96,6 +97,7 @@ trait Inbox { this: ActorDSL.type ⇒ } currentSelect = null } + case StartWatch(target) ⇒ context watch target case Kick ⇒ val now = Deadline.now val pred = (q: Query) ⇒ q.deadline.time < now.time @@ -112,7 +114,7 @@ trait Inbox { this: ActorDSL.type ⇒ else { currentMsg = msg clients.dequeueFirst(clientPredicate) match { - case Some(q) ⇒ clientsByTimeout -= q; q.client ! msg + case Some(q) ⇒ { clientsByTimeout -= q; q.client ! msg } case None ⇒ enqueueMessage(msg) } currentMsg = null @@ -151,9 +153,14 @@ trait Inbox { this: ActorDSL.type ⇒ */ def inbox()(implicit system: ActorSystem): Inbox = new Inbox(system) - class Inbox(system: ActorSystem) { + class Inbox(system: ActorSystem) extends akka.actor.Inbox { val receiver: ActorRef = Extension(system).newReceiver + + // Java API + def getRef: ActorRef = receiver + def send(target: ActorRef, msg: AnyRef): Unit = target.tell(msg, receiver) + private val defaultTimeout: FiniteDuration = Extension(system).DSLDefaultTimeout /** @@ -188,6 +195,12 @@ trait Inbox { this: ActorDSL.type ⇒ predicate(Await.result(receiver ? Select(Deadline.now + timeout, predicate), Duration.Inf)) } + /** + * Make the inbox’s actor watch the target actor such that reception of the + * Terminated message can then be awaited. + */ + def watch(target: ActorRef): Unit = receiver ! StartWatch(target) + /** * Overridden finalizer which will try to stop the actor once this Inbox * is no longer referenced. diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index aa5d71adcd..ba5bc51529 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -17,7 +17,9 @@ import java.lang.{ Iterable ⇒ JIterable } object Tcp extends ExtensionKey[TcpExt] { - // Java API + /** + * Java API: retrieve Tcp extension for the given system. + */ override def get(system: ActorSystem): TcpExt = super.get(system) // shared socket options @@ -62,10 +64,20 @@ object Tcp extends ExtensionKey[TcpExt] { } /// COMMANDS + + /** + * This is the common trait for all commands understood by TCP actors. + */ trait Command extends IO.HasFailureMessage { def failureMessage = CommandFailed(this) } + /** + * The Connect message is sent to the [[TcpManager]], which is obtained via + * [[TcpExt#getManager]]. Either the manager replies with a [[CommandFailed]] + * or the actor handling the new connection replies with a [[Connected]] + * message. + */ case class Connect(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil) extends Command @@ -74,7 +86,7 @@ object Tcp extends ExtensionKey[TcpExt] { backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil) extends Command - case class Register(handler: ActorRef, keepOpenOnPeerClosed: Boolean = false) extends Command + case class Register(handler: ActorRef, keepOpenOnPeerClosed: Boolean = false, useResumeWriting: Boolean = true) extends Command case object Unbind extends Command sealed trait CloseCommand extends Command { @@ -131,7 +143,9 @@ object Tcp extends ExtensionKey[TcpExt] { require(count > 0, "WriteFile.count must be > 0") } - case object StopReading extends Command + case object ResumeWriting extends Command + + case object SuspendReading extends Command case object ResumeReading extends Command /// EVENTS @@ -141,6 +155,9 @@ object Tcp extends ExtensionKey[TcpExt] { case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event case class CommandFailed(cmd: Command) extends Event + sealed trait WritingResumed extends Event + case object WritingResumed extends WritingResumed + case class Bound(localAddress: InetSocketAddress) extends Event sealed trait Unbound extends Event case object Unbound extends Unbound @@ -209,6 +226,11 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { name = "IO-TCP") } + /** + * Java API: retrieve a reference to the manager actor. + */ + def getManager: ActorRef = manager + val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize) val fileIoDispatcher = system.dispatchers.lookup(Settings.FileIODispatcher) } @@ -240,7 +262,8 @@ object TcpMessage { backlog: Int): Command = Bind(handler, endpoint, backlog, Nil) def register(handler: ActorRef): Command = Register(handler) - def register(handler: ActorRef, keepOpenOnPeerClosed: Boolean): Command = Register(handler, keepOpenOnPeerClosed) + def register(handler: ActorRef, keepOpenOnPeerClosed: Boolean, useResumeWriting: Boolean): Command = + Register(handler, keepOpenOnPeerClosed, useResumeWriting) def unbind: Command = Unbind def close: Command = Close @@ -253,9 +276,11 @@ object TcpMessage { def write(data: ByteString): Command = Write(data) def write(data: ByteString, ack: AnyRef): Command = Write(data, ack) - def stopReading: Command = StopReading + def suspendReading: Command = SuspendReading def resumeReading: Command = ResumeReading + def resumeWriting: Command = ResumeWriting + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { import scala.collection.JavaConverters._ coll.asScala.to diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 602c196711..addae22ec9 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -44,7 +44,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, /** connection established, waiting for registration from user handler */ def waitingForRegistration(commander: ActorRef): Receive = { - case Register(handler, keepOpenOnPeerClosed) ⇒ + case Register(handler, keepOpenOnPeerClosed, useResumeWriting) ⇒ // up to this point we've been watching the commander, // but since registration is now complete we only need to watch the handler from here on if (handler != commander) { @@ -53,6 +53,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, } if (TraceLogging) log.debug("[{}] registered as connection handler", handler) this.keepOpenOnPeerClosed = keepOpenOnPeerClosed + this.useResumeWriting = useResumeWriting doRead(handler, None) // immediately try reading context.setReceiveTimeout(Duration.Undefined) @@ -70,7 +71,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, /** normal connected state */ def connected(handler: ActorRef): Receive = handleWriteMessages(handler) orElse { - case StopReading ⇒ selector ! DisableReadInterest + case SuspendReading ⇒ selector ! DisableReadInterest case ResumeReading ⇒ selector ! ReadInterest case ChannelReadable ⇒ doRead(handler, None) @@ -84,7 +85,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, /** connection is closing but a write has to be finished first */ def closingWithPendingWrite(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Receive = { - case StopReading ⇒ selector ! DisableReadInterest + case SuspendReading ⇒ selector ! DisableReadInterest case ResumeReading ⇒ selector ! ReadInterest case ChannelReadable ⇒ doRead(handler, closeCommander) @@ -101,26 +102,61 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, /** connection is closed on our side and we're waiting from confirmation from the other side */ def closing(handler: ActorRef, closeCommander: Option[ActorRef]): Receive = { - case StopReading ⇒ selector ! DisableReadInterest + case SuspendReading ⇒ selector ! DisableReadInterest case ResumeReading ⇒ selector ! ReadInterest case ChannelReadable ⇒ doRead(handler, closeCommander) case Abort ⇒ handleClose(handler, Some(sender), Aborted) } + private[this] var useResumeWriting = false + private[this] var writingSuspended = false + private[this] var interestedInResume: Option[ActorRef] = None + def handleWriteMessages(handler: ActorRef): Receive = { - case ChannelWritable ⇒ if (writePending) doWrite(handler) - - case write: WriteCommand if writePending ⇒ - if (TraceLogging) log.debug("Dropping write because queue is full") - sender ! write.failureMessage - - case write: Write if write.data.isEmpty ⇒ - if (write.wantsAck) - sender ! write.ack + case ChannelWritable ⇒ + if (writePending) { + doWrite(handler) + if (!writePending && interestedInResume.nonEmpty) { + interestedInResume.get ! WritingResumed + interestedInResume = None + } + } case write: WriteCommand ⇒ - pendingWrite = createWrite(write) - doWrite(handler) + if (writingSuspended) { + if (TraceLogging) log.debug("Dropping write because writing is suspended") + sender ! write.failureMessage + + } else if (writePending) { + if (TraceLogging) log.debug("Dropping write because queue is full") + sender ! write.failureMessage + if (useResumeWriting) writingSuspended = true + + } else write match { + case Write(data, ack) if data.isEmpty ⇒ + if (ack != NoAck) sender ! ack + + case _ ⇒ + pendingWrite = createWrite(write) + doWrite(handler) + } + + case ResumeWriting ⇒ + /* + * If more than one actor sends Writes then the first to send this + * message might resume too early for the second, leading to a Write of + * the second to go through although it has not been resumed yet; there + * is nothing we can do about this apart from all actors needing to + * register themselves and us keeping track of them, which sounds bad. + * + * Thus it is documented that useResumeWriting is incompatible with + * multiple writers. But we fail as gracefully as we can. + */ + writingSuspended = false + if (writePending) { + if (interestedInResume.isEmpty) interestedInResume = Some(sender) + else sender ! CommandFailed(ResumeWriting) + } else sender ! WritingResumed case SendBufferFull(remaining) ⇒ { pendingWrite = remaining; selector ! WriteInterest } case WriteFileFinished ⇒ pendingWrite = null diff --git a/akka-docs/rst/java/code/docs/actor/InboxDocTest.java b/akka-docs/rst/java/code/docs/actor/InboxDocTest.java new file mode 100644 index 0000000000..900bee998a --- /dev/null +++ b/akka-docs/rst/java/code/docs/actor/InboxDocTest.java @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.actor; + +import java.util.concurrent.TimeUnit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import scala.concurrent.duration.Duration; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Inbox; +import akka.actor.PoisonPill; +import akka.actor.Terminated; +import akka.testkit.AkkaSpec; +import akka.testkit.JavaTestKit; + +public class InboxDocTest { + + private static ActorSystem system; + + @BeforeClass + public static void beforeAll() { + system = ActorSystem.create("MySystem", AkkaSpec.testConf()); + } + + @AfterClass + public static void afterAll() { + system.shutdown(); + system = null; + } + + @Test + public void demonstrateInbox() { + final JavaTestKit probe = new JavaTestKit(system); + final ActorRef target = probe.getRef(); + //#inbox + final Inbox inbox = Inbox.create(system); + inbox.send(target, "hello"); + //#inbox + probe.expectMsgEquals("hello"); + probe.send(probe.getLastSender(), "world"); + //#inbox + assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)).equals("world"); + //#inbox + } + + @Test + public void demonstrateWatch() { + final JavaTestKit probe = new JavaTestKit(system); + final ActorRef target = probe.getRef(); + //#watch + final Inbox inbox = Inbox.create(system); + inbox.watch(target); + target.tell(PoisonPill.getInstance(), null); + assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)) instanceof Terminated; + //#watch + } + +} diff --git a/akka-docs/rst/java/code/docs/io/japi/EchoHandler.java b/akka-docs/rst/java/code/docs/io/japi/EchoHandler.java new file mode 100644 index 0000000000..53afaa524b --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/EchoHandler.java @@ -0,0 +1,229 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.Queue; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.io.Tcp.CommandFailed; +import akka.io.Tcp.ConnectionClosed; +import akka.io.Tcp.Received; +import akka.io.Tcp.Write; +import akka.io.Tcp.WritingResumed; +import akka.io.TcpMessage; +import akka.japi.Procedure; +import akka.util.ByteString; + +//#echo-handler +public class EchoHandler extends UntypedActor { + + final LoggingAdapter log = Logging + .getLogger(getContext().system(), getSelf()); + + final ActorRef connection; + final InetSocketAddress remote; + + public static final long MAX_STORED = 100000000; + public static final long HIGH_WATERMARK = MAX_STORED * 5 / 10; + public static final long LOW_WATERMARK = MAX_STORED * 2 / 10; + + public EchoHandler(ActorRef connection, InetSocketAddress remote) { + this.connection = connection; + this.remote = remote; + + // sign death pact: this actor stops when the connection is closed + getContext().watch(connection); + + // start out in optimistic write-through mode + getContext().become(writing); + } + + private final Procedure writing = new Procedure() { + @Override + public void apply(Object msg) throws Exception { + if (msg instanceof Received) { + final ByteString data = ((Received) msg).data(); + connection.tell(TcpMessage.write(data, currentOffset()), getSelf()); + buffer(data); + + } else if (msg instanceof Integer) { + acknowledge((Integer) msg); + + } else if (msg instanceof CommandFailed) { + final Write w = (Write) ((CommandFailed) msg).cmd(); + connection.tell(TcpMessage.resumeWriting(), getSelf()); + getContext().become(buffering((Integer) w.ack())); + + } else if (msg instanceof ConnectionClosed) { + final ConnectionClosed cl = (ConnectionClosed) msg; + if (cl.isPeerClosed()) { + if (storage.isEmpty()) { + getContext().stop(getSelf()); + } else { + getContext().become(closing); + } + } + } + } + }; + + //#buffering + protected Procedure buffering(final int nack) { + return new Procedure() { + + private int toAck = 10; + private boolean peerClosed = false; + + @Override + public void apply(Object msg) throws Exception { + if (msg instanceof Received) { + buffer(((Received) msg).data()); + + } else if (msg instanceof WritingResumed) { + writeFirst(); + + } else if (msg instanceof ConnectionClosed) { + if (((ConnectionClosed) msg).isPeerClosed()) + peerClosed = true; + else + getContext().stop(getSelf()); + + } else if (msg instanceof Integer) { + final int ack = (Integer) msg; + acknowledge(ack); + + if (ack >= nack) { + // otherwise it was the ack of the last successful write + + if (storage.isEmpty()) { + if (peerClosed) + getContext().stop(getSelf()); + else + getContext().become(writing); + + } else { + if (toAck > 0) { + // stay in ACK-based mode for a short while + writeFirst(); + --toAck; + } else { + // then return to NACK-based again + writeAll(); + if (peerClosed) + getContext().become(closing); + else + getContext().become(writing); + } + } + } + } + } + }; + } + //#buffering + + //#closing + protected Procedure closing = new Procedure() { + @Override + public void apply(Object msg) throws Exception { + if (msg instanceof CommandFailed) { + // the command can only have been a Write + connection.tell(TcpMessage.resumeWriting(), getSelf()); + getContext().become(closeResend, false); + } else if (msg instanceof Integer) { + acknowledge((Integer) msg); + if (storage.isEmpty()) + getContext().stop(getSelf()); + } + } + }; + + protected Procedure closeResend = new Procedure() { + @Override + public void apply(Object msg) throws Exception { + if (msg instanceof WritingResumed) { + writeAll(); + getContext().unbecome(); + } else if (msg instanceof Integer) { + acknowledge((Integer) msg); + } + } + }; + //#closing + + //#storage-omitted + @Override + public void onReceive(Object msg) throws Exception { + // this method is not used due to become() + } + + @Override + public void postStop() { + log.info("transferred {} bytes from/to [{}]", transferred, remote); + } + + private long transferred; + private int storageOffset = 0; + private long stored = 0; + private Queue storage = new LinkedList(); + + private boolean suspended = false; + + //#helpers + protected void buffer(ByteString data) { + storage.add(data); + stored += data.size(); + + if (stored > MAX_STORED) { + log.warning("drop connection to [{}] (buffer overrun)", remote); + getContext().stop(getSelf()); + + } else if (stored > HIGH_WATERMARK) { + log.debug("suspending reading at {}", currentOffset()); + connection.tell(TcpMessage.suspendReading(), getSelf()); + suspended = true; + } + } + + protected void acknowledge(int ack) { + assert ack == storageOffset; + assert !storage.isEmpty(); + + final ByteString acked = storage.remove(); + stored -= acked.size(); + transferred += acked.size(); + storageOffset += 1; + + if (suspended && stored < LOW_WATERMARK) { + log.debug("resuming reading"); + connection.tell(TcpMessage.resumeReading(), getSelf()); + suspended = false; + } + } + //#helpers + + protected int currentOffset() { + return storageOffset + storage.size(); + } + + protected void writeAll() { + int i = 0; + for (ByteString data : storage) { + connection.tell(TcpMessage.write(data, storageOffset + i++), getSelf()); + } + } + + protected void writeFirst() { + connection.tell(TcpMessage.write(storage.peek(), storageOffset), getSelf()); + } + + //#storage-omitted +} +//#echo-handler diff --git a/akka-docs/rst/java/code/docs/io/japi/EchoManager.java b/akka-docs/rst/java/code/docs/io/japi/EchoManager.java new file mode 100644 index 0000000000..5670723c90 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/EchoManager.java @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.net.InetSocketAddress; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.io.Tcp; +import akka.io.Tcp.Bind; +import akka.io.Tcp.Bound; +import akka.io.Tcp.CommandFailed; +import akka.io.Tcp.Connected; +import akka.io.TcpMessage; + +public class EchoManager extends UntypedActor { + + final LoggingAdapter log = Logging + .getLogger(getContext().system(), getSelf()); + + final Class handlerClass; + + public EchoManager(Class handlerClass) { + this.handlerClass = handlerClass; + } + + @Override + public SupervisorStrategy supervisorStrategy() { + return SupervisorStrategy.stoppingStrategy(); + } + + @Override + public void preStart() throws Exception { + //#manager + final ActorRef tcpManager = Tcp.get(getContext().system()).manager(); + //#manager + tcpManager.tell( + TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100), + getSelf()); + } + + @Override + public void postRestart(Throwable arg0) throws Exception { + // do not restart + getContext().stop(getSelf()); + } + + @Override + public void onReceive(Object msg) throws Exception { + if (msg instanceof Bound) { + log.info("listening on [{}]", ((Bound) msg).localAddress()); + } else if (msg instanceof Tcp.CommandFailed) { + final CommandFailed failed = (CommandFailed) msg; + if (failed.cmd() instanceof Bind) { + log.warning("cannot bind to [{}]", ((Bind) failed.cmd()).endpoint()); + getContext().stop(getSelf()); + } else { + log.warning("unknown command failed [{}]", failed.cmd()); + } + } else + if (msg instanceof Connected) { + final Connected conn = (Connected) msg; + log.info("received connection from [{}]", conn.remoteAddress()); + final ActorRef connection = getSender(); + final ActorRef handler = getContext().actorOf( + Props.create(handlerClass, connection, conn.remoteAddress())); + //#echo-manager + connection.tell(TcpMessage.register(handler, + true, // <-- keepOpenOnPeerClosed flag + true), getSelf()); + //#echo-manager + } + } + +} diff --git a/akka-docs/rst/java/code/docs/io/japi/EchoServer.java b/akka-docs/rst/java/code/docs/io/japi/EchoServer.java new file mode 100644 index 0000000000..0bae9e5af7 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/EchoServer.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +public class EchoServer { + + public static void main(String[] args) throws InterruptedException { + final Config config = ConfigFactory.parseString("akka.loglevel=DEBUG"); + final ActorSystem system = ActorSystem.create("EchoServer", config); + try { + final CountDownLatch latch = new CountDownLatch(1); + final ActorRef watcher = system.actorOf(Props.create(Watcher.class, latch), "watcher"); + final ActorRef nackServer = system.actorOf(Props.create(EchoManager.class, EchoHandler.class), "nack"); + final ActorRef ackServer = system.actorOf(Props.create(EchoManager.class, SimpleEchoHandler.class), "ack"); + watcher.tell(nackServer, null); + watcher.tell(ackServer, null); + latch.await(10, TimeUnit.MINUTES); + } finally { + system.shutdown(); + } + } + +} diff --git a/akka-docs/rst/java/code/docs/io/japi/IODocTest.java b/akka-docs/rst/java/code/docs/io/japi/IODocTest.java new file mode 100644 index 0000000000..fd34917a3a --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/IODocTest.java @@ -0,0 +1,179 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io.japi; + + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +//#imports +import java.net.InetSocketAddress; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.io.Tcp; +import akka.io.Tcp.Bound; +import akka.io.Tcp.CommandFailed; +import akka.io.Tcp.Connected; +import akka.io.Tcp.ConnectionClosed; +import akka.io.Tcp.Received; +import akka.io.TcpMessage; +import akka.japi.Procedure; +import akka.util.ByteString; +//#imports + +import akka.testkit.JavaTestKit; +import akka.testkit.AkkaSpec; + +public class IODocTest { + + static + //#server + public class Server extends UntypedActor { + + final ActorRef manager; + + public Server(ActorRef manager) { + this.manager = manager; + } + + @Override + public void preStart() throws Exception { + final ActorRef tcp = Tcp.get(getContext().system()).manager(); + tcp.tell(TcpMessage.bind(getSelf(), + new InetSocketAddress("localhost", 0), 100), getSelf()); + } + + @Override + public void onReceive(Object msg) throws Exception { + if (msg instanceof Bound) { + manager.tell(msg, getSelf()); + + } else if (msg instanceof CommandFailed) { + getContext().stop(getSelf()); + + } else if (msg instanceof Connected) { + final Connected conn = (Connected) msg; + manager.tell(conn, getSelf()); + final ActorRef handler = getContext().actorOf( + Props.create(SimplisticHandler.class)); + getSender().tell(TcpMessage.register(handler), getSelf()); + } + } + + } + //#server + + static + //#simplistic-handler + public class SimplisticHandler extends UntypedActor { + @Override + public void onReceive(Object msg) throws Exception { + if (msg instanceof Received) { + final ByteString data = ((Received) msg).data(); + System.out.println(data); + getSender().tell(TcpMessage.write(data), getSelf()); + } else if (msg instanceof ConnectionClosed) { + getContext().stop(getSelf()); + } + } + } + //#simplistic-handler + + static + //#client + public class Client extends UntypedActor { + + final InetSocketAddress remote; + final ActorRef listener; + + public Client(InetSocketAddress remote, ActorRef listener) { + this.remote = remote; + this.listener = listener; + + final ActorRef tcp = Tcp.get(getContext().system()).manager(); + tcp.tell(TcpMessage.connect(remote), getSelf()); + } + + @Override + public void onReceive(Object msg) throws Exception { + if (msg instanceof CommandFailed) { + listener.tell("failed", getSelf()); + getContext().stop(getSelf()); + + } else if (msg instanceof Connected) { + listener.tell(msg, getSelf()); + getSender().tell(TcpMessage.register(getSelf()), getSelf()); + getContext().become(connected(getSender())); + } + } + + private Procedure connected(final ActorRef connection) { + return new Procedure() { + @Override + public void apply(Object msg) throws Exception { + + if (msg instanceof ByteString) { + connection.tell(TcpMessage.write((ByteString) msg), getSelf()); + + } else if (msg instanceof CommandFailed) { + // OS kernel socket buffer was full + + } else if (msg instanceof Received) { + listener.tell(((Received) msg).data(), getSelf()); + + } else if (msg.equals("close")) { + connection.tell(TcpMessage.close(), getSelf()); + + } else if (msg instanceof ConnectionClosed) { + getContext().stop(getSelf()); + } + } + }; + } + + } + //#client + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("IODocTest", AkkaSpec.testConf()); + } + + @AfterClass + public static void teardown() { + system.shutdown(); + } + + @Test + public void testConnection() { + new JavaTestKit(system) { + { + @SuppressWarnings("unused") + final ActorRef server = system.actorOf(Props.create(Server.class, getRef()), "server1"); + final InetSocketAddress listen = expectMsgClass(Bound.class).localAddress(); + final ActorRef client = system.actorOf(Props.create(Client.class, listen, getRef()), "client1"); + + final Connected c1 = expectMsgClass(Connected.class); + final Connected c2 = expectMsgClass(Connected.class); + assert c1.localAddress().equals(c2.remoteAddress()); + assert c2.localAddress().equals(c1.remoteAddress()); + + client.tell(ByteString.fromString("hello"), getRef()); + final ByteString reply = expectMsgClass(ByteString.class); + assert reply.utf8String().equals("hello"); + + watch(client); + client.tell("close", getRef()); + expectTerminated(client); + } + }; + } + +} diff --git a/akka-docs/rst/java/code/docs/io/japi/SimpleEchoHandler.java b/akka-docs/rst/java/code/docs/io/japi/SimpleEchoHandler.java new file mode 100644 index 0000000000..980e2f4768 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/SimpleEchoHandler.java @@ -0,0 +1,130 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.Queue; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.io.Tcp.ConnectionClosed; +import akka.io.Tcp.Received; +import akka.io.TcpMessage; +import akka.japi.Procedure; +import akka.util.ByteString; + +//#simple-echo-handler +public class SimpleEchoHandler extends UntypedActor { + + final LoggingAdapter log = Logging + .getLogger(getContext().system(), getSelf()); + + final ActorRef connection; + final InetSocketAddress remote; + + public static final long maxStored = 100000000; + public static final long highWatermark = maxStored * 5 / 10; + public static final long lowWatermark = maxStored * 2 / 10; + + public SimpleEchoHandler(ActorRef connection, InetSocketAddress remote) { + this.connection = connection; + this.remote = remote; + + // sign death pact: this actor stops when the connection is closed + getContext().watch(connection); + } + + @Override + public void onReceive(Object msg) throws Exception { + if (msg instanceof Received) { + final ByteString data = ((Received) msg).data(); + buffer(data); + connection.tell(TcpMessage.write(data, ACK), getSelf()); + // now switch behavior to “waiting for acknowledgement” + getContext().become(buffering, false); + + } else if (msg instanceof ConnectionClosed) { + getContext().stop(getSelf()); + } + } + + private final Procedure buffering = new Procedure() { + @Override + public void apply(Object msg) throws Exception { + if (msg instanceof Received) { + buffer(((Received) msg).data()); + + } else if (msg == ACK) { + acknowledge(); + + } else if (msg instanceof ConnectionClosed) { + if (((ConnectionClosed) msg).isPeerClosed()) { + closing = true; + } else { + // could also be ErrorClosed, in which case we just give up + getContext().stop(getSelf()); + } + } + } + }; + + //#storage-omitted + public void postStop() { + log.info("transferred {} bytes from/to [{}]", transferred, remote); + } + + private long transferred; + private long stored = 0; + private Queue storage = new LinkedList(); + + private boolean suspended = false; + private boolean closing = false; + + private final Object ACK = new Object(); + + //#simple-helpers + protected void buffer(ByteString data) { + storage.add(data); + stored += data.size(); + + if (stored > maxStored) { + log.warning("drop connection to [{}] (buffer overrun)", remote); + getContext().stop(getSelf()); + + } else if (stored > highWatermark) { + log.debug("suspending reading"); + connection.tell(TcpMessage.suspendReading(), getSelf()); + suspended = true; + } + } + + protected void acknowledge() { + final ByteString acked = storage.remove(); + stored -= acked.size(); + transferred += acked.size(); + + if (suspended && stored < lowWatermark) { + log.debug("resuming reading"); + connection.tell(TcpMessage.resumeReading(), getSelf()); + suspended = false; + } + + if (storage.isEmpty()) { + if (closing) { + getContext().stop(getSelf()); + } else { + getContext().unbecome(); + } + } else { + connection.tell(TcpMessage.write(storage.peek(), ACK), getSelf()); + } + } + //#simple-helpers + //#storage-omitted +} +//#simple-echo-handler diff --git a/akka-docs/rst/java/code/docs/io/japi/Watcher.java b/akka-docs/rst/java/code/docs/io/japi/Watcher.java new file mode 100644 index 0000000000..47941771e5 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/Watcher.java @@ -0,0 +1,34 @@ +package docs.io.japi; + +import java.util.concurrent.CountDownLatch; + +import akka.actor.ActorRef; +import akka.actor.Terminated; +import akka.actor.UntypedActor; + +public class Watcher extends UntypedActor { + + static public class Watch { + final ActorRef target; + public Watch(ActorRef target) { + this.target = target; + } + } + + final CountDownLatch latch; + + public Watcher(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void onReceive(Object msg) throws Exception { + if (msg instanceof Watch) { + getContext().watch(((Watch) msg).target); + } else if (msg instanceof Terminated) { + latch.countDown(); + if (latch.getCount() == 0) getContext().stop(getSelf()); + } + } + +} diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index 68f2a82ea1..a96758ebd2 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -11,7 +11,12 @@ and `spray.io`_ teams. Its design combines experiences from the ``spray-io`` module with improvements that were jointly developed for more general consumption as an actor-based service. -This documentation is in progress and some sections may be incomplete. More will be coming. +The guiding design goal for this I/O implementation was to reach extreme +scalability, make no compromises in providing an API correctly matching the +underlying transport mechanism and to be fully event-driven, non-blocking and +asynchronous. The API is meant to be a solid foundation for the implementation +of network protocols and building higher abstractions; it is not meant to be a +full-service high-level NIO wrapper for end users. Terminology, Concepts --------------------- @@ -21,7 +26,7 @@ as an entry point for the API. I/O is broken into several drivers. The manager f is accessible by querying an ``ActorSystem``. For example the following code looks up the TCP manager and returns its ``ActorRef``: -.. includecode:: code/docs/io/IODocTest.java#manager +.. includecode:: code/docs/io/japi/EchoManager.java#manager The manager receives I/O command messages and instantiates worker actors in response. The worker actors present themselves to the API user in the reply to the command that was sent. For example after a ``Connect`` command sent to @@ -346,84 +351,92 @@ this must be modeled either as a command or event, i.e. it will be part of the Using TCP --------- -The following imports are assumed throughout this section: +The code snippets through-out this section assume the following imports: -.. includecode:: code/docs/io/IODocTest.java#imports +.. includecode:: code/docs/io/japi/IODocTest.java#imports All of the Akka I/O APIs are accessed through manager objects. When using an I/O API, the first step is to acquire a reference to the appropriate manager. The code below shows how to acquire a reference to the ``Tcp`` manager. -.. includecode:: code/docs/io/IODocTest.java#manager +.. includecode:: code/docs/io/japi/EchoManager.java#manager The manager is an actor that handles the underlying low level I/O resources (selectors, channels) and instantiates workers for specific tasks, such as listening to incoming connections. -.. _connecting-java: - Connecting ^^^^^^^^^^ -The first step of connecting to a remote address is sending a ``Connect`` message to the TCP manager: +.. includecode:: code/docs/io/japi/IODocTest.java#client -.. includecode:: code/docs/io/IODocTest.java#connect - -When connecting, it is also possible to set various socket options or specify a local address: - -.. includecode:: code/docs/io/IODocTest.java#connect-with-options +The first step of connecting to a remote address is sending a :class:`Connect` +message to the TCP manager; in addition to the simplest form shown above there +is also the possibility to specify a local :class:`InetSocketAddress` to bind +to and a list of socket options to apply. .. note:: - The SO_NODELAY (TCP_NODELAY on Windows) socket option defaults to true in Akka, independently of the OS default - settings. This setting disables Nagle's algorithm considerably improving latency for most applications. This setting - could be overridden by passing ``SO.TcpNoDelay(false)`` in the list of socket options of the ``Connect`` message. -After issuing the ``Connect`` command the TCP manager spawns a worker actor to handle commands related to the -connection. This worker actor will reveal itself by replying with a ``Connected`` message to the actor who sent the -``Connect`` command. + The SO_NODELAY (TCP_NODELAY on Windows) socket option defaults to true in + Akka, independently of the OS default settings. This setting disables Nagle's + algorithm, considerably improving latency for most applications. This setting + could be overridden by passing ``SO.TcpNoDelay(false)`` in the list of socket + options of the ``Connect`` message. -.. includecode:: code/docs/io/IODocTest.java#connected +The TCP manager will then reply either with a :class:`CommandFailed` or it will +spawn an internal actor representing the new connection. This new actor will +then send a :class:`Connected` message to the original sender of the +:class:`Connect` message. -When receiving the :class:`Connected` message there is still no listener -associated with the connection. To finish the connection setup a ``Register`` -has to be sent to the connection actor with the listener ``ActorRef`` as a -parameter, which therefore done in the last line above. +In order to activate the new connection a :class:`Register` message must be +sent to the connection actor, informing that one about who shall receive data +from the socket. Before this step is done the connection cannot be used, and +there is an internal timeout after which the connection actor will shut itself +down if no :class:`Register` message is received. -Upon registration, the connection actor will watch the listener actor provided in the ``listener`` parameter. -If the listener actor stops, the connection is closed, and all resources allocated for the connection released. During the -lifetime of the connection the listener may receive various event notifications: - -.. includecode:: code/docs/io/IODocTest.java#received - -``ConnectionClosed`` is a trait, which the different connection close events all implement. -The last line handles all connection close events in the same way. It is possible to listen for more fine-grained -connection close events, see :ref:`closing-connections-java` below. +The connection actor watches the registered handler and closes the connection +when that one terminates, thereby cleaning up all internal resources associated +with that connection. +The actor in the example above uses :meth:`become` to switch from unconnected +to connected operation, demonstrating the commands and events which are +observed in that state. For a discussion on :class:`CommandFailed` see +`Throttling Reads and Writes`_ below. :class:`ConnectionClosed` is a trait, +which marks the different connection close events. The last line handles all +connection close events in the same way. It is possible to listen for more +fine-grained connection close events, see `Closing Connections`_ below. Accepting connections ^^^^^^^^^^^^^^^^^^^^^ -To create a TCP server and listen for inbound connection, a ``Bind`` command has to be sent to the TCP manager. -This will instruct the TCP manager to listen for TCP connections on a particular address. +.. includecode:: code/docs/io/japi/IODocTest.java#server -.. includecode:: code/docs/io/IODocTest.java#bind +To create a TCP server and listen for inbound connections, a :class:`Bind` +command has to be sent to the TCP manager. This will instruct the TCP manager +to listen for TCP connections on a particular :class:`InetSocketAddress`; the +port may be specified as ``0`` in order to bind to a random port. -The actor sending the ``Bind`` message will receive a ``Bound`` message signalling that the server is ready to accept -incoming connections. The process for accepting connections is similar to the process for making :ref:`outgoing -connections `: when an incoming connection is established, the actor provided as ``handler`` will -receive a ``Connected`` message whose sender is the connection actor. +The actor sending the :class:`Bind` message will receive a :class:`Bound` +message signalling that the server is ready to accept incoming connections; +this message also contains the :class:`InetSocketAddress` to which the socket +was actually bound (i.e. resolved IP address and correct port number). -.. includecode:: code/docs/io/IODocTest.java#connected +From this point forward the process of handling connections is the same as for +outgoing connections. The example demonstrates that handling the reads from a +certain connection can be delegated to another actor by naming it as the +handler when sending the :class:`Register` message. Writes can be sent from any +actor in the system to the connection actor (i.e. the actor which sent the +:class:`Connected` message). The simplistic handler is defined as: -When receiving the :class:`Connected` message there is still no listener -associated with the connection. To finish the connection setup a ``Register`` -has to be sent to the connection actor with the listener ``ActorRef`` as a -parameter, which therefore done in the last line above. +.. includecode:: code/docs/io/japi/IODocTest.java#simplistic-handler -Upon registration, the connection actor will watch the listener actor provided in the ``listener`` parameter. -If the listener stops, the connection is closed, and all resources allocated for the connection released. During the -connection lifetime the listener will receive various event notifications in the same way as in the outbound -connection case. +For a more complete sample which also takes into account the possibility of +failures when sending please see `Throttling Reads and Writes`_ below. -.. _closing-connections-java: +The only difference to outgoing connections is that the internal actor managing +the listen port—the sender of the :class:`Bound` message—watches the actor +which was named as the recipient for :class:`Connected` messages in the +:class:`Bind` message. When that actor terminates the listen port will be +closed and all resources associated with it will be released; existing +connections will not be terminated at this point. Closing connections ^^^^^^^^^^^^^^^^^^^ @@ -435,8 +448,8 @@ actor. the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with ``Closed``. -``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but receives -will continue until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is +``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but data +will continue to be received until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is successful, the listener will be notified with ``ConfirmedClosed``. ``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending @@ -449,13 +462,126 @@ it receives one of the above close commands. ``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed. -All close notifications are subclasses of ``ConnectionClosed`` so listeners who do not need fine-grained close events +All close notifications are sub-types of ``ConnectionClosed`` so listeners who do not need fine-grained close events may handle all close events in the same way. Throttling Reads and Writes ^^^^^^^^^^^^^^^^^^^^^^^^^^^ -*This section is not yet ready. More coming soon* +The basic model of the TCP connection actor is that it has no internal +buffering (i.e. it can only process one write at a time, meaning it can buffer +one write until it has been passed on to the O/S kernel in full). Congestion +needs to be handled at the user level, for which there are three modes of +operation: + +* *ACK-based:* every :class:`Write` command carries an arbitrary object, and if + this object is not ``Tcp.NoAck`` then it will be returned to the sender of + the :class:`Write` upon successfully writing all contained data to the + socket. If no other write is initiated before having received this + acknowledgement then no failures can happen due to buffer overrun. + +* *NACK-based:* every write which arrives while a previous write is not yet + completed will be replied to with a :class:`CommandFailed` message containing + the failed write. Just relying on this mechanism requires the implemented + protocol to tolerate skipping writes (e.g. if each write is a valid message + on its own and it is not required that all are delivered). This mode is + enabled by setting the ``useResumeWriting`` flag to ``false`` within the + :class:`Register` message during connection activation. + +* *NACK-based with write suspending:* this mode is very similar to the + NACK-based one, but once a single write has failed no further writes will + succeed until a :class:`ResumeWriting` message is received. This message will + be answered with a :class:`WritingResumed` message once the last accepted + write has completed. If the actor driving the connection implements buffering + and resends the NACK’ed messages after having awaited the + :class:`WritingResumed` signal then every message is delivered exactly once + to the network socket. + +These models (with the exception of the second which is rather specialised) are +demonstrated in complete examples below. The full and contiguous source is +available `on github <@github@/akka-docs/rst/java/code/io/japi>`_. + +.. note:: + + It should be obvious that all these flow control schemes only work between + one writer and one connection actor; as soon as multiple actors send write + commands to a single connection no consistent result can be achieved. + +ACK-Based Back-Pressure +^^^^^^^^^^^^^^^^^^^^^^^ + +For proper function of the following example it is important to configure the +connection to remain half-open when the remote side closed its writing end: +this allows the example :class:`EchoHandler` to write all outstanding data back +to the client before fully closing the connection. This is enabled using a flag +upon connection activation (observe the :class:`Register` message): + +.. includecode:: code/docs/io/japi/EchoManager.java#echo-manager + +With this preparation let us dive into the handler itself: + +.. includecode:: code/docs/io/japi/SimpleEchoHandler.java#simple-echo-handler + :exclude: storage-omitted + +The principle is simple: when having written a chunk always wait for the +``Ack`` to come back before sending the next chunk. While waiting we switch +behavior such that new incoming data are buffered. The helper functions used +are a bit lengthy but not complicated: + +.. includecode:: code/docs/io/japi/SimpleEchoHandler.java#simple-helpers + +The most interesting part is probably the last: an ``Ack`` removes the oldest +data chunk from the buffer, and if that was the last chunk then we either close +the connection (if the peer closed its half already) or return to the idle +behavior; otherwise we just send the next buffered chunk and stay waiting for +the next ``Ack``. + +Back-pressure can be propagated also across the reading side back to the writer +on the other end of the connection by sending the :class:`SuspendReading` +command to the connection actor. This will lead to no data being read from the +socket anymore (although this does happen after a delay because it takes some +time until the connection actor processes this command, hence appropriate +head-room in the buffer should be present), which in turn will lead to the O/S +kernel buffer filling up on our end, then the TCP window mechanism will stop +the remote side from writing, filling up its write buffer, until finally the +writer on the other side cannot push any data into the socket anymore. This is +how end-to-end back-pressure is realized across a TCP connection. + +NACK-Based Back-Pressure with Write Suspending +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. includecode:: code/docs/io/japi/EchoHandler.java#echo-handler + :exclude: buffering,closing,storage-omitted + +The principle here is to keep writing until a :class:`CommandFailed` is +received, using acknowledgements only to prune the resend buffer. When a such a +failure was received, transition into a different state for handling and handle +resending of all queued data: + +.. includecode:: code/docs/io/japi/EchoHandler.java#buffering + +It should be noted that all writes which are currently buffered have also been +sent to the connection actor upon entering this state, which means that the +:class:`ResumeWriting` message is enqueued after those writes, leading to the +reception of all outstanding :class:`CommandFailre` messages (which are ignored +in this state) before receiving the :class:`WritingResumed` signal. That latter +message is sent by the connection actor only once the internally queued write +has been fully completed, meaning that a subsequent write will not fail. This +is exploited by the :class:`EchoHandler` to switch to an ACK-based approach for +the first ten writes after a failure before resuming the optimistic +write-through behavior. + +.. includecode:: code/docs/io/japi/EchoHandler.java#closing + +Closing the connection while still sending all data is a bit more involved than +in the ACK-based approach: the idea is to always send all outstanding messages +and acknowledge all successful writes, and if a failure happens then switch +behavior to await the :class:`WritingResumed` event and start over. + +The helper functions are very similar to the ACK-based case: + +.. includecode:: code/docs/io/japi/EchoHandler.java#helpers + Using UDP --------- @@ -568,12 +694,6 @@ will always be the endpoint we originally connected to. check, while in the case of connection-based UDP the security check is cached after connect, thus writes does not suffer an additional performance penalty. -Throttling Reads and Writes -^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -*This section is not yet ready. More coming soon* - - Architecture in-depth --------------------- diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index b3940efbe1..fe68e2f240 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -168,6 +168,23 @@ constructor arguments are determined by a dependency injection framework. When using a dependency injection framework, actor beans *MUST NOT* have singleton scope. +The Inbox +--------- + +When writing code outside of actors which shall communicate with actors, the +``ask`` pattern can be a solution (see below), but there are two thing it +cannot do: receiving multiple replies (e.g. by subscribing an :class:`ActorRef` +to a notification service) and watching other actors’ lifecycle. For these +purposes there is the :class:`Inbox` class: + +.. includecode:: code/docs/actor/InboxDocTest.java#inbox + +The :meth:`send` method wraps a normal :meth:`tell` and supplies the internal +actor’s reference as the sender. This allows the reply to be received on the +last line. Watching an actor is quite simple as well: + +.. includecode:: code/docs/actor/InboxDocTest.java#watch + UntypedActor API ================ diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index 78ff547ddf..d83c90fa7d 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -255,6 +255,24 @@ If you want to use this magic, simply extend :class:`ActWithStash`: .. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#act-with-stash +The Inbox +--------- + +When writing code outside of actors which shall communicate with actors, the +``ask`` pattern can be a solution (see below), but there are two thing it +cannot do: receiving multiple replies (e.g. by subscribing an :class:`ActorRef` +to a notification service) and watching other actors’ lifecycle. For these +purposes there is the :class:`Inbox` class: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#inbox + +There is an implicit conversion from inbox to actor reference which means that +in this example the sender reference will be that of the actor hidden away +within the inbox. This allows the reply to be received on the last line. +Watching an actor is quite simple as well: + +.. includecode:: ../../../akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala#watch + Actor API ========= diff --git a/akka-docs/rst/scala/code/docs/io/EchoServer.scala b/akka-docs/rst/scala/code/docs/io/EchoServer.scala new file mode 100644 index 0000000000..bdefdcee6f --- /dev/null +++ b/akka-docs/rst/scala/code/docs/io/EchoServer.scala @@ -0,0 +1,302 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io + +import java.net.InetSocketAddress + +import scala.concurrent.duration.DurationInt + +import com.typesafe.config.ConfigFactory + +import akka.actor.{ Actor, ActorDSL, ActorLogging, ActorRef, ActorSystem, Props, SupervisorStrategy } +import akka.actor.ActorDSL.inbox +import akka.io.{ IO, Tcp } +import akka.util.ByteString + +object EchoServer extends App { + + val config = ConfigFactory.parseString("akka.loglevel = DEBUG") + implicit val system = ActorSystem("EchoServer", config) + + // make sure to stop the system so that the application stops + try run() + finally system.shutdown() + + def run(): Unit = { + import ActorDSL._ + + // create two EchoManager and stop the application once one dies + val watcher = inbox() + watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[EchoHandler]), "echo")) + watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler]), "simple")) + watcher.receive(10.minutes) + } + +} + +class EchoManager(handlerClass: Class[_]) extends Actor with ActorLogging { + + import Tcp._ + import context.system + + // there is not recovery for broken connections + override val supervisorStrategy = SupervisorStrategy.stoppingStrategy + + // bind to the listen port; the port will automatically be closed once this actor dies + override def preStart(): Unit = { + IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0)) + } + + // do not restart + override def postRestart(thr: Throwable): Unit = context stop self + + def receive = { + case Bound(localAddress) ⇒ + log.info("listening on port {}", localAddress.getPort) + + case CommandFailed(Bind(_, local, _, _)) ⇒ + log.warning(s"cannot bind to [$local]") + context stop self + + //#echo-manager + case Connected(remote, local) ⇒ + log.info("received connection from {}", remote) + val handler = context.actorOf(Props(handlerClass, sender, remote)) + sender ! Register(handler, keepOpenOnPeerClosed = true) + //#echo-manager + } + +} + +object EchoHandler { + def apply(connection: ActorRef, remote: InetSocketAddress): Props = + Props(classOf[EchoHandler], connection, remote) +} + +//#echo-handler +class EchoHandler(connection: ActorRef, remote: InetSocketAddress) + extends Actor with ActorLogging { + + import Tcp._ + + // sign death pact: this actor terminates when connection breaks + context watch connection + + // start out in optimistic write-through mode + def receive = writing + + //#writing + def writing: Receive = { + case Received(data) ⇒ + connection ! Write(data, currentOffset) + buffer(data) + + case ack: Int ⇒ + acknowledge(ack) + + case CommandFailed(Write(_, ack: Int)) ⇒ + connection ! ResumeWriting + context become buffering(ack) + + case PeerClosed ⇒ + if (storage.isEmpty) context stop self + else context become closing + } + //#writing + + //#buffering + def buffering(nack: Int): Receive = { + var toAck = 10 + var peerClosed = false + + { + case Received(data) ⇒ buffer(data) + case WritingResumed ⇒ writeFirst() + case PeerClosed ⇒ peerClosed = true + case ack: Int if ack < nack ⇒ acknowledge(ack) + case ack: Int ⇒ + acknowledge(ack) + if (storage.nonEmpty) { + if (toAck > 0) { + // stay in ACK-based mode for a while + writeFirst() + toAck -= 1 + } else { + // then return to NACK-based again + writeAll() + context become (if (peerClosed) closing else writing) + } + } else if (peerClosed) context stop self + else context become writing + } + } + //#buffering + + //#closing + def closing: Receive = { + case CommandFailed(_: Write) ⇒ + connection ! ResumeWriting + context.become({ + + case WritingResumed ⇒ + writeAll() + context.unbecome() + + case ack: Int ⇒ acknowledge(ack) + + }, discardOld = false) + + case ack: Int ⇒ + acknowledge(ack) + if (storage.isEmpty) context stop self + } + //#closing + + override def postStop(): Unit = { + log.info(s"transferred $transferred bytes from/to [$remote]") + } + + //#storage-omitted + var storageOffset = 0 + var storage = Vector.empty[ByteString] + var stored = 0L + var transferred = 0L + + val maxStored = 100000000L + val highWatermark = maxStored * 5 / 10 + val lowWatermark = maxStored * 3 / 10 + var suspended = false + + private def currentOffset = storageOffset + storage.size + + //#helpers + private def buffer(data: ByteString): Unit = { + storage :+= data + stored += data.size + + if (stored > maxStored) { + log.warning(s"drop connection to [$remote] (buffer overrun)") + context stop self + + } else if (stored > highWatermark) { + log.debug(s"suspending reading at $currentOffset") + connection ! SuspendReading + suspended = true + } + } + + private def acknowledge(ack: Int): Unit = { + require(ack == storageOffset, s"received ack $ack at $storageOffset") + require(storage.nonEmpty, s"storage was empty at ack $ack") + + val size = storage(0).size + stored -= size + transferred += size + + storageOffset += 1 + storage = storage drop 1 + + if (suspended && stored < lowWatermark) { + log.debug("resuming reading") + connection ! ResumeReading + suspended = false + } + } + //#helpers + + private def writeFirst(): Unit = { + connection ! Write(storage(0), storageOffset) + } + + private def writeAll(): Unit = { + for ((data, i) ← storage.zipWithIndex) { + connection ! Write(data, storageOffset + i) + } + } + + //#storage-omitted +} +//#echo-handler + +//#simple-echo-handler +class SimpleEchoHandler(connection: ActorRef, remote: InetSocketAddress) + extends Actor with ActorLogging { + + import Tcp._ + + // sign death pact: this actor terminates when connection breaks + context watch connection + + case object Ack + + def receive = { + case Received(data) ⇒ + buffer(data) + connection ! Write(data, Ack) + + context.become({ + case Received(data) ⇒ buffer(data) + case Ack ⇒ acknowledge() + case PeerClosed ⇒ closing = true + }, discardOld = false) + + case PeerClosed ⇒ context stop self + } + + //#storage-omitted + override def postStop(): Unit = { + log.info(s"transferred $transferred bytes from/to [$remote]") + } + + var storage = Vector.empty[ByteString] + var stored = 0L + var transferred = 0L + var closing = false + + val maxStored = 100000000L + val highWatermark = maxStored * 5 / 10 + val lowWatermark = maxStored * 3 / 10 + var suspended = false + + //#simple-helpers + private def buffer(data: ByteString): Unit = { + storage :+= data + stored += data.size + + if (stored > maxStored) { + log.warning(s"drop connection to [$remote] (buffer overrun)") + context stop self + + } else if (stored > highWatermark) { + log.debug(s"suspending reading") + connection ! SuspendReading + suspended = true + } + } + + private def acknowledge(): Unit = { + require(storage.nonEmpty, "storage was empty") + + val size = storage(0).size + stored -= size + transferred += size + + storage = storage drop 1 + + if (suspended && stored < lowWatermark) { + log.debug("resuming reading") + connection ! ResumeReading + suspended = false + } + + if (storage.isEmpty) { + if (closing) context stop self + else context.unbecome() + } else connection ! Write(storage(0), Ack) + } + //#simple-helpers + //#storage-omitted +} +//#simple-echo-handler diff --git a/akka-docs/rst/scala/code/docs/io/IODocSpec.scala b/akka-docs/rst/scala/code/docs/io/IODocSpec.scala new file mode 100644 index 0000000000..141f33312a --- /dev/null +++ b/akka-docs/rst/scala/code/docs/io/IODocSpec.scala @@ -0,0 +1,117 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io + +//#imports +import akka.actor.{ Actor, ActorRef, Props } +import akka.io.{ IO, Tcp } +import akka.util.ByteString +import java.net.InetSocketAddress +//#imports + +import akka.testkit.AkkaSpec +import scala.concurrent.duration._ + +class DemoActor extends Actor { + //#manager + import akka.io.{ IO, Tcp } + import context.system // implicitly used by IO(Tcp) + + val manager = IO(Tcp) + //#manager + + def receive = Actor.emptyBehavior +} + +//#server +object Server { + def apply(manager: ActorRef) = Props(classOf[Server], manager) +} + +class Server(manager: ActorRef) extends Actor { + + import Tcp._ + import context.system + + IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0)) + + def receive = { + case b @ Bound(localAddress) ⇒ manager ! b + + case CommandFailed(_: Bind) ⇒ context stop self + + case c @ Connected(remote, local) ⇒ + manager ! c + val handler = context.actorOf(Props[SimplisticHandler]) + val connection = sender + connection ! Register(handler) + } + +} +//#server + +//#simplistic-handler +class SimplisticHandler extends Actor { + import Tcp._ + def receive = { + case Received(data) ⇒ sender ! Write(data) + case PeerClosed ⇒ context stop self + } +} +//#simplistic-handler + +//#client +object Client { + def apply(remote: InetSocketAddress, replies: ActorRef) = + Props(classOf[Client], remote, replies) +} + +class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor { + + import Tcp._ + import context.system + + IO(Tcp) ! Connect(remote) + + def receive = { + case CommandFailed(_: Connect) ⇒ + listener ! "failed" + context stop self + + case c @ Connected(remote, local) ⇒ + listener ! c + val connection = sender + connection ! Register(self) + context become { + case data: ByteString ⇒ connection ! Write(data) + case CommandFailed(w: Write) ⇒ // O/S buffer was full + case Received(data) ⇒ listener ! data + case "close" ⇒ connection ! Close + case _: ConnectionClosed ⇒ context stop self + } + } +} +//#client + +class IODocSpec extends AkkaSpec { + + "demonstrate connect" in { + val server = system.actorOf(Server(testActor), "server1") + val listen = expectMsgType[Tcp.Bound].localAddress + val client = system.actorOf(Client(listen, testActor), "client1") + + val c1, c2 = expectMsgType[Tcp.Connected] + c1.localAddress must be(c2.remoteAddress) + c2.localAddress must be(c1.remoteAddress) + + client ! ByteString("hello") + expectMsgType[ByteString].utf8String must be("hello") + + watch(client) + client ! "close" + expectTerminated(client, 1.second) + } + +} diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index dc711bd84d..c44fb4e9c1 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -11,22 +11,26 @@ and `spray.io`_ teams. Its design combines experiences from the ``spray-io`` module with improvements that were jointly developed for more general consumption as an actor-based service. -This documentation is in progress and some sections may be incomplete. More will be coming. +The guiding design goal for this I/O implementation was to reach extreme +scalability, make no compromises in providing an API correctly matching the +underlying transport mechanism and to be fully event-driven, non-blocking and +asynchronous. The API is meant to be a solid foundation for the implementation +of network protocols and building higher abstractions; it is not meant to be a +full-service high-level NIO wrapper for end users. .. note:: The old I/O implementation has been deprecated and its documentation has been moved: :ref:`io-scala-old` Terminology, Concepts --------------------- + The I/O API is completely actor based, meaning that all operations are implemented with message passing instead of direct method calls. Every I/O driver (TCP, UDP) has a special actor, called a *manager* that serves as an entry point for the API. I/O is broken into several drivers. The manager for a particular driver is accessible through the ``IO`` entry point. For example the following code looks up the TCP manager and returns its ``ActorRef``: -.. code-block:: scala - - val tcpManager = IO(Tcp) +.. includecode:: code/docs/io/IODocSpec.scala#manager The manager receives I/O command messages and instantiates worker actors in response. The worker actors present themselves to the API user in the reply to the command that was sent. For example after a ``Connect`` command sent to @@ -366,107 +370,92 @@ this must be modeled either as a command or event, i.e. it will be part of the Using TCP --------- +The code snippets through-out this section assume the following imports: + +.. includecode:: code/docs/io/IODocSpec.scala#imports + All of the Akka I/O APIs are accessed through manager objects. When using an I/O API, the first step is to acquire a reference to the appropriate manager. The code below shows how to acquire a reference to the ``Tcp`` manager. -.. code-block:: scala - - import akka.io.IO - import akka.io.Tcp - val tcpManager = IO(Tcp) +.. includecode:: code/docs/io/IODocSpec.scala#manager The manager is an actor that handles the underlying low level I/O resources (selectors, channels) and instantiates workers for specific tasks, such as listening to incoming connections. -.. _connecting-scala: - Connecting ^^^^^^^^^^ -The first step of connecting to a remote address is sending a ``Connect`` message to the TCP manager: +.. includecode:: code/docs/io/IODocSpec.scala#client -.. code-block:: scala - - import akka.io.Tcp._ - IO(Tcp) ! Connect(remoteSocketAddress) - -When connecting, it is also possible to set various socket options or specify a local address: - -.. code-block:: scala - - IO(Tcp) ! Connect(remoteSocketAddress, Some(localSocketAddress), List(SO.KeepAlive(true))) +The first step of connecting to a remote address is sending a :class:`Connect` +message to the TCP manager; in addition to the simplest form shown above there +is also the possibility to specify a local :class:`InetSocketAddress` to bind +to and a list of socket options to apply. .. note:: - The SO_NODELAY (TCP_NODELAY on Windows) socket option defaults to true in Akka, independently of the OS default - settings. This setting disables Nagle's algorithm considerably improving latency for most applications. This setting - could be overridden by passing ``SO.TcpNoDelay(false)`` in the list of socket options of the ``Connect`` message. -After issuing the ``Connect`` command the TCP manager spawns a worker actor to handle commands related to the -connection. This worker actor will reveal itself by replying with a ``Connected`` message to the actor who sent the -``Connect`` command. + The SO_NODELAY (TCP_NODELAY on Windows) socket option defaults to true in + Akka, independently of the OS default settings. This setting disables Nagle's + algorithm, considerably improving latency for most applications. This setting + could be overridden by passing ``SO.TcpNoDelay(false)`` in the list of socket + options of the ``Connect`` message. -.. code-block:: scala +The TCP manager will then reply either with a :class:`CommandFailed` or it will +spawn an internal actor representing the new connection. This new actor will +then send a :class:`Connected` message to the original sender of the +:class:`Connect` message. - case Connected(remoteAddress, localAddress) => - connectionActor = sender +In order to activate the new connection a :class:`Register` message must be +sent to the connection actor, informing that one about who shall receive data +from the socket. Before this step is done the connection cannot be used, and +there is an internal timeout after which the connection actor will shut itself +down if no :class:`Register` message is received. -At this point, there is still no listener associated with the connection. To finish the connection setup a ``Register`` -has to be sent to the connection actor with the listener ``ActorRef`` as a parameter. - -.. code-block:: scala - - connectionActor ! Register(listener) - -Upon registration, the connection actor will watch the listener actor provided in the ``listener`` parameter. -If the listener actor stops, the connection is closed, and all resources allocated for the connection released. During the -lifetime of the connection the listener may receive various event notifications: - -.. code-block:: scala - - case Received(dataByteString) => // handle incoming chunk of data - case CommandFailed(cmd) => // handle failure of command: cmd - case _: ConnectionClosed => // handle closed connections - -``ConnectionClosed`` is a trait, which the different connection close events all implement. -The last line handles all connection close events in the same way. It is possible to listen for more fine-grained -connection close events, see :ref:`closing-connections-scala` below. +The connection actor watches the registered handler and closes the connection +when that one terminates, thereby cleaning up all internal resources associated +with that connection. +The actor in the example above uses :meth:`become` to switch from unconnected +to connected operation, demonstrating the commands and events which are +observed in that state. For a discussion on :class:`CommandFailed` see +`Throttling Reads and Writes`_ below. :class:`ConnectionClosed` is a trait, +which marks the different connection close events. The last line handles all +connection close events in the same way. It is possible to listen for more +fine-grained connection close events, see `Closing Connections`_ below. Accepting connections ^^^^^^^^^^^^^^^^^^^^^ -To create a TCP server and listen for inbound connections, a ``Bind`` command has to be sent to the TCP manager. -This will instruct the TCP manager to listen for TCP connections on a particular address. +.. includecode:: code/docs/io/IODocSpec.scala#server -.. code-block:: scala +To create a TCP server and listen for inbound connections, a :class:`Bind` +command has to be sent to the TCP manager. This will instruct the TCP manager +to listen for TCP connections on a particular :class:`InetSocketAddress`; the +port may be specified as ``0`` in order to bind to a random port. - import akka.io.IO - import akka.io.Tcp - IO(Tcp) ! Bind(handler, localAddress) +The actor sending the :class:`Bind` message will receive a :class:`Bound` +message signalling that the server is ready to accept incoming connections; +this message also contains the :class:`InetSocketAddress` to which the socket +was actually bound (i.e. resolved IP address and correct port number). -The actor sending the ``Bind`` message will receive a ``Bound`` message signalling that the server is ready to accept -incoming connections. The process for accepting connections is similar to the process for making :ref:`outgoing -connections `: when an incoming connection is established, the actor provided as ``handler`` will -receive a ``Connected`` message whose sender is the connection actor. +From this point forward the process of handling connections is the same as for +outgoing connections. The example demonstrates that handling the reads from a +certain connection can be delegated to another actor by naming it as the +handler when sending the :class:`Register` message. Writes can be sent from any +actor in the system to the connection actor (i.e. the actor which sent the +:class:`Connected` message). The simplistic handler is defined as: -.. code-block:: scala +.. includecode:: code/docs/io/IODocSpec.scala#simplistic-handler - case Connected(remoteAddress, localAddress) => - connectionActor = sender +For a more complete sample which also takes into account the possibility of +failures when sending please see `Throttling Reads and Writes`_ below. -At this point, there is still no listener associated with the connection. To finish the connection setup a ``Register`` -has to be sent to the connection actor with the listener ``ActorRef`` as a parameter. - -.. code-block:: scala - - connectionActor ! Register(listener) - -Upon registration, the connection actor will watch the listener actor provided in the ``listener`` parameter. -If the listener stops, the connection is closed, and all resources allocated for the connection are released. During the -connection lifetime the listener will receive various event notifications in the same way as in the outbound -connection case. - -.. _closing-connections-scala: +The only difference to outgoing connections is that the internal actor managing +the listen port—the sender of the :class:`Bound` message—watches the actor +which was named as the recipient for :class:`Connected` messages in the +:class:`Bind` message. When that actor terminates the listen port will be +closed and all resources associated with it will be released; existing +connections will not be terminated at this point. Closing connections ^^^^^^^^^^^^^^^^^^^ @@ -478,8 +467,8 @@ actor. the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with ``Closed``. -``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but receives -will continue until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is +``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but data +will continue to be received until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is successful, the listener will be notified with ``ConfirmedClosed``. ``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending @@ -492,13 +481,125 @@ it receives one of the above close commands. ``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed. -All close notifications are subclasses of ``ConnectionClosed`` so listeners who do not need fine-grained close events +All close notifications are sub-types of ``ConnectionClosed`` so listeners who do not need fine-grained close events may handle all close events in the same way. Throttling Reads and Writes ^^^^^^^^^^^^^^^^^^^^^^^^^^^ -*This section is not yet ready. More coming soon* +The basic model of the TCP connection actor is that it has no internal +buffering (i.e. it can only process one write at a time, meaning it can buffer +one write until it has been passed on to the O/S kernel in full). Congestion +needs to be handled at the user level, for which there are three modes of +operation: + +* *ACK-based:* every :class:`Write` command carries an arbitrary object, and if + this object is not ``Tcp.NoAck`` then it will be returned to the sender of + the :class:`Write` upon successfully writing all contained data to the + socket. If no other write is initiated before having received this + acknowledgement then no failures can happen due to buffer overrun. + +* *NACK-based:* every write which arrives while a previous write is not yet + completed will be replied to with a :class:`CommandFailed` message containing + the failed write. Just relying on this mechanism requires the implemented + protocol to tolerate skipping writes (e.g. if each write is a valid message + on its own and it is not required that all are delivered). This mode is + enabled by setting the ``useResumeWriting`` flag to ``false`` within the + :class:`Register` message during connection activation. + +* *NACK-based with write suspending:* this mode is very similar to the + NACK-based one, but once a single write has failed no further writes will + succeed until a :class:`ResumeWriting` message is received. This message will + be answered with a :class:`WritingResumed` message once the last accepted + write has completed. If the actor driving the connection implements buffering + and resends the NACK’ed messages after having awaited the + :class:`WritingResumed` signal then every message is delivered exactly once + to the network socket. + +These models (with the exception of the second which is rather specialised) are +demonstrated in complete examples below. The full and contiguous source is +available `on github <@github@/akka-docs/rst/scala/code/io/EchoServer.scala>`_. + +.. note:: + + It should be obvious that all these flow control schemes only work between + one writer and one connection actor; as soon as multiple actors send write + commands to a single connection no consistent result can be achieved. + +ACK-Based Back-Pressure +^^^^^^^^^^^^^^^^^^^^^^^ + +For proper function of the following example it is important to configure the +connection to remain half-open when the remote side closed its writing end: +this allows the example :class:`EchoHandler` to write all outstanding data back +to the client before fully closing the connection. This is enabled using a flag +upon connection activation (observe the :class:`Register` message): + +.. includecode:: code/docs/io/EchoServer.scala#echo-manager + +With this preparation let us dive into the handler itself: + +.. includecode:: code/docs/io/EchoServer.scala#simple-echo-handler + :exclude: storage-omitted + +The principle is simple: when having written a chunk always wait for the +``Ack`` to come back before sending the next chunk. While waiting we switch +behavior such that new incoming data are buffered. The helper functions used +are a bit lengthy but not complicated: + +.. includecode:: code/docs/io/EchoServer.scala#simple-helpers + +The most interesting part is probably the last: an ``Ack`` removes the oldest +data chunk from the buffer, and if that was the last chunk then we either close +the connection (if the peer closed its half already) or return to the idle +behavior; otherwise we just send the next buffered chunk and stay waiting for +the next ``Ack``. + +Back-pressure can be propagated also across the reading side back to the writer +on the other end of the connection by sending the :class:`SuspendReading` +command to the connection actor. This will lead to no data being read from the +socket anymore (although this does happen after a delay because it takes some +time until the connection actor processes this command, hence appropriate +head-room in the buffer should be present), which in turn will lead to the O/S +kernel buffer filling up on our end, then the TCP window mechanism will stop +the remote side from writing, filling up its write buffer, until finally the +writer on the other side cannot push any data into the socket anymore. This is +how end-to-end back-pressure is realized across a TCP connection. + +NACK-Based Back-Pressure with Write Suspending +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. includecode:: code/docs/io/EchoServer.scala#echo-handler + :exclude: buffering,closing,storage-omitted + +The principle here is to keep writing until a :class:`CommandFailed` is +received, using acknowledgements only to prune the resend buffer. When a such a +failure was received, transition into a different state for handling and handle +resending of all queued data: + +.. includecode:: code/docs/io/EchoServer.scala#buffering + +It should be noted that all writes which are currently buffered have also been +sent to the connection actor upon entering this state, which means that the +:class:`ResumeWriting` message is enqueued after those writes, leading to the +reception of all outstanding :class:`CommandFailre` messages (which are ignored +in this state) before receiving the :class:`WritingResumed` signal. That latter +message is sent by the connection actor only once the internally queued write +has been fully completed, meaning that a subsequent write will not fail. This +is exploited by the :class:`EchoHandler` to switch to an ACK-based approach for +the first ten writes after a failure before resuming the optimistic +write-through behavior. + +.. includecode:: code/docs/io/EchoServer.scala#closing + +Closing the connection while still sending all data is a bit more involved than +in the ACK-based approach: the idea is to always send all outstanding messages +and acknowledge all successful writes, and if a failure happens then switch +behavior to await the :class:`WritingResumed` event and start over. + +The helper functions are very similar to the ACK-based case: + +.. includecode:: code/docs/io/EchoServer.scala#helpers Using UDP --------- @@ -643,12 +744,6 @@ will always be the endpoint we originally connected to. check, while in the case of connection-based UDP the security check is cached after connect, thus writes do not suffer an additional performance penalty. -Throttling Reads and Writes -^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -*This section is not yet ready. More coming soon* - - Architecture in-depth --------------------- diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index fc3ba8e6a7..1d99d70894 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -494,7 +494,7 @@ object AkkaBuild extends Build { libraryDependencies ++= Dependencies.docs, publishArtifact in Compile := false, unmanagedSourceDirectories in ScalariformKeys.format in Test <<= unmanagedSourceDirectories in Test, - testOptions += Tests.Argument(TestFrameworks.JUnit, "-v") + testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a") ) )