diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala index 0b9fb4ca0c..18c96594d6 100644 --- a/akka-actor/src/main/scala/akka/io/Inet.scala +++ b/akka-actor/src/main/scala/akka/io/Inet.scala @@ -86,4 +86,12 @@ object Inet { val TrafficClass = SO.TrafficClass } + trait SoJavaFactories { + import SO._ + def receiveBufferSize(size: Int) = ReceiveBufferSize(size) + def reuseAddress(on: Boolean) = ReuseAddress(on) + def sendBufferSize(size: Int) = SendBufferSize(size) + def trafficClass(tc: Int) = TrafficClass(tc) + } + } diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 97076477cf..a95e26867a 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ import scala.collection.immutable import akka.util.ByteString import akka.actor._ +import java.lang.{ Iterable ⇒ JIterable } object Tcp extends ExtensionKey[TcpExt] { @@ -69,6 +70,7 @@ object Tcp extends ExtensionKey[TcpExt] { endpoint: InetSocketAddress, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil) extends Command + case class Register(handler: ActorRef) extends Command case object Unbind extends Command @@ -77,7 +79,8 @@ object Tcp extends ExtensionKey[TcpExt] { case object ConfirmedClose extends CloseCommand case object Abort extends CloseCommand - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) /** * Write data to the TCP connection. If no ack is needed use the special @@ -86,7 +89,7 @@ object Tcp extends ExtensionKey[TcpExt] { case class Write(data: ByteString, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Write { val Empty: Write = Write(ByteString.empty, NoAck) @@ -106,12 +109,27 @@ object Tcp extends ExtensionKey[TcpExt] { case object Bound extends Event case object Unbound extends Event - sealed trait ConnectionClosed extends Event + sealed trait ConnectionClosed extends Event { + def isAborted: Boolean = false + def isConfirmed: Boolean = false + def isPeerClosed: Boolean = false + def isErrorClosed: Boolean = false + def getErrorCause: String = null + } case object Closed extends ConnectionClosed - case object Aborted extends ConnectionClosed - case object ConfirmedClosed extends ConnectionClosed - case object PeerClosed extends ConnectionClosed - case class ErrorClosed(cause: String) extends ConnectionClosed + case object Aborted extends ConnectionClosed { + override def isAborted = true + } + case object ConfirmedClosed extends ConnectionClosed { + override def isConfirmed = true + } + case object PeerClosed extends ConnectionClosed { + override def isPeerClosed = true + } + case class ErrorClosed(cause: String) extends ConnectionClosed { + override def isErrorClosed = true + override def getErrorCause = cause + } } class TcpExt(system: ExtendedActorSystem) extends IO.Extension { @@ -158,3 +176,51 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize) } + +object TcpSO extends SoJavaFactories { + import Tcp.SO._ + def keepAlive(on: Boolean) = KeepAlive(on) + def oobInline(on: Boolean) = OOBInline(on) + def tcpNoDelay(on: Boolean) = TcpNoDelay(on) +} + +object TcpMessage { + import language.implicitConversions + import Tcp._ + + def connect(remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + options: JIterable[SocketOption]) = Connect(remoteAddress, Some(localAddress), options) + def connect(remoteAddress: InetSocketAddress, + options: JIterable[SocketOption]) = Connect(remoteAddress, None, options) + def connect(remoteAddress: InetSocketAddress) = Connect(remoteAddress, None, Nil) + + def bind(handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int, + options: JIterable[SocketOption]) = Bind(handler, endpoint, backlog, options) + def bind(handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int) = Bind(handler, endpoint, backlog, Nil) + + def register(handler: ActorRef) = Register(handler) + def unbind = Unbind + + def close = Close + def confirmedClose = ConfirmedClose + def abort = Abort + + def noAck = NoAck + def noAck(token: AnyRef) = NoAck(token) + + def write(data: ByteString) = Write(data) + def write(data: ByteString, ack: AnyRef) = Write(data, ack) + + def stopReading = StopReading + def resumeReading = ResumeReading + + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { + import scala.collection.JavaConverters._ + coll.asScala.to + } +} diff --git a/akka-docs/rst/java/code/docs/io/IODocTest.java b/akka-docs/rst/java/code/docs/io/IODocTest.java new file mode 100644 index 0000000000..583254e44d --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/IODocTest.java @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +import org.junit.BeforeClass; +import org.junit.Test; + +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +//#imports +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import akka.actor.ActorRef; +import akka.io.Inet; +import akka.io.Tcp; +import akka.io.TcpExt; +import akka.io.TcpMessage; +import akka.io.TcpSO; +import akka.util.ByteString; +//#imports + +public class IODocTest { + + static public class Demo extends UntypedActor { + ActorRef connectionActor = null; + ActorRef listener = getSelf(); + + @Override + public void onReceive(Object msg) { + if ("connect".equals(msg)) { + //#manager + final ActorRef tcp = Tcp.get(system).manager(); + //#manager + //#connect + final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.1", + 12345); + tcp.tell(TcpMessage.connect(remoteAddr), getSelf()); + // or with socket options + final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", + 1234); + final List options = new ArrayList(); + options.add(TcpSO.keepAlive(true)); + tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options), getSelf()); + //#connect + } else + //#connected + if (msg instanceof Tcp.Connected) { + final Tcp.Connected conn = (Tcp.Connected) msg; + connectionActor = getSender(); + connectionActor.tell(TcpMessage.register(listener), getSelf()); + } + //#connected + else + //#received + if (msg instanceof Tcp.Received) { + final Tcp.Received recv = (Tcp.Received) msg; + final ByteString data = recv.data(); + // and do something with the received data ... + } else if (msg instanceof Tcp.CommandFailed) { + final Tcp.CommandFailed failed = (Tcp.CommandFailed) msg; + final Tcp.Command command = failed.cmd(); + // react to failed connect, bind, write, etc. + } else if (msg instanceof Tcp.ConnectionClosed) { + final Tcp.ConnectionClosed closed = (Tcp.ConnectionClosed) msg; + if (closed.isAborted()) { + // handle close reasons like this + } + } + //#received + else + if ("bind".equals(msg)) { + final ActorRef handler = getSelf(); + //#bind + final ActorRef tcp = Tcp.get(system).manager(); + final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", + 1234); + final List options = new ArrayList(); + options.add(TcpSO.reuseAddress(true)); + tcp.tell(TcpMessage.bind(handler, localAddr, 10, options), getSelf()); + //#bind + } + } + } + + static ActorSystem system; + + @BeforeClass + static public void setup() { + system = ActorSystem.create("IODocTest"); + } + + @Test + public void demonstrateConnect() { + } + +} diff --git a/akka-docs/rst/java/index.rst b/akka-docs/rst/java/index.rst index b0f29c27ef..81404409cb 100644 --- a/akka-docs/rst/java/index.rst +++ b/akka-docs/rst/java/index.rst @@ -20,6 +20,7 @@ Java API stm agents transactors + io fsm extending-akka zeromq diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst new file mode 100644 index 0000000000..aef55caba2 --- /dev/null +++ b/akka-docs/rst/java/io.rst @@ -0,0 +1,316 @@ +.. _io-java: + +I/O (Java) +========== + +Introduction +------------ + +The ``akka.io`` package has been developed in collaboration between the Akka +and `spray.io`_ teams. Its design incorporates the experiences with the +``spray-io`` module along with improvements that were jointly developed for +more general consumption as an actor-based service. + +This documentation is in progress and some sections may be incomplete. More will be coming. + +Terminology, Concepts +--------------------- +The I/O API is completely actor based, meaning that all operations are implemented as message passing instead of +direct method calls. Every I/O driver (TCP, UDP) has a special actor, called *manager* that serves +as the entry point for the API. The manager is accessible through an extension, for example the following code +looks up the TCP manager and returns its ``ActorRef``: + +.. includecode:: code/docs/io/IODocTest.java#manager + +For various I/O commands the manager instantiates worker actors that will expose themselves to the user of the +API by replying to the command. For example after a ``Connect`` command sent to the TCP manager the manager creates +an actor representing the TCP connection. All operations related to the given TCP connections can be invoked by sending +messages to the connection actor which announces itself by sending a ``Connected`` message. + +DeathWatch and Resource Management +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Worker actors usually need a user-side counterpart actor listening for events (such events could be inbound connections, +incoming bytes or acknowledgements for writes). These worker actors *watch* their listener counterparts, therefore the +resources assigned to them are automatically released when the listener stops. This design makes the API more robust +against resource leaks. + +Thanks to the completely actor based approach of the I/O API the opposite direction works as well: a user actor +responsible for handling a connection might watch the connection actor to be notified if it unexpectedly terminates. + +Write models (Ack, Nack) +^^^^^^^^^^^^^^^^^^^^^^^^ + +Basically all of the I/O devices have a maximum throughput which limits the frequency and size of writes. When an +application tries to push more data then a device can handle, the driver has to buffer all bytes that the device has +not yet been able to write. With this approach it is possible to handle short bursts of intensive writes --- but no buffer is infinite. +Therefore, the driver has to notify the writer (a user-side actor) either that no further writes are possible, or by +explicitly notifying it when the next chunk is possible to be written or buffered. + +Both of these models are available in the TCP and UDP implementations of Akka I/O. Ack based flow control can be enabled +by providing an ack object in the write message (``Write`` in the case of TCP and ``Send`` for UDP) that will be used by +the worker to notify the writer about the success. + +If a write (or any other command) fails, the driver notifies the commander with a special message (``CommandFailed`` in +the case of UDP and TCP). This message also serves as a means to notify the writer of a failed write. Please note, that +in a Nack based flow-control setting the writer has to buffer some of the writes as the failure notification for a +write ``W1`` might arrive after additional write commands ``W2`` ``W3`` has been sent. + +.. warning:: + An acknowledged write does not mean acknowledged delivery or storage. The Ack/Nack + protocol described here is a means of flow control not error handling: receiving an Ack for a write signals that the + I/O driver is ready to accept a new one. + +ByteString +^^^^^^^^^^ + +A primary goal of Akka's IO support is to only communicate between actors with immutable objects. When dealing with network I/O on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this I/O support, so ``ByteString`` was developed. + +``ByteString`` is a `Rope-like `_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If you require a ``ByteString`` that only blocks a much memory as necessary for it's content, use the ``compact`` method to get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will result in copying all bytes in that slice. + +``ByteString`` inherits all methods from ``IndexedSeq``, and it also has some new ones. For more information, look up the ``akka.util.ByteString`` class and it's companion object in the ScalaDoc. + +``ByteString`` also comes with it's own optimized builder and iterator classes ``ByteStringBuilder`` and ``ByteIterator`` which provides special features in addition to the standard builder / iterator methods: + +Compatibility with java.io +.......................... + +A ``ByteStringBuilder`` can be wrapped in a `java.io.OutputStream` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can we wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams. + +Using TCP +--------- + +The following imports are assumed throughout this section: + +.. includecode:: code/docs/io/IODocTest.java#imports + +As with all of the Akka I/O APIs, everything starts with acquiring a reference to the appropriate manager: + +.. includecode:: code/docs/io/IODocTest.java#manager + +This is an actor that handles the underlying low level I/O resources (Selectors, channels) and instantiates workers for +specific tasks, like listening to incoming connections. + +Connecting +^^^^^^^^^^ + +The first step of connecting to a remote address is sending a ``Connect`` message to the TCP manager: + +.. includecode:: code/docs/io/IODocTest.java#connect + +After issuing the Connect command the TCP manager spawns a worker actor that will handle commands related to the +connection. This worker actor will reveal itself by replying with a ``Connected`` message to the actor who sent the +``Connect`` command. + +.. includecode:: code/docs/io/IODocTest.java#connected + +When receiving the :class:`Connected` message there is still no listener +associated with the connection. To finish the connection setup a ``Register`` +has to be sent to the connection actor with the listener ``ActorRef`` as a +parameter, which therefore done in the last line above. + +After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor. +If the listener stops, the connection is closed, and all resources allocated for the connection released. During the +lifetime the listener may receive various event notifications: + +.. includecode:: code/docs/io/IODocTest.java#received + +The last line handles all connection close events in the same way. It is possible to listen for more fine-grained +connection events, see the appropriate section below. + + +Accepting connections +^^^^^^^^^^^^^^^^^^^^^ + +To create a TCP server and listen for inbound connection, a ``Bind`` command has to be sent to the TCP manager: + +.. includecode:: code/docs/io/IODocTest.java#bind + +The actor sending the ``Bind`` message will receive a ``Bound`` message signalling that the server is ready to accept +incoming connections. Accepting connections is very similar to the last two steps of opening outbound connections: when +an incoming connection is established, the actor provided in ``handler`` will receive a ``Connected`` message whose +sender is the connection actor: + +.. includecode:: code/docs/io/IODocTest.java#connected + +When receiving the :class:`Connected` message there is still no listener +associated with the connection. To finish the connection setup a ``Register`` +has to be sent to the connection actor with the listener ``ActorRef`` as a +parameter, which therefore done in the last line above. + +After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor. +If the listener stops, the connection is closed, and all resources allocated for the connection released. During the +lifetime the listener will receive various event notifications in the same way as we has seen in the outbound +connection case. + +Closing connections +^^^^^^^^^^^^^^^^^^^ + +A connection can be closed by sending one of the commands ``Close``, ``ConfirmedClose`` or ``Abort`` to the connection +actor. + +``Close`` will close the connection by sending a ``FIN`` message, but without waiting for confirmation from +the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with +``Closed`` + +``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but receives +will continue until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is +successful, the listener will be notified with ``ConfirmedClosed`` + +``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending +writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted`` + +``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint. + +``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed. + +All close notifications are subclasses of ``ConnectionClosed`` so listeners who do not need fine-grained close events +may handle all close events in the same way. + +Throttling Reads and Writes +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +*This section is not yet ready. More coming soon* + +Using UDP +--------- + +UDP support comes in two flavors: connectionless, and connection based: + +.. code-block:: scala + + import akka.io.IO + import akka.io.UdpFF + val connectionLessUdp = IO(UdpFF) + // ... or ... + import akka.io.UdpConn + val connectionBasedUdp = IO(UdpConn) + +UDP servers can be only implemented by the connectionless API, but clients can use both. + +Connectionless UDP +^^^^^^^^^^^^^^^^^^ + +Simple Send +............ + +To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the +manager: + +.. code-block:: scala + + IO(UdpFF) ! SimpleSender + // or with socket options: + import akka.io.Udp._ + IO(UdpFF) ! SimpleSender(List(SO.Broadcast(true))) + +The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: + +.. code-block:: scala + + case SimpleSendReady => + simpleSender = sender + +After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple +message send: + +.. code-block:: scala + + simpleSender ! Send(data, serverAddress) + + +Bind (and Send) +............... + +To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP +manager + +.. code-block:: scala + + IO(UdpFF) ! Bind(handler, localAddress) + +After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of +this message is the worker for the UDP channel bound to the local address. + +.. code-block:: scala + + case Bound => + udpWorker = sender // Save the worker ref for later use + +The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: + +.. code-block:: scala + + case Received(dataByteString, remoteAddress) => // Do something with the data + +The ``Received`` message contains the payload of the datagram and the address of the sender. + +It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: + +.. code-block:: scala + + udpWorker ! Send(data, serverAddress) + +.. note:: + The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case + the sender field of the UDP datagram will be the bound local address, while in the latter it will be an undetermined + ephemeral port. + +Connection based UDP +^^^^^^^^^^^^^^^^^^^^ + +The service provided by the connection based UDP API is similar to the bind-and-send service we have seen earlier, but +the main difference is that a connection is only able to send to the remoteAddress it was connected to, and will +receive datagrams only from that address. + +Connecting is similar to what we have seen in the previous section: + +.. code-block:: scala + + IO(UdpConn) ! Connect(handler, remoteAddress) + // or, with more options: + IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true))) + +After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of +this message is the worker for the UDP connection. + +.. code-block:: scala + + case Connected => + udpConnectionActor = sender // Save the worker ref for later use + +The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: + +.. code-block:: scala + + case Received(dataByteString) => // Do something with the data + +The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address +will be provided, as an UDP connection only receives messages from the endpoint it has been connected to. + +It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: + +.. code-block:: scala + + udpConnectionActor ! Send(data) + +Again, the send does not contain a remote address, as it is always the endpoint we have been connected to. + +.. note:: + There is a small performance benefit in using connection based UDP API over the connectionless one. + If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security + check, while in the case of connection-based UDP the security check is cached after connect, thus writes does + not suffer an additional performance penalty. + +Throttling Reads and Writes +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +*This section is not yet ready. More coming soon* + + +Architecture in-depth +--------------------- + +For further details on the design and internal architecture see :ref:`io-layer`. + +.. _spray.io: http://spray.io diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index 2ebb6d2b49..31dfa6e4ee 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -13,6 +13,10 @@ more general consumption as an actor-based service. This documentation is in progress and some sections may be incomplete. More will be coming. +.. toctree:: + + io-old + .. note:: The old I/O implementation has been deprecated and its documentation has been moved: :ref:`io-scala-old` @@ -378,4 +382,4 @@ Architecture in-depth For further details on the design and internal architecture see :ref:`io-layer`. -.. _spray.io: http://spray.io \ No newline at end of file +.. _spray.io: http://spray.io