Edit of I/O docs. Fixes #3047
parent 5e54ddaa67
commit 3f31015091
4 changed files with 223 additions and 127 deletions
@@ -38,13 +38,14 @@ public class IODocTest {
         final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.1",
           12345);
         tcp.tell(TcpMessage.connect(remoteAddr), getSelf());
-        // or with socket options
+        //#connect
+        //#connect-with-options
         final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1",
           1234);
         final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
         options.add(TcpSO.keepAlive(true));
         tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options), getSelf());
-        //#connect
+        //#connect-with-options
       } else
       //#connected
       if (msg instanceof Tcp.Connected) {
@@ -38,14 +38,15 @@ public class UdpConnDocTest {
         final InetSocketAddress remoteAddr =
           new InetSocketAddress("127.0.0.1", 12345);
         udp.tell(UdpConnMessage.connect(handler, remoteAddr), getSelf());
-        // or with socket options
+        //#connect
+        //#connect-with-options
         final InetSocketAddress localAddr =
           new InetSocketAddress("127.0.0.1", 1234);
         final List<Inet.SocketOption> options =
           new ArrayList<Inet.SocketOption>();
         options.add(UdpSO.broadcast(true));
         udp.tell(UdpConnMessage.connect(handler, remoteAddr, localAddr, options), getSelf());
-        //#connect
+        //#connect-with-options
       } else
       //#connected
       if (msg instanceof UdpConn.Connected) {
@@ -7,75 +7,97 @@ Introduction
|
|||
------------
|
||||
|
||||
The ``akka.io`` package has been developed in collaboration between the Akka
|
||||
and `spray.io`_ teams. Its design incorporates the experiences with the
|
||||
``spray-io`` module along with improvements that were jointly developed for
|
||||
and `spray.io`_ teams. Its design combines experiences from the
|
||||
``spray-io`` module with improvements that were jointly developed for
|
||||
more general consumption as an actor-based service.
|
||||
|
||||
This documentation is in progress and some sections may be incomplete. More will be coming.
|
||||
|
||||
Terminology, Concepts
|
||||
---------------------
|
||||
The I/O API is completely actor based, meaning that all operations are implemented as message passing instead of
|
||||
direct method calls. Every I/O driver (TCP, UDP) has a special actor, called *manager* that serves
|
||||
as the entry point for the API. The manager is accessible through an extension, for example the following code
|
||||
The I/O API is completely actor based, meaning that all operations are implemented with message passing instead of
direct method calls. Every I/O driver (TCP, UDP) has a special actor, called a *manager*, that serves
as an entry point for the API. I/O is broken into several drivers. The manager for a particular driver
is accessible by querying an ``ActorSystem``. For example, the following code
looks up the TCP manager and returns its ``ActorRef``:
|
||||
|
||||
.. includecode:: code/docs/io/IODocTest.java#manager
|
||||
|
||||
For various I/O commands the manager instantiates worker actors that will expose themselves to the user of the
|
||||
API by replying to the command. For example after a ``Connect`` command sent to the TCP manager the manager creates
|
||||
an actor representing the TCP connection. All operations related to the given TCP connections can be invoked by sending
|
||||
messages to the connection actor which announces itself by sending a ``Connected`` message.
|
||||
The manager receives I/O command messages and instantiates worker actors in response. The worker actors present
themselves to the API user in the reply to the command that was sent. For example, after a ``Connect`` command is sent to
the TCP manager, the manager creates an actor representing the TCP connection. All operations related to the given TCP
connection can be invoked by sending messages to the connection actor, which announces itself by sending a ``Connected``
message.
|
||||
|
||||
DeathWatch and Resource Management
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Worker actors usually need a user-side counterpart actor listening for events (such events could be inbound connections,
|
||||
incoming bytes or acknowledgements for writes). These worker actors *watch* their listener counterparts, therefore the
|
||||
resources assigned to them are automatically released when the listener stops. This design makes the API more robust
|
||||
against resource leaks.
|
||||
|
||||
I/O worker actors receive commands and also send out events. They usually need a user-side counterpart actor listening
|
||||
for these events (such events could be inbound connections, incoming bytes or acknowledgements for writes). These worker
|
||||
actors *watch* their listener counterparts. If the listener stops then the worker will automatically release any
|
||||
resources that it holds. This design makes the API more robust against resource leaks.
|
||||
|
||||
Thanks to the completely actor based approach of the I/O API the opposite direction works as well: a user actor
|
||||
responsible for handling a connection might watch the connection actor to be notified if it unexpectedly terminates.
|
||||
responsible for handling a connection can watch the connection actor to be notified if it unexpectedly terminates.
|
||||
|
||||
Write models (Ack, Nack)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Basically all of the I/O devices have a maximum throughput which limits the frequency and size of writes. When an
|
||||
application tries to push more data then a device can handle, the driver has to buffer all bytes that the device has
|
||||
not yet been able to write. With this approach it is possible to handle short bursts of intensive writes --- but no buffer is infinite.
|
||||
Therefore, the driver has to notify the writer (a user-side actor) either that no further writes are possible, or by
|
||||
explicitly notifying it when the next chunk is possible to be written or buffered.
|
||||
I/O devices have a maximum throughput which limits the frequency and size of writes. When an
|
||||
application tries to push more data than a device can handle, the driver has to buffer bytes until the device
|
||||
is able to write them. With buffering it is possible to handle short bursts of intensive writes --- but no buffer is infinite.
|
||||
"Flow control" is needed to avoid overwhelming device buffers.
|
||||
|
||||
Both of these models are available in the TCP and UDP implementations of Akka I/O. Ack based flow control can be enabled
|
||||
by providing an ack object in the write message (``Write`` in the case of TCP and ``Send`` for UDP) that will be used by
|
||||
the worker to notify the writer about the success.
|
||||
Akka supports two types of flow control:
|
||||
|
||||
If a write (or any other command) fails, the driver notifies the commander with a special message (``CommandFailed`` in
|
||||
the case of UDP and TCP). This message also serves as a means to notify the writer of a failed write. Please note, that
|
||||
in a Nack based flow-control setting the writer has to buffer some of the writes as the failure notification for a
|
||||
write ``W1`` might arrive after additional write commands ``W2`` ``W3`` has been sent.
|
||||
* *Ack-based*, where the driver notifies the writer when writes have succeeded.
|
||||
|
||||
* *Nack-based*, where the driver notifies the writer when writes have failed.
|
||||
|
||||
Each of these models is available in both the TCP and the UDP implementations of Akka I/O.
|
||||
|
||||
Individual writes can be acknowledged by providing an ack object in the write message (``Write`` in the case of TCP and
|
||||
``Send`` for UDP). When the write is complete the worker will send the ack object to the writing actor. This can be
|
||||
used to implement *ack-based* flow control; sending new data only when old data has been acknowledged.
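The ack-based model can be illustrated without any Akka dependency. The following is only a minimal simulation in plain Java, not Akka's implementation: ``FakeWorker``, ``Writer`` and ``AckFlowControlSketch`` are hypothetical names invented for this sketch, and the "ack" is modelled as a direct method call rather than a message. It shows a writer that queues outgoing chunks and hands the next one to the worker only after the previous one has been acknowledged.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Simulation only: a writer that keeps at most one unacknowledged write in flight. */
final class AckFlowControlSketch {

    /** Hypothetical stand-in for an I/O worker: accepts one write, completes it later. */
    static final class FakeWorker {
        final List<String> written = new ArrayList<>();
        private String unacked; // accepted but not yet completed

        void write(String chunk) {
            if (unacked != null) {
                throw new IllegalStateException("write sent before previous ack");
            }
            unacked = chunk;
        }

        /** Completes the pending write; returns false if there was nothing pending. */
        boolean completePending() {
            if (unacked == null) {
                return false;
            }
            written.add(unacked);
            unacked = null;
            return true;
        }
    }

    /** Writer side: buffers chunks and sends the next one only after an ack. */
    static final class Writer {
        private final Queue<String> pending = new ArrayDeque<>();
        private final FakeWorker worker;
        private boolean awaitingAck = false;

        Writer(FakeWorker worker) {
            this.worker = worker;
        }

        void send(String chunk) {
            pending.add(chunk);
            trySendNext();
        }

        void onAck() {
            awaitingAck = false;
            trySendNext();
        }

        private void trySendNext() {
            if (!awaitingAck && !pending.isEmpty()) {
                worker.write(pending.remove());
                awaitingAck = true;
            }
        }
    }

    static List<String> run(List<String> chunks) {
        FakeWorker worker = new FakeWorker();
        Writer writer = new Writer(worker);
        for (String chunk : chunks) {
            writer.send(chunk); // burst: only the first chunk reaches the worker
        }
        while (worker.completePending()) {
            writer.onAck(); // each ack releases exactly one more chunk
        }
        return worker.written;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c")));
    }
}
```

In this sketch the buffering has moved from the worker into the writer, where the application can bound it or apply its own back-pressure policy.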
|
||||
|
||||
If a write (or any other command) fails, the driver notifies the actor that sent the command with a special message
|
||||
(``CommandFailed`` in the case of UDP and TCP). This message will also notify the writer of a failed write, serving as a
|
||||
nack for that write. Please note that in a nack-based flow-control setting the writer has to be prepared for the fact
|
||||
that the failed write might not be the most recent write it sent. For example, the failure notification for a write
|
||||
``W1`` might arrive after additional write commands ``W2`` and ``W3`` have been sent. If the writer wants to resend any
|
||||
nacked messages it may need to keep a buffer of pending messages.
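The need for a writer-side buffer can be seen in a small simulation. This is a sketch in plain Java under stated assumptions, not Akka code: ``FakeWorker`` and ``NackResendSketch`` are hypothetical, the worker's buffer capacity is an arbitrary parameter, and failure notifications are modelled as a list the writer inspects only after it has finished sending its burst.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simulation only: a writer that resends nacked writes from its own buffer. */
final class NackResendSketch {

    /** Hypothetical worker: accepts up to `capacity` writes between drains, nacks the rest. */
    static final class FakeWorker {
        private final int capacity;
        private final List<String> buffered = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();
        final List<Integer> failures = new ArrayList<>(); // nacks not yet seen by the writer

        FakeWorker(int capacity) {
            this.capacity = capacity;
        }

        void write(int id, String payload) {
            if (buffered.size() >= capacity) {
                failures.add(id); // buffer full: report failure and drop the payload
            } else {
                buffered.add(payload);
            }
        }

        /** Models the device catching up: everything buffered is written out. */
        void drain() {
            delivered.addAll(buffered);
            buffered.clear();
        }
    }

    static List<String> run(int capacity, int count) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        FakeWorker worker = new FakeWorker(capacity);
        Map<Integer, String> sent = new HashMap<>(); // writer-side buffer of sent payloads

        // The writer sends a burst without waiting, so a nack for an early write
        // is only processed after later writes have already been issued.
        for (int id = 1; id <= count; id++) {
            String payload = "W" + id;
            sent.put(id, payload);
            worker.write(id, payload);
        }

        while (true) {
            worker.drain();
            if (worker.failures.isEmpty()) {
                break;
            }
            List<Integer> nacks = new ArrayList<>(worker.failures);
            worker.failures.clear();
            for (int id : nacks) {
                worker.write(id, sent.get(id)); // resend from the writer's own buffer
            }
        }
        return worker.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5));
    }
}
```

Without the ``sent`` map the writer would have no copy of the rejected payloads by the time the failure notifications are handled.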
|
||||
|
||||
.. warning::
|
||||
An acknowledged write does not mean acknowledged delivery or storage. The Ack/Nack
|
||||
protocol described here is a means of flow control not error handling: receiving an Ack for a write signals that the
|
||||
I/O driver is ready to accept a new one.
|
||||
An acknowledged write does not mean acknowledged delivery or storage; receiving an ack for a write simply signals that
|
||||
the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control,
not error handling. In other words, data may still be lost, even if every write is acknowledged.
|
||||
|
||||
ByteString
|
||||
^^^^^^^^^^
|
||||
|
||||
A primary goal of Akka's IO support is to only communicate between actors with immutable objects. When dealing with network I/O on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this I/O support, so ``ByteString`` was developed.
|
||||
To maintain isolation, actors should communicate with immutable objects only. ``ByteString`` is an
|
||||
immutable container for bytes. It is used by Akka's I/O system as an efficient, immutable alternative
to the traditional byte containers used for I/O on the JVM, such as ``byte[]`` and ``ByteBuffer``.
|
||||
|
||||
``ByteString`` is a `Rope-like <http://en.wikipedia.org/wiki/Rope_(computer_science)>`_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If you require a ``ByteString`` that only blocks a much memory as necessary for it's content, use the ``compact`` method to get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will result in copying all bytes in that slice.
|
||||
``ByteString`` is a `rope-like <http://en.wikipedia.org/wiki/Rope_(computer_science)>`_ data structure that is immutable
|
||||
and provides fast concatenation and slicing operations (perfect for I/O). When two ``ByteString``\s are concatenated
|
||||
together they are both stored within the resulting ``ByteString`` instead of copying both to a new array. Operations
|
||||
such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original array, but just change the
|
||||
offset and length that is visible. Great care has also been taken to make sure that the internal array cannot be
|
||||
modified. Whenever a potentially unsafe array is used to create a new ``ByteString`` a defensive copy is created. If
|
||||
you require a ``ByteString`` that only occupies as much memory as necessary for its content, use the ``compact`` method to
|
||||
get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will
|
||||
result in copying all bytes in that slice.
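To make the rope idea concrete, here is a minimal sketch of a rope-like immutable byte container in plain Java. It is not Akka's ``ByteString`` and makes no claim about its internals: ``ByteRope``, ``Leaf`` and ``Concat`` are hypothetical names, and only the behaviours described above are modelled (defensive copy on creation, concatenation without copying, ``drop``/``take`` as views with a different offset and length, and ``compact`` copying just the visible slice).

```java
import java.util.Arrays;

/** Sketch of a rope-like immutable byte sequence (hypothetical, not Akka's ByteString). */
abstract class ByteRope {
    abstract int length();
    abstract byte get(int index);
    /** Returns a view of [from, until); requires 0 <= from <= until <= length(). */
    abstract ByteRope slice(int from, int until);
    /** Copies the visible bytes into dest, starting at destPos. */
    abstract void copyTo(byte[] dest, int destPos);

    static ByteRope of(byte[] bytes) {
        // Defensive copy: the caller may still mutate its own array afterwards.
        return new Leaf(Arrays.copyOf(bytes, bytes.length), 0, bytes.length);
    }

    /** Stores both operands in a new node instead of copying them into a new array. */
    ByteRope concat(ByteRope other) {
        return new Concat(this, other);
    }

    ByteRope drop(int n) {
        return slice(Math.min(Math.max(n, 0), length()), length());
    }

    ByteRope take(int n) {
        return slice(0, Math.min(Math.max(n, 0), length()));
    }

    byte[] toArray() {
        byte[] out = new byte[length()];
        copyTo(out, 0);
        return out;
    }

    /** Copies only the visible bytes into a single, exactly sized leaf. */
    ByteRope compact() {
        byte[] out = toArray();
        return new Leaf(out, 0, out.length);
    }

    private static final class Leaf extends ByteRope {
        private final byte[] data;
        private final int offset;
        private final int len;

        Leaf(byte[] data, int offset, int len) {
            this.data = data;
            this.offset = offset;
            this.len = len;
        }

        int length() { return len; }
        byte get(int index) { return data[offset + index]; }
        // Shares the backing array; only offset and length change.
        ByteRope slice(int from, int until) { return new Leaf(data, offset + from, until - from); }
        void copyTo(byte[] dest, int destPos) { System.arraycopy(data, offset, dest, destPos, len); }
    }

    private static final class Concat extends ByteRope {
        private final ByteRope left;
        private final ByteRope right;
        private final int len;

        Concat(ByteRope left, ByteRope right) {
            this.left = left;
            this.right = right;
            this.len = left.length() + right.length();
        }

        int length() { return len; }

        byte get(int index) {
            int l = left.length();
            return index < l ? left.get(index) : right.get(index - l);
        }

        ByteRope slice(int from, int until) {
            int l = left.length();
            if (until <= l) return left.slice(from, until);
            if (from >= l) return right.slice(from - l, until - l);
            return new Concat(left.slice(from, l), right.slice(0, until - l));
        }

        void copyTo(byte[] dest, int destPos) {
            left.copyTo(dest, destPos);
            right.copyTo(dest, destPos + left.length());
        }
    }
}
```

For example, concatenating ``ByteRope.of`` of the ASCII bytes of ``hello `` and ``world`` and then calling ``drop(3).take(5)`` yields a view over both original arrays without copying; only ``toArray`` or ``compact`` materialises the bytes.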
|
||||
|
||||
``ByteString`` inherits all methods from ``IndexedSeq``, and it also has some new ones. For more information, look up the ``akka.util.ByteString`` class and its companion object in the ScalaDoc.
|
||||
|
||||
``ByteString`` also comes with it's own optimized builder and iterator classes ``ByteStringBuilder`` and ``ByteIterator`` which provides special features in addition to the standard builder / iterator methods:
|
||||
``ByteString`` also comes with its own optimized builder and iterator classes ``ByteStringBuilder`` and
|
||||
``ByteIterator`` which provide extra features in addition to those of normal builders and iterators.
|
||||
|
||||
Compatibility with java.io
|
||||
..........................
|
||||
|
||||
A ``ByteStringBuilder`` can be wrapped in a `java.io.OutputStream` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can we wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams.
|
||||
A ``ByteStringBuilder`` can be wrapped in a ``java.io.OutputStream`` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can be wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams.
|
||||
|
||||
Using TCP
|
||||
---------
|
||||
|
|
@@ -84,12 +106,15 @@ The following imports are assumed throughout this section:
|
|||
|
||||
.. includecode:: code/docs/io/IODocTest.java#imports
|
||||
|
||||
As with all of the Akka I/O APIs, everything starts with acquiring a reference to the appropriate manager:
|
||||
All of the Akka I/O APIs are accessed through manager objects. When using an I/O API, the first step is to acquire a
|
||||
reference to the appropriate manager. The code below shows how to acquire a reference to the ``Tcp`` manager.
|
||||
|
||||
.. includecode:: code/docs/io/IODocTest.java#manager
|
||||
|
||||
This is an actor that handles the underlying low level I/O resources (Selectors, channels) and instantiates workers for
|
||||
specific tasks, like listening to incoming connections.
|
||||
The manager is an actor that handles the underlying low-level I/O resources (selectors, channels) and instantiates
|
||||
workers for specific tasks, such as listening to incoming connections.
|
||||
|
||||
.. _connecting-java:
|
||||
|
||||
Connecting
|
||||
^^^^^^^^^^
|
||||
|
|
@@ -98,7 +123,11 @@ The first step of connecting to a remote address is sending a ``Connect`` messag
|
|||
|
||||
.. includecode:: code/docs/io/IODocTest.java#connect
|
||||
|
||||
After issuing the Connect command the TCP manager spawns a worker actor that will handle commands related to the
|
||||
When connecting, it is also possible to set various socket options or specify a local address:
|
||||
|
||||
.. includecode:: code/docs/io/IODocTest.java#connect-with-options
|
||||
|
||||
After issuing the ``Connect`` command the TCP manager spawns a worker actor to handle commands related to the
|
||||
connection. This worker actor will reveal itself by replying with a ``Connected`` message to the actor who sent the
|
||||
``Connect`` command.
|
||||
|
||||
|
|
@@ -109,27 +138,29 @@ associated with the connection. To finish the connection setup a ``Register``
|
|||
has to be sent to the connection actor with the listener ``ActorRef`` as a
|
||||
parameter, which is therefore done in the last line above.
|
||||
|
||||
After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor.
|
||||
If the listener stops, the connection is closed, and all resources allocated for the connection released. During the
|
||||
lifetime the listener may receive various event notifications:
|
||||
Upon registration, the connection actor will watch the listener actor provided in the ``listener`` parameter.
|
||||
If the listener actor stops, the connection is closed, and all resources allocated for the connection are released. During the
|
||||
lifetime of the connection the listener may receive various event notifications:
|
||||
|
||||
.. includecode:: code/docs/io/IODocTest.java#received
|
||||
|
||||
``ConnectionClosed`` is a trait, which the different connection close events all implement.
|
||||
The last line handles all connection close events in the same way. It is possible to listen for more fine-grained
|
||||
connection events, see the appropriate section below.
|
||||
connection close events, see :ref:`closing-connections-java` below.
|
||||
|
||||
|
||||
Accepting connections
|
||||
^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
To create a TCP server and listen for inbound connection, a ``Bind`` command has to be sent to the TCP manager:
|
||||
To create a TCP server and listen for inbound connections, a ``Bind`` command has to be sent to the TCP manager.
|
||||
This will instruct the TCP manager to listen for TCP connections on a particular address.
|
||||
|
||||
.. includecode:: code/docs/io/IODocTest.java#bind
|
||||
|
||||
The actor sending the ``Bind`` message will receive a ``Bound`` message signalling that the server is ready to accept
|
||||
incoming connections. Accepting connections is very similar to the last two steps of opening outbound connections: when
|
||||
an incoming connection is established, the actor provided in ``handler`` will receive a ``Connected`` message whose
|
||||
sender is the connection actor:
|
||||
incoming connections. The process for accepting connections is similar to the process for making :ref:`outgoing
|
||||
connections <connecting-java>`: when an incoming connection is established, the actor provided as ``handler`` will
|
||||
receive a ``Connected`` message whose sender is the connection actor.
|
||||
|
||||
.. includecode:: code/docs/io/IODocTest.java#connected
|
||||
|
||||
|
|
@@ -138,11 +169,13 @@ associated with the connection. To finish the connection setup a ``Register``
|
|||
has to be sent to the connection actor with the listener ``ActorRef`` as a
|
||||
parameter, which is therefore done in the last line above.
|
||||
|
||||
After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor.
|
||||
Upon registration, the connection actor will watch the listener actor provided in the ``listener`` parameter.
|
||||
If the listener stops, the connection is closed, and all resources allocated for the connection are released. During the
|
||||
lifetime the listener will receive various event notifications in the same way as we has seen in the outbound
|
||||
connection lifetime the listener will receive various event notifications in the same way as in the outbound
|
||||
connection case.
|
||||
|
||||
.. _closing-connections-java:
|
||||
|
||||
Closing connections
|
||||
^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
|
@@ -151,14 +184,14 @@ actor.
|
|||
|
||||
``Close`` will close the connection by sending a ``FIN`` message, but without waiting for confirmation from
|
||||
the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with
|
||||
``Closed``
|
||||
``Closed``.
|
||||
|
||||
``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but receives
|
||||
will continue until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is
|
||||
successful, the listener will be notified with ``ConfirmedClosed``
|
||||
successful, the listener will be notified with ``ConfirmedClosed``.
|
||||
|
||||
``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending
|
||||
writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted``
|
||||
writes will not be flushed. If the close is successful, the listener will be notified with ``Aborted``.
|
||||
|
||||
``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint.
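The half-close behaviour described for ``ConfirmedClose`` has a direct analogue in plain ``java.net`` sockets, which may help readers who know that API. The following sketch uses only the JDK and no Akka classes; ``HalfCloseSketch`` and its helper are hypothetical names, and the mapping to Akka's commands is an analogy rather than a description of how Akka implements them. The client closes only its sending direction with ``shutdownOutput()``, the peer observes this as end-of-stream, and the client can still receive the peer's reply.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Analogy only: TCP half-close with plain java.net sockets on the loopback interface. */
final class HalfCloseSketch {

    /** Returns what the client read after it had already closed its sending direction. */
    static String run() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) {

            client.getOutputStream().write("ping".getBytes(StandardCharsets.US_ASCII));
            client.shutdownOutput(); // closes the sending direction only; reads stay open

            // The peer sees the close of the client's sending direction as end-of-stream.
            String request = new String(readAll(peer.getInputStream()), StandardCharsets.US_ASCII);

            // The peer can still reply because the other direction is open.
            peer.getOutputStream().write(("echo:" + request).getBytes(StandardCharsets.US_ASCII));
            peer.close(); // now the remote endpoint closes the connection, too

            return new String(readAll(client.getInputStream()), StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads until end-of-stream. */
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the same analogy, an abortive close is commonly requested on a plain socket by enabling ``SO_LINGER`` with a zero timeout before calling ``close()``; this is not shown in the sketch.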
|
||||
|
||||
|
|
@@ -175,10 +208,18 @@ Throttling Reads and Writes
|
|||
Using UDP
|
||||
---------
|
||||
|
||||
UDP support comes in two flavors: connectionless, and connection based:
|
||||
UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams
|
||||
to any remote address. Connection-based UDP workers are linked to a single remote address.
|
||||
|
||||
The connectionless UDP manager is accessed through ``UdpFF``. ``UdpFF`` refers to the "fire-and-forget" style of sending
|
||||
UDP datagrams.
|
||||
|
||||
.. includecode:: code/docs/io/IOUdpFFDocTest.java#manager
|
||||
|
||||
The connection-based UDP manager is accessed through ``UdpConn``.
|
||||
|
||||
.. includecode:: code/docs/io/UdpConnDocTest.java#manager
|
||||
|
||||
UDP servers can only be implemented with the connectionless API, but clients can use both.
|
||||
|
||||
Connectionless UDP
|
||||
|
|
@@ -192,7 +233,7 @@ Simple Send
|
|||
............
|
||||
|
||||
To simply send a UDP datagram without listening for an answer, one needs to send the ``SimpleSender`` command to the
|
||||
manager:
|
||||
``UdpFF`` manager:
|
||||
|
||||
.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend
|
||||
|
||||
|
|
@@ -225,7 +266,7 @@ The actor passed in the ``handler`` parameter will receive inbound UDP datagrams
|
|||
|
||||
The ``Received`` message contains the payload of the datagram and the address of the sender.
|
||||
|
||||
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``:
|
||||
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker:
|
||||
|
||||
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-send
|
||||
|
||||
|
|
@@ -238,14 +279,18 @@ It is also possible to send UDP datagrams using the ``ActorRef`` of the worker s
|
|||
Connection based UDP
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The service provided by the connection based UDP API is similar to the bind-and-send service we have seen earlier, but
|
||||
the main difference is that a connection is only able to send to the remoteAddress it was connected to, and will
|
||||
The service provided by the connection based UDP API is similar to the bind-and-send service we saw earlier, but
|
||||
the main difference is that a connection is only able to send to the ``remoteAddress`` it was connected to, and will
|
||||
receive datagrams only from that address.
|
||||
|
||||
Connecting is similar to what we have seen in the previous section:
|
||||
|
||||
.. includecode:: code/docs/io/UdpConnDocTest.java#connect
|
||||
|
||||
Or, with more options:
|
||||
|
||||
.. includecode:: code/docs/io/UdpConnDocTest.java#connect-with-options
|
||||
|
||||
After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of
|
||||
this message is the worker for the UDP connection.
|
||||
|
||||
|
|
@@ -256,13 +301,14 @@ The actor passed in the ``handler`` parameter will receive inbound UDP datagrams
|
|||
.. includecode:: code/docs/io/UdpConnDocTest.java#received
|
||||
|
||||
The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address
|
||||
will be provided, as an UDP connection only receives messages from the endpoint it has been connected to.
|
||||
is provided, as a UDP connection only receives messages from the endpoint it has been connected to.
|
||||
|
||||
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``:
|
||||
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker:
|
||||
|
||||
.. includecode:: code/docs/io/UdpConnDocTest.java#send
|
||||
|
||||
Again, the send does not contain a remote address, as it is always the endpoint we have been connected to.
|
||||
Again, like the ``Received`` message, the ``Send`` message does not contain a remote address. This is because the address
|
||||
will always be the endpoint we originally connected to.
|
||||
|
||||
.. note::
|
||||
There is a small performance benefit in using the connection-based UDP API over the connectionless one.
|
||||
|
|
|
|||
|
|
@@ -7,8 +7,8 @@ Introduction
|
|||
------------
|
||||
|
||||
The ``akka.io`` package has been developed in collaboration between the Akka
|
||||
and `spray.io`_ teams. Its design incorporates the experiences with the
|
||||
``spray-io`` module along with improvements that were jointly developed for
|
||||
and `spray.io`_ teams. Its design combines experiences from the
|
||||
``spray-io`` module with improvements that were jointly developed for
|
||||
more general consumption as an actor-based service.
|
||||
|
||||
This documentation is in progress and some sections may be incomplete. More will be coming.
|
||||
|
|
@@ -22,71 +22,93 @@ This documentation is in progress and some sections may be incomplete. More will
|
|||
|
||||
Terminology, Concepts
|
||||
---------------------
|
||||
The I/O API is completely actor based, meaning that all operations are implemented as message passing instead of
|
||||
direct method calls. Every I/O driver (TCP, UDP) has a special actor, called *manager* that serves
|
||||
as the entry point for the API. The manager is accessible through an extension, for example the following code
|
||||
The I/O API is completely actor based, meaning that all operations are implemented with message passing instead of
direct method calls. Every I/O driver (TCP, UDP) has a special actor, called a *manager*, that serves
as an entry point for the API. I/O is broken into several drivers. The manager for a particular driver
is accessible through the ``IO`` entry point. For example, the following code
looks up the TCP manager and returns its ``ActorRef``:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
val tcpManager = IO(Tcp)
|
||||
|
||||
For various I/O commands the manager instantiates worker actors that will expose themselves to the user of the
|
||||
API by replying to the command. For example after a ``Connect`` command sent to the TCP manager the manager creates
|
||||
an actor representing the TCP connection. All operations related to the given TCP connections can be invoked by sending
|
||||
messages to the connection actor which announces itself by sending a ``Connected`` message.
|
||||
The manager receives I/O command messages and instantiates worker actors in response. The worker actors present
themselves to the API user in the reply to the command that was sent. For example, after a ``Connect`` command is sent to
the TCP manager, the manager creates an actor representing the TCP connection. All operations related to the given TCP
connection can be invoked by sending messages to the connection actor, which announces itself by sending a ``Connected``
message.
|
||||
|
||||
DeathWatch and Resource Management
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Worker actors usually need a user-side counterpart actor listening for events (such events could be inbound connections,
|
||||
incoming bytes or acknowledgements for writes). These worker actors *watch* their listener counterparts, therefore the
|
||||
resources assigned to them are automatically released when the listener stops. This design makes the API more robust
|
||||
against resource leaks.
|
||||
I/O worker actors receive commands and also send out events. They usually need a user-side counterpart actor listening
|
||||
for these events (such events could be inbound connections, incoming bytes or acknowledgements for writes). These worker
|
||||
actors *watch* their listener counterparts. If the listener stops then the worker will automatically release any
|
||||
resources that it holds. This design makes the API more robust against resource leaks.
|
||||
|
||||
Thanks to the completely actor based approach of the I/O API the opposite direction works as well: a user actor
|
||||
responsible for handling a connection might watch the connection actor to be notified if it unexpectedly terminates.
|
||||
responsible for handling a connection can watch the connection actor to be notified if it unexpectedly terminates.
|
||||
|
||||
Write models (Ack, Nack)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Basically all of the I/O devices have a maximum throughput which limits the frequency and size of writes. When an
|
||||
application tries to push more data then a device can handle, the driver has to buffer all bytes that the device has
|
||||
not yet been able to write. With this approach it is possible to handle short bursts of intensive writes --- but no buffer is infinite.
|
||||
Therefore, the driver has to notify the writer (a user-side actor) either that no further writes are possible, or by
|
||||
explicitly notifying it when the next chunk is possible to be written or buffered.
|
||||
I/O devices have a maximum throughput which limits the frequency and size of writes. When an
|
||||
application tries to push more data than a device can handle, the driver has to buffer bytes until the device
|
||||
is able to write them. With buffering it is possible to handle short bursts of intensive writes --- but no buffer is infinite.
|
||||
"Flow control" is needed to avoid overwhelming device buffers.
|
||||
|
||||
Both of these models are available in the TCP and UDP implementations of Akka I/O. Ack based flow control can be enabled
|
||||
by providing an ack object in the write message (``Write`` in the case of TCP and ``Send`` for UDP) that will be used by
|
||||
the worker to notify the writer about the success.
|
||||
Akka supports two types of flow control:
|
||||
|
||||
If a write (or any other command) fails, the driver notifies the commander with a special message (``CommandFailed`` in
|
||||
the case of UDP and TCP). This message also serves as a means to notify the writer of a failed write. Please note, that
|
||||
in a Nack based flow-control setting the writer has to buffer some of the writes as the failure notification for a
|
||||
write ``W1`` might arrive after additional write commands ``W2`` ``W3`` has been sent.
|
||||
* *Ack-based*, where the driver notifies the writer when writes have succeeded.
|
||||
|
||||
* *Nack-based*, where the driver notifies the writer when writes have failed.
|
||||
|
||||
Each of these models is available in both the TCP and the UDP implementations of Akka I/O.
|
||||
|
||||
Individual writes can be acknowledged by providing an ack object in the write message (``Write`` in the case of TCP and
|
||||
``Send`` for UDP). When the write is complete the worker will send the ack object to the writing actor. This can be
|
||||
used to implement *ack-based* flow control; sending new data only when old data has been acknowledged.
|
||||
|
||||
If a write (or any other command) fails, the driver notifies the actor that sent the command with a special message
|
||||
(``CommandFailed`` in the case of UDP and TCP). This message will also notify the writer of a failed write, serving as a
|
||||
nack for that write. Please note that in a nack-based flow-control setting the writer has to be prepared for the fact
|
||||
that the failed write might not be the most recent write it sent. For example, the failure notification for a write
|
||||
``W1`` might arrive after additional write commands ``W2`` and ``W3`` have been sent. If the writer wants to resend any
|
||||
nacked messages it may need to keep a buffer of pending messages.
|
||||
|
||||
.. warning::
|
||||
An acknowledged write does not mean acknowledged delivery or storage. The Ack/Nack
|
||||
protocol described here is a means of flow control not error handling: receiving an Ack for a write signals that the
|
||||
I/O driver is ready to accept a new one.
|
||||
An acknowledged write does not mean acknowledged delivery or storage; receiving an ack for a write simply signals that
|
||||
the I/O driver has successfully processed the write. The Ack/Nack protocol described here is a means of flow control
|
||||
not error handling. In other words, data may still be lost, even if every write is acknowledged.
|
||||
|
||||
ByteString
|
||||
^^^^^^^^^^
|
||||
|
||||
A primary goal of Akka's IO support is to only communicate between actors with immutable objects. When dealing with network I/O on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this I/O support, so ``ByteString`` was developed.
|
||||
To maintain isolation, actors should communicate with immutable objects only. ``ByteString`` is an
|
||||
immutable container for bytes. It is used by Akka's I/O system as an efficient, immutable alternative
|
||||
to the traditional byte containers used for I/O on the JVM, such as ``Array[Byte]`` and ``ByteBuffer``.
|
||||
|
||||
``ByteString`` is a `Rope-like <http://en.wikipedia.org/wiki/Rope_(computer_science)>`_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If you require a ``ByteString`` that only blocks a much memory as necessary for it's content, use the ``compact`` method to get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will result in copying all bytes in that slice.
|
||||
``ByteString`` is a `rope-like <http://en.wikipedia.org/wiki/Rope_(computer_science)>`_ data structure that is immutable
|
||||
and provides fast concatenation and slicing operations (perfect for I/O). When two ``ByteString``\s are concatenated
|
||||
together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations
|
||||
such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the
|
||||
offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be
|
||||
modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If
|
||||
you require a ``ByteString`` that only blocks as much memory as necessary for its content, use the ``compact`` method to
|
||||
get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will
|
||||
result in copying all bytes in that slice.
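
A short sketch of these operations, assuming ``akka-actor`` is on the classpath. The object name ``ByteStringSketch`` and the sample strings are made up for illustration.

```scala
import akka.util.ByteString

object ByteStringSketch {
  val hello: ByteString = ByteString("Hello, ")
  val world: ByteString = ByteString("world")

  // Concatenation combines the two operands into one ByteString.
  val greeting: ByteString = hello ++ world

  // drop/take return a narrower view of the same content.
  val slice: ByteString = greeting.drop(7).take(5)

  // compact yields a ByteString backed by exactly the bytes it contains.
  val compacted: ByteString = slice.compact
}
```

``compact`` is mainly useful when a small slice would otherwise keep a much larger backing array alive.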
|
||||
|
||||
``ByteString`` inherits all methods from ``IndexedSeq``, and it also has some new ones. For more information, look up the ``akka.util.ByteString`` class and its companion object in the ScalaDoc.
|
||||
|
||||
``ByteString`` also comes with it's own optimized builder and iterator classes ``ByteStringBuilder`` and ``ByteIterator`` which provides special features in addition to the standard builder / iterator methods:
|
||||
``ByteString`` also comes with its own optimized builder and iterator classes ``ByteStringBuilder`` and
|
||||
``ByteIterator`` which provide extra features in addition to those of normal builders and iterators.
|
||||
|
||||
Compatibility with java.io
|
||||
..........................
|
||||
|
||||
A ``ByteStringBuilder`` can be wrapped in a `java.io.OutputStream` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can we wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams.
|
||||
A ``ByteStringBuilder`` can be wrapped in a ``java.io.OutputStream`` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can be wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams.
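
As a rough illustration, the sketch below round-trips a record through ``java.io`` data streams. It assumes ``akka-actor`` is on the classpath; ``StreamInterop`` and the ``(id, name)`` record are hypothetical names chosen for the example.

```scala
import java.io.{DataInputStream, DataOutputStream}
import akka.util.ByteString

object StreamInterop {
  // Write through a java.io stream that appends to a ByteStringBuilder.
  def encode(id: Int, name: String): ByteString = {
    val builder = ByteString.newBuilder
    val out = new DataOutputStream(builder.asOutputStream)
    out.writeInt(id)
    out.writeUTF(name)
    out.flush()
    builder.result()
  }

  // Read through a java.io stream that consumes a ByteIterator.
  def decode(bytes: ByteString): (Int, String) = {
    val in = new DataInputStream(bytes.iterator.asInputStream)
    (in.readInt(), in.readUTF())
  }
}
```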
|
||||
|
||||
Encoding and decoding of binary data
|
||||
Encoding and decoding binary data
|
||||
....................................
|
||||
|
||||
``ByteStringBuilder`` and ``ByteIterator`` support encoding and decoding of binary data. As an example, consider a stream of binary data frames with the following format:
|
||||
|
|
@ -102,7 +124,7 @@ Encoding and decoding of binary data
|
|||
}
|
||||
data: m times Double
|
||||
|
||||
In this example, the data is to be stored in arrays of ``a``, ``b`` and ``data``.
|
||||
In this example, the data will be stored in arrays of ``a``, ``b`` of length ``n`` and ``data`` of length ``m``.
|
||||
|
||||
Decoding of such frames can be efficiently implemented in the following fashion:
|
||||
|
||||
|
|
@ -111,12 +133,14 @@ Decoding of such frames can be efficiently implemented in the following fashion:
|
|||
|
||||
This implementation naturally follows the example data format. In a true Scala application, one might, of course, want to use specialized immutable Short/Long/Double containers instead of mutable Arrays.
|
||||
|
||||
After extracting data from a ``ByteIterator``, the remaining content can also be turned back into a ``ByteString`` using the ``toSeq`` method
|
||||
After extracting data from a ``ByteIterator``, the remaining content can also be turned back into a ``ByteString`` using
|
||||
the ``toSeq`` method. No bytes are copied. Because of immutability the underlying bytes can be shared between both the
|
||||
``ByteIterator`` and the ``ByteString``.
|
||||
|
||||
.. includecode:: code/docs/io/BinaryCoding.scala
|
||||
:include: rest-to-seq
|
||||
|
||||
with no copying from bytes to rest involved. In general, conversions from ByteString to ByteIterator and vice versa are O(1) for non-chunked ByteStrings and (at worst) O(nChunks) for chunked ByteStrings.
|
||||
In general, conversions from ByteString to ByteIterator and vice versa are O(1) for non-chunked ByteStrings and (at worst) O(nChunks) for chunked ByteStrings.
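
The following sketch shows a builder and an iterator working together on a deliberately simpler frame than the one described above. The frame layout (an ``Int`` count followed by that many ``Double`` values, big-endian) and the name ``FrameSketch`` are assumptions for illustration only. It uses ``toByteString`` on the iterator, which to the best of my knowledge is the method that ``toSeq`` delegates to.

```scala
import java.nio.ByteOrder
import akka.util.ByteString

object FrameSketch {
  // Hypothetical frame: Int count, then `count` Doubles, big-endian.
  implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN

  def encode(values: Array[Double]): ByteString = {
    val builder = ByteString.newBuilder
    builder.putInt(values.length)
    values.foreach(v => builder.putDouble(v))
    builder.result()
  }

  // Returns the decoded values and whatever bytes follow the frame.
  def decode(bytes: ByteString): (Array[Double], ByteString) = {
    val it = bytes.iterator
    val count = it.getInt
    val values = Array.fill(count)(it.getDouble)
    (values, it.toByteString)
  }
}
```

Returning the remainder as a ``ByteString`` makes it easy to keep decoding further frames from the same input.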
|
||||
|
||||
Encoding of data also is very natural, using ``ByteStringBuilder``
|
||||
|
||||
|
|
@ -126,7 +150,8 @@ Encoding of data also is very natural, using ``ByteStringBuilder``
|
|||
Using TCP
|
||||
---------
|
||||
|
||||
As with all of the Akka I/O APIs, everything starts with acquiring a reference to the appropriate manager:
|
||||
All of the Akka I/O APIs are accessed through manager objects. When using an I/O API, the first step is to acquire a
|
||||
reference to the appropriate manager. The code below shows how to acquire a reference to the ``Tcp`` manager.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
|
|
@ -134,8 +159,10 @@ As with all of the Akka I/O APIs, everything starts with acquiring a reference t
|
|||
import akka.io.Tcp
|
||||
val tcpManager = IO(Tcp)
|
||||
|
||||
This is an actor that handles the underlying low level I/O resources (Selectors, channels) and instantiates workers for
|
||||
specific tasks, like listening to incoming connections.
|
||||
The manager is an actor that handles the underlying low level I/O resources (selectors, channels) and instantiates
|
||||
workers for specific tasks, such as listening to incoming connections.
|
||||
|
||||
.. _connecting-scala:
|
||||
|
||||
Connecting
|
||||
^^^^^^^^^^
|
||||
|
|
@ -146,10 +173,14 @@ The first step of connecting to a remote address is sending a ``Connect`` messag
|
|||
|
||||
import akka.io.Tcp._
|
||||
IO(Tcp) ! Connect(remoteSocketAddress)
|
||||
// It is also possible to set various socket options or specify a local address:
|
||||
|
||||
When connecting, it is also possible to set various socket options or specify a local address:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
IO(Tcp) ! Connect(remoteSocketAddress, Some(localSocketAddress), List(SO.KeepAlive(true)))
|
||||
|
||||
After issuing the Connect command the TCP manager spawns a worker actor that will handle commands related to the
|
||||
After issuing the ``Connect`` command the TCP manager spawns a worker actor to handle commands related to the
|
||||
connection. This worker actor will reveal itself by replying with a ``Connected`` message to the actor who sent the
|
||||
``Connect`` command.
|
||||
|
||||
|
|
@ -165,9 +196,9 @@ has to be sent to the connection actor with the listener ``ActorRef`` as a param
|
|||
|
||||
connectionActor ! Register(listener)
|
||||
|
||||
After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor.
|
||||
If the listener stops, the connection is closed, and all resources allocated for the connection released. During the
|
||||
lifetime the listener may receive various event notifications:
|
||||
Upon registration, the connection actor will watch the listener actor provided in the ``listener`` parameter.
|
||||
If the listener actor stops, the connection is closed, and all resources allocated for the connection are released. During the
|
||||
lifetime of the connection the listener may receive various event notifications:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
|
|
@ -175,14 +206,16 @@ lifetime the listener may receive various event notifications:
|
|||
case CommandFailed(cmd) => // handle failure of command: cmd
|
||||
case _: ConnectionClosed => // handle closed connections
|
||||
|
||||
``ConnectionClosed`` is a trait, which the different connection close events all implement.
|
||||
The last line handles all connection close events in the same way. It is possible to listen for more fine-grained
|
||||
connection events, see the appropriate section below.
|
||||
connection close events, see :ref:`closing-connections-scala` below.
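
Putting the connect, register, and event-handling steps together, a client actor might look roughly like the sketch below. It is written against the ``akka.io.Tcp`` messages as they appear in later Akka releases (for example a two-argument ``Connected``), which may differ from the version this text describes. The ``Client`` class and its ``observer`` parameter are hypothetical.

```scala
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef}
import akka.io.{IO, Tcp}
import akka.util.ByteString

// Hypothetical client: connects, registers itself as the listener,
// and forwards what happens to `observer`.
class Client(remote: InetSocketAddress, observer: ActorRef) extends Actor {
  import Tcp._
  import context.system // implicit ActorSystem required by IO(Tcp)

  IO(Tcp) ! Connect(remote)

  def receive: Receive = {
    case CommandFailed(_: Connect) =>
      observer ! "connect failed"
      context.stop(self)

    case Connected(_, _) =>
      val connection = sender() // the worker actor for this connection
      connection ! Register(self)
      context.become(connected(connection))
  }

  def connected(connection: ActorRef): Receive = {
    case data: ByteString        => connection ! Write(data)
    case Received(data)          => observer ! data
    case CommandFailed(_: Write) => observer ! "write failed"
    case _: ConnectionClosed =>
      observer ! "connection closed"
      context.stop(self)
  }
}
```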
|
||||
|
||||
|
||||
Accepting connections
|
||||
^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
To create a TCP server and listen for inbound connection, a ``Bind`` command has to be sent to the TCP manager:
|
||||
To create a TCP server and listen for inbound connections, a ``Bind`` command has to be sent to the TCP manager.
|
||||
This will instruct the TCP manager to listen for TCP connections on a particular address.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
|
|
@ -191,9 +224,9 @@ To create a TCP server and listen for inbound connection, a ``Bind`` command has
|
|||
IO(Tcp) ! Bind(handler, localAddress)
|
||||
|
||||
The actor sending the ``Bind`` message will receive a ``Bound`` message signalling that the server is ready to accept
|
||||
incoming connections. Accepting connections is very similar to the last two steps of opening outbound connections: when
|
||||
an incoming connection is established, the actor provided in ``handler`` will receive a ``Connected`` message whose
|
||||
sender is the connection actor:
|
||||
incoming connections. The process for accepting connections is similar to the process for making :ref:`outgoing
|
||||
connections <connecting-scala>`: when an incoming connection is established, the actor provided as ``handler`` will
|
||||
receive a ``Connected`` message whose sender is the connection actor.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
|
|
@ -207,11 +240,13 @@ has to be sent to the connection actor with the listener ``ActorRef`` as a param
|
|||
|
||||
connectionActor ! Register(listener)
|
||||
|
||||
After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor.
|
||||
Upon registration, the connection actor will watch the listener actor provided in the ``listener`` parameter.
|
||||
If the listener stops, the connection is closed, and all resources allocated for the connection are released. During the
|
||||
lifetime the listener will receive various event notifications in the same way as we has seen in the outbound
|
||||
connection lifetime the listener will receive various event notifications in the same way as in the outbound
|
||||
connection case.
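
The server side can be sketched in the same spirit. Again this assumes the ``akka.io.Tcp`` messages of later Akka releases (``Bound`` carrying the local address, a two-argument ``Connected``); ``Server`` and ``EchoHandler`` are hypothetical names, and echoing the data back is just a placeholder for real handling.

```scala
import java.net.InetSocketAddress
import akka.actor.{Actor, Props}
import akka.io.{IO, Tcp}

// Hypothetical per-connection listener: echoes received data back.
class EchoHandler extends Actor {
  import Tcp._
  def receive: Receive = {
    case Received(data)      => sender() ! Write(data)
    case _: ConnectionClosed => context.stop(self)
  }
}

// Hypothetical server: binds, then registers one handler per connection.
class Server(localAddress: InetSocketAddress) extends Actor {
  import Tcp._
  import context.system // implicit ActorSystem required by IO(Tcp)

  IO(Tcp) ! Bind(self, localAddress)

  def receive: Receive = {
    case Bound(_) =>
      () // the server is now ready to accept incoming connections

    case CommandFailed(_: Bind) =>
      context.stop(self)

    case Connected(_, _) =>
      // The sender of Connected is the connection actor.
      val handler = context.actorOf(Props(new EchoHandler))
      sender() ! Register(handler)
  }
}
```

Creating a separate handler per connection keeps connection state isolated, and stopping the handler closes its connection because the connection actor watches it.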
|
||||
|
||||
.. _closing-connections-scala:
|
||||
|
||||
Closing connections
|
||||
^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
|
@ -220,14 +255,14 @@ actor.
|
|||
|
||||
``Close`` will close the connection by sending a ``FIN`` message, but without waiting for confirmation from
|
||||
the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with
|
||||
``Closed``
|
||||
``Closed``.
|
||||
|
||||
``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but receives
|
||||
will continue until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is
|
||||
successful, the listener will be notified with ``ConfirmedClosed``
|
||||
successful, the listener will be notified with ``ConfirmedClosed``.
|
||||
|
||||
``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending
|
||||
writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted``
|
||||
writes will not be flushed. If the close is successful, the listener will be notified with ``Aborted``.
|
||||
|
||||
``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint.
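
A listener that wants to tell the close events apart can match on them individually. The sketch below assumes the event objects of ``akka.io.Tcp`` in later Akka releases; ``ErrorClosed`` is taken from those releases and is not described in the text above, and ``CloseEvents.describe`` is a hypothetical helper.

```scala
import akka.io.Tcp

object CloseEvents {
  // Map each fine-grained close event to a human-readable description.
  def describe(event: Tcp.ConnectionClosed): String = event match {
    case Tcp.Closed             => "closed after Close"
    case Tcp.ConfirmedClosed    => "closed in both directions after ConfirmedClose"
    case Tcp.Aborted            => "aborted after Abort"
    case Tcp.PeerClosed         => "closed by the remote endpoint"
    case Tcp.ErrorClosed(cause) => s"closed because of an I/O error: $cause"
  }
}
```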
|
||||
|
||||
|
|
@ -244,14 +279,22 @@ Throttling Reads and Writes
|
|||
Using UDP
|
||||
---------
|
||||
|
||||
UDP support comes in two flavors: connectionless, and connection based:
|
||||
UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams
|
||||
to any remote address. Connection-based UDP workers are linked to a single remote address.
|
||||
|
||||
The connectionless UDP manager is accessed through ``UdpFF``. ``UdpFF`` refers to the "fire-and-forget" style of sending
|
||||
UDP datagrams.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import akka.io.IO
|
||||
import akka.io.UdpFF
|
||||
val connectionLessUdp = IO(UdpFF)
|
||||
// ... or ...
|
||||
|
||||
The connection-based UDP manager is accessed through ``UdpConn``.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
import akka.io.UdpConn
|
||||
val connectionBasedUdp = IO(UdpConn)
|
||||
|
||||
|
|
@ -264,7 +307,7 @@ Simple Send
|
|||
............
|
||||
|
||||
To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the
|
||||
manager:
|
||||
``UdpFF`` manager:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
|
|
@ -328,8 +371,8 @@ It is also possible to send UDP datagrams using the ``ActorRef`` of the worker s
|
|||
Connection based UDP
|
||||
^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The service provided by the connection based UDP API is similar to the bind-and-send service we have seen earlier, but
|
||||
the main difference is that a connection is only able to send to the remoteAddress it was connected to, and will
|
||||
The service provided by the connection based UDP API is similar to the bind-and-send service we saw earlier, but
|
||||
the main difference is that a connection is only able to send to the ``remoteAddress`` it was connected to, and will
|
||||
receive datagrams only from that address.
|
||||
|
||||
Connecting is similar to what we have seen in the previous section:
|
||||
|
|
@ -337,7 +380,11 @@ Connecting is similar to what we have seen in the previous section:
|
|||
.. code-block:: scala
|
||||
|
||||
IO(UdpConn) ! Connect(handler, remoteAddress)
|
||||
// or, with more options:
|
||||
|
||||
Or, with more options:
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true)))
|
||||
|
||||
After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of
|
||||
|
|
@ -355,15 +402,16 @@ The actor passed in the ``handler`` parameter will receive inbound UDP datagrams
|
|||
case Received(dataByteString) => // Do something with the data
|
||||
|
||||
The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address
|
||||
will be provided, as an UDP connection only receives messages from the endpoint it has been connected to.
|
||||
is provided, as a UDP connection only receives messages from the endpoint it has been connected to.
|
||||
|
||||
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``:
|
||||
UDP datagrams can be sent by sending a ``Send`` message to the worker actor.
|
||||
|
||||
.. code-block:: scala
|
||||
|
||||
udpConnectionActor ! Send(data)
|
||||
|
||||
Again, the send does not contain a remote address, as it is always the endpoint we have been connected to.
|
||||
Again, like the ``Received`` message, the ``Send`` message does not contain a remote address. This is because the address
|
||||
will always be the endpoint we originally connected to.
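
The connection-based workflow can be summarised in one actor. Note the assumption: this sketch uses ``akka.io.UdpConnected``, the name under which connection-based UDP is available in later Akka releases, rather than the ``UdpConn`` name used in this text, and the message shapes may differ between versions. ``ConnectedUdp`` and ``observer`` are hypothetical.

```scala
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef}
import akka.io.{IO, UdpConnected}
import akka.util.ByteString

// Hypothetical actor: connects to one remote address, forwards inbound
// datagrams to `observer`, and sends any String it receives as a datagram.
class ConnectedUdp(remote: InetSocketAddress, observer: ActorRef) extends Actor {
  import context.system // implicit ActorSystem required by IO(UdpConnected)

  IO(UdpConnected) ! UdpConnected.Connect(self, remote)

  def receive: Receive = {
    case UdpConnected.Connected =>
      // The sender of Connected is the worker actor for this connection.
      context.become(ready(sender()))
  }

  def ready(connection: ActorRef): Receive = {
    case UdpConnected.Received(data) =>
      observer ! data // no sender address: it is always `remote`
    case text: String =>
      connection ! UdpConnected.Send(ByteString(text)) // no target address needed
  }
}
```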
|
||||
|
||||
.. note::
|
||||
There is a small performance benefit in using the connection-based UDP API over the connectionless one.
|
||||
|
|
|
|||