IO layer: add Java API and docs for TCP
This commit is contained in:
parent
fe5769695f
commit
d71a541596
6 changed files with 502 additions and 8 deletions
|
|
@ -86,4 +86,12 @@ object Inet {
|
||||||
val TrafficClass = SO.TrafficClass
|
val TrafficClass = SO.TrafficClass
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trait SoJavaFactories {
|
||||||
|
import SO._
|
||||||
|
def receiveBufferSize(size: Int) = ReceiveBufferSize(size)
|
||||||
|
def reuseAddress(on: Boolean) = ReuseAddress(on)
|
||||||
|
def sendBufferSize(size: Int) = SendBufferSize(size)
|
||||||
|
def trafficClass(tc: Int) = TrafficClass(tc)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import scala.concurrent.duration._
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
|
import java.lang.{ Iterable ⇒ JIterable }
|
||||||
|
|
||||||
object Tcp extends ExtensionKey[TcpExt] {
|
object Tcp extends ExtensionKey[TcpExt] {
|
||||||
|
|
||||||
|
|
@ -69,6 +70,7 @@ object Tcp extends ExtensionKey[TcpExt] {
|
||||||
endpoint: InetSocketAddress,
|
endpoint: InetSocketAddress,
|
||||||
backlog: Int = 100,
|
backlog: Int = 100,
|
||||||
options: immutable.Traversable[SocketOption] = Nil) extends Command
|
options: immutable.Traversable[SocketOption] = Nil) extends Command
|
||||||
|
|
||||||
case class Register(handler: ActorRef) extends Command
|
case class Register(handler: ActorRef) extends Command
|
||||||
case object Unbind extends Command
|
case object Unbind extends Command
|
||||||
|
|
||||||
|
|
@ -77,7 +79,8 @@ object Tcp extends ExtensionKey[TcpExt] {
|
||||||
case object ConfirmedClose extends CloseCommand
|
case object ConfirmedClose extends CloseCommand
|
||||||
case object Abort extends CloseCommand
|
case object Abort extends CloseCommand
|
||||||
|
|
||||||
case object NoAck
|
case class NoAck(token: Any)
|
||||||
|
object NoAck extends NoAck(null)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write data to the TCP connection. If no ack is needed use the special
|
* Write data to the TCP connection. If no ack is needed use the special
|
||||||
|
|
@ -86,7 +89,7 @@ object Tcp extends ExtensionKey[TcpExt] {
|
||||||
case class Write(data: ByteString, ack: Any) extends Command {
|
case class Write(data: ByteString, ack: Any) extends Command {
|
||||||
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
|
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
|
||||||
|
|
||||||
def wantsAck: Boolean = ack != NoAck
|
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
|
||||||
}
|
}
|
||||||
object Write {
|
object Write {
|
||||||
val Empty: Write = Write(ByteString.empty, NoAck)
|
val Empty: Write = Write(ByteString.empty, NoAck)
|
||||||
|
|
@ -106,12 +109,27 @@ object Tcp extends ExtensionKey[TcpExt] {
|
||||||
case object Bound extends Event
|
case object Bound extends Event
|
||||||
case object Unbound extends Event
|
case object Unbound extends Event
|
||||||
|
|
||||||
sealed trait ConnectionClosed extends Event
|
sealed trait ConnectionClosed extends Event {
|
||||||
|
def isAborted: Boolean = false
|
||||||
|
def isConfirmed: Boolean = false
|
||||||
|
def isPeerClosed: Boolean = false
|
||||||
|
def isErrorClosed: Boolean = false
|
||||||
|
def getErrorCause: String = null
|
||||||
|
}
|
||||||
case object Closed extends ConnectionClosed
|
case object Closed extends ConnectionClosed
|
||||||
case object Aborted extends ConnectionClosed
|
case object Aborted extends ConnectionClosed {
|
||||||
case object ConfirmedClosed extends ConnectionClosed
|
override def isAborted = true
|
||||||
case object PeerClosed extends ConnectionClosed
|
}
|
||||||
case class ErrorClosed(cause: String) extends ConnectionClosed
|
case object ConfirmedClosed extends ConnectionClosed {
|
||||||
|
override def isConfirmed = true
|
||||||
|
}
|
||||||
|
case object PeerClosed extends ConnectionClosed {
|
||||||
|
override def isPeerClosed = true
|
||||||
|
}
|
||||||
|
case class ErrorClosed(cause: String) extends ConnectionClosed {
|
||||||
|
override def isErrorClosed = true
|
||||||
|
override def getErrorCause = cause
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
||||||
|
|
@ -158,3 +176,51 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
||||||
|
|
||||||
val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize)
|
val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object TcpSO extends SoJavaFactories {
|
||||||
|
import Tcp.SO._
|
||||||
|
def keepAlive(on: Boolean) = KeepAlive(on)
|
||||||
|
def oobInline(on: Boolean) = OOBInline(on)
|
||||||
|
def tcpNoDelay(on: Boolean) = TcpNoDelay(on)
|
||||||
|
}
|
||||||
|
|
||||||
|
object TcpMessage {
|
||||||
|
import language.implicitConversions
|
||||||
|
import Tcp._
|
||||||
|
|
||||||
|
def connect(remoteAddress: InetSocketAddress,
|
||||||
|
localAddress: InetSocketAddress,
|
||||||
|
options: JIterable[SocketOption]) = Connect(remoteAddress, Some(localAddress), options)
|
||||||
|
def connect(remoteAddress: InetSocketAddress,
|
||||||
|
options: JIterable[SocketOption]) = Connect(remoteAddress, None, options)
|
||||||
|
def connect(remoteAddress: InetSocketAddress) = Connect(remoteAddress, None, Nil)
|
||||||
|
|
||||||
|
def bind(handler: ActorRef,
|
||||||
|
endpoint: InetSocketAddress,
|
||||||
|
backlog: Int,
|
||||||
|
options: JIterable[SocketOption]) = Bind(handler, endpoint, backlog, options)
|
||||||
|
def bind(handler: ActorRef,
|
||||||
|
endpoint: InetSocketAddress,
|
||||||
|
backlog: Int) = Bind(handler, endpoint, backlog, Nil)
|
||||||
|
|
||||||
|
def register(handler: ActorRef) = Register(handler)
|
||||||
|
def unbind = Unbind
|
||||||
|
|
||||||
|
def close = Close
|
||||||
|
def confirmedClose = ConfirmedClose
|
||||||
|
def abort = Abort
|
||||||
|
|
||||||
|
def noAck = NoAck
|
||||||
|
def noAck(token: AnyRef) = NoAck(token)
|
||||||
|
|
||||||
|
def write(data: ByteString) = Write(data)
|
||||||
|
def write(data: ByteString, ack: AnyRef) = Write(data, ack)
|
||||||
|
|
||||||
|
def stopReading = StopReading
|
||||||
|
def resumeReading = ResumeReading
|
||||||
|
|
||||||
|
implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
coll.asScala.to
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
99
akka-docs/rst/java/code/docs/io/IODocTest.java
Normal file
99
akka-docs/rst/java/code/docs/io/IODocTest.java
Normal file
|
|
@ -0,0 +1,99 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.io;
|
||||||
|
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.actor.UntypedActor;
|
||||||
|
//#imports
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import akka.io.Inet;
|
||||||
|
import akka.io.Tcp;
|
||||||
|
import akka.io.TcpExt;
|
||||||
|
import akka.io.TcpMessage;
|
||||||
|
import akka.io.TcpSO;
|
||||||
|
import akka.util.ByteString;
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
public class IODocTest {
|
||||||
|
|
||||||
|
static public class Demo extends UntypedActor {
|
||||||
|
ActorRef connectionActor = null;
|
||||||
|
ActorRef listener = getSelf();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
if ("connect".equals(msg)) {
|
||||||
|
//#manager
|
||||||
|
final ActorRef tcp = Tcp.get(system).manager();
|
||||||
|
//#manager
|
||||||
|
//#connect
|
||||||
|
final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.1",
|
||||||
|
12345);
|
||||||
|
tcp.tell(TcpMessage.connect(remoteAddr), getSelf());
|
||||||
|
// or with socket options
|
||||||
|
final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1",
|
||||||
|
1234);
|
||||||
|
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
|
||||||
|
options.add(TcpSO.keepAlive(true));
|
||||||
|
tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options), getSelf());
|
||||||
|
//#connect
|
||||||
|
} else
|
||||||
|
//#connected
|
||||||
|
if (msg instanceof Tcp.Connected) {
|
||||||
|
final Tcp.Connected conn = (Tcp.Connected) msg;
|
||||||
|
connectionActor = getSender();
|
||||||
|
connectionActor.tell(TcpMessage.register(listener), getSelf());
|
||||||
|
}
|
||||||
|
//#connected
|
||||||
|
else
|
||||||
|
//#received
|
||||||
|
if (msg instanceof Tcp.Received) {
|
||||||
|
final Tcp.Received recv = (Tcp.Received) msg;
|
||||||
|
final ByteString data = recv.data();
|
||||||
|
// and do something with the received data ...
|
||||||
|
} else if (msg instanceof Tcp.CommandFailed) {
|
||||||
|
final Tcp.CommandFailed failed = (Tcp.CommandFailed) msg;
|
||||||
|
final Tcp.Command command = failed.cmd();
|
||||||
|
// react to failed connect, bind, write, etc.
|
||||||
|
} else if (msg instanceof Tcp.ConnectionClosed) {
|
||||||
|
final Tcp.ConnectionClosed closed = (Tcp.ConnectionClosed) msg;
|
||||||
|
if (closed.isAborted()) {
|
||||||
|
// handle close reasons like this
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#received
|
||||||
|
else
|
||||||
|
if ("bind".equals(msg)) {
|
||||||
|
final ActorRef handler = getSelf();
|
||||||
|
//#bind
|
||||||
|
final ActorRef tcp = Tcp.get(system).manager();
|
||||||
|
final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1",
|
||||||
|
1234);
|
||||||
|
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
|
||||||
|
options.add(TcpSO.reuseAddress(true));
|
||||||
|
tcp.tell(TcpMessage.bind(handler, localAddr, 10, options), getSelf());
|
||||||
|
//#bind
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static ActorSystem system;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
static public void setup() {
|
||||||
|
system = ActorSystem.create("IODocTest");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void demonstrateConnect() {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -20,6 +20,7 @@ Java API
|
||||||
stm
|
stm
|
||||||
agents
|
agents
|
||||||
transactors
|
transactors
|
||||||
|
io
|
||||||
fsm
|
fsm
|
||||||
extending-akka
|
extending-akka
|
||||||
zeromq
|
zeromq
|
||||||
|
|
|
||||||
316
akka-docs/rst/java/io.rst
Normal file
316
akka-docs/rst/java/io.rst
Normal file
|
|
@ -0,0 +1,316 @@
|
||||||
|
.. _io-java:
|
||||||
|
|
||||||
|
I/O (Java)
|
||||||
|
==========
|
||||||
|
|
||||||
|
Introduction
|
||||||
|
------------
|
||||||
|
|
||||||
|
The ``akka.io`` package has been developed in collaboration between the Akka
|
||||||
|
and `spray.io`_ teams. Its design incorporates the experiences with the
|
||||||
|
``spray-io`` module along with improvements that were jointly developed for
|
||||||
|
more general consumption as an actor-based service.
|
||||||
|
|
||||||
|
This documentation is in progress and some sections may be incomplete. More will be coming.
|
||||||
|
|
||||||
|
Terminology, Concepts
|
||||||
|
---------------------
|
||||||
|
The I/O API is completely actor based, meaning that all operations are implemented as message passing instead of
|
||||||
|
direct method calls. Every I/O driver (TCP, UDP) has a special actor, called *manager* that serves
|
||||||
|
as the entry point for the API. The manager is accessible through an extension, for example the following code
|
||||||
|
looks up the TCP manager and returns its ``ActorRef``:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/io/IODocTest.java#manager
|
||||||
|
|
||||||
|
For various I/O commands the manager instantiates worker actors that will expose themselves to the user of the
|
||||||
|
API by replying to the command. For example after a ``Connect`` command sent to the TCP manager the manager creates
|
||||||
|
an actor representing the TCP connection. All operations related to the given TCP connections can be invoked by sending
|
||||||
|
messages to the connection actor which announces itself by sending a ``Connected`` message.
|
||||||
|
|
||||||
|
DeathWatch and Resource Management
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Worker actors usually need a user-side counterpart actor listening for events (such events could be inbound connections,
|
||||||
|
incoming bytes or acknowledgements for writes). These worker actors *watch* their listener counterparts, therefore the
|
||||||
|
resources assigned to them are automatically released when the listener stops. This design makes the API more robust
|
||||||
|
against resource leaks.
|
||||||
|
|
||||||
|
Thanks to the completely actor based approach of the I/O API the opposite direction works as well: a user actor
|
||||||
|
responsible for handling a connection might watch the connection actor to be notified if it unexpectedly terminates.
|
||||||
|
|
||||||
|
Write models (Ack, Nack)
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Basically all of the I/O devices have a maximum throughput which limits the frequency and size of writes. When an
|
||||||
|
application tries to push more data then a device can handle, the driver has to buffer all bytes that the device has
|
||||||
|
not yet been able to write. With this approach it is possible to handle short bursts of intensive writes --- but no buffer is infinite.
|
||||||
|
Therefore, the driver has to notify the writer (a user-side actor) either that no further writes are possible, or by
|
||||||
|
explicitly notifying it when the next chunk is possible to be written or buffered.
|
||||||
|
|
||||||
|
Both of these models are available in the TCP and UDP implementations of Akka I/O. Ack based flow control can be enabled
|
||||||
|
by providing an ack object in the write message (``Write`` in the case of TCP and ``Send`` for UDP) that will be used by
|
||||||
|
the worker to notify the writer about the success.
|
||||||
|
|
||||||
|
If a write (or any other command) fails, the driver notifies the commander with a special message (``CommandFailed`` in
|
||||||
|
the case of UDP and TCP). This message also serves as a means to notify the writer of a failed write. Please note, that
|
||||||
|
in a Nack based flow-control setting the writer has to buffer some of the writes as the failure notification for a
|
||||||
|
write ``W1`` might arrive after additional write commands ``W2`` ``W3`` has been sent.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
An acknowledged write does not mean acknowledged delivery or storage. The Ack/Nack
|
||||||
|
protocol described here is a means of flow control not error handling: receiving an Ack for a write signals that the
|
||||||
|
I/O driver is ready to accept a new one.
|
||||||
|
|
||||||
|
ByteString
|
||||||
|
^^^^^^^^^^
|
||||||
|
|
||||||
|
A primary goal of Akka's IO support is to only communicate between actors with immutable objects. When dealing with network I/O on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this I/O support, so ``ByteString`` was developed.
|
||||||
|
|
||||||
|
``ByteString`` is a `Rope-like <http://en.wikipedia.org/wiki/Rope_(computer_science)>`_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If you require a ``ByteString`` that only blocks a much memory as necessary for it's content, use the ``compact`` method to get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will result in copying all bytes in that slice.
|
||||||
|
|
||||||
|
``ByteString`` inherits all methods from ``IndexedSeq``, and it also has some new ones. For more information, look up the ``akka.util.ByteString`` class and it's companion object in the ScalaDoc.
|
||||||
|
|
||||||
|
``ByteString`` also comes with it's own optimized builder and iterator classes ``ByteStringBuilder`` and ``ByteIterator`` which provides special features in addition to the standard builder / iterator methods:
|
||||||
|
|
||||||
|
Compatibility with java.io
|
||||||
|
..........................
|
||||||
|
|
||||||
|
A ``ByteStringBuilder`` can be wrapped in a `java.io.OutputStream` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can we wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams.
|
||||||
|
|
||||||
|
Using TCP
|
||||||
|
---------
|
||||||
|
|
||||||
|
The following imports are assumed throughout this section:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/io/IODocTest.java#imports
|
||||||
|
|
||||||
|
As with all of the Akka I/O APIs, everything starts with acquiring a reference to the appropriate manager:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/io/IODocTest.java#manager
|
||||||
|
|
||||||
|
This is an actor that handles the underlying low level I/O resources (Selectors, channels) and instantiates workers for
|
||||||
|
specific tasks, like listening to incoming connections.
|
||||||
|
|
||||||
|
Connecting
|
||||||
|
^^^^^^^^^^
|
||||||
|
|
||||||
|
The first step of connecting to a remote address is sending a ``Connect`` message to the TCP manager:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/io/IODocTest.java#connect
|
||||||
|
|
||||||
|
After issuing the Connect command the TCP manager spawns a worker actor that will handle commands related to the
|
||||||
|
connection. This worker actor will reveal itself by replying with a ``Connected`` message to the actor who sent the
|
||||||
|
``Connect`` command.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/io/IODocTest.java#connected
|
||||||
|
|
||||||
|
When receiving the :class:`Connected` message there is still no listener
|
||||||
|
associated with the connection. To finish the connection setup a ``Register``
|
||||||
|
has to be sent to the connection actor with the listener ``ActorRef`` as a
|
||||||
|
parameter, which therefore done in the last line above.
|
||||||
|
|
||||||
|
After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor.
|
||||||
|
If the listener stops, the connection is closed, and all resources allocated for the connection released. During the
|
||||||
|
lifetime the listener may receive various event notifications:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/io/IODocTest.java#received
|
||||||
|
|
||||||
|
The last line handles all connection close events in the same way. It is possible to listen for more fine-grained
|
||||||
|
connection events, see the appropriate section below.
|
||||||
|
|
||||||
|
|
||||||
|
Accepting connections
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
To create a TCP server and listen for inbound connection, a ``Bind`` command has to be sent to the TCP manager:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/io/IODocTest.java#bind
|
||||||
|
|
||||||
|
The actor sending the ``Bind`` message will receive a ``Bound`` message signalling that the server is ready to accept
|
||||||
|
incoming connections. Accepting connections is very similar to the last two steps of opening outbound connections: when
|
||||||
|
an incoming connection is established, the actor provided in ``handler`` will receive a ``Connected`` message whose
|
||||||
|
sender is the connection actor:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/io/IODocTest.java#connected
|
||||||
|
|
||||||
|
When receiving the :class:`Connected` message there is still no listener
|
||||||
|
associated with the connection. To finish the connection setup a ``Register``
|
||||||
|
has to be sent to the connection actor with the listener ``ActorRef`` as a
|
||||||
|
parameter, which therefore done in the last line above.
|
||||||
|
|
||||||
|
After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor.
|
||||||
|
If the listener stops, the connection is closed, and all resources allocated for the connection released. During the
|
||||||
|
lifetime the listener will receive various event notifications in the same way as we has seen in the outbound
|
||||||
|
connection case.
|
||||||
|
|
||||||
|
Closing connections
|
||||||
|
^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
A connection can be closed by sending one of the commands ``Close``, ``ConfirmedClose`` or ``Abort`` to the connection
|
||||||
|
actor.
|
||||||
|
|
||||||
|
``Close`` will close the connection by sending a ``FIN`` message, but without waiting for confirmation from
|
||||||
|
the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with
|
||||||
|
``Closed``
|
||||||
|
|
||||||
|
``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but receives
|
||||||
|
will continue until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is
|
||||||
|
successful, the listener will be notified with ``ConfirmedClosed``
|
||||||
|
|
||||||
|
``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending
|
||||||
|
writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted``
|
||||||
|
|
||||||
|
``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint.
|
||||||
|
|
||||||
|
``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed.
|
||||||
|
|
||||||
|
All close notifications are subclasses of ``ConnectionClosed`` so listeners who do not need fine-grained close events
|
||||||
|
may handle all close events in the same way.
|
||||||
|
|
||||||
|
Throttling Reads and Writes
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
*This section is not yet ready. More coming soon*
|
||||||
|
|
||||||
|
Using UDP
|
||||||
|
---------
|
||||||
|
|
||||||
|
UDP support comes in two flavors: connectionless, and connection based:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
import akka.io.IO
|
||||||
|
import akka.io.UdpFF
|
||||||
|
val connectionLessUdp = IO(UdpFF)
|
||||||
|
// ... or ...
|
||||||
|
import akka.io.UdpConn
|
||||||
|
val connectionBasedUdp = IO(UdpConn)
|
||||||
|
|
||||||
|
UDP servers can be only implemented by the connectionless API, but clients can use both.
|
||||||
|
|
||||||
|
Connectionless UDP
|
||||||
|
^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
Simple Send
|
||||||
|
............
|
||||||
|
|
||||||
|
To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the
|
||||||
|
manager:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
IO(UdpFF) ! SimpleSender
|
||||||
|
// or with socket options:
|
||||||
|
import akka.io.Udp._
|
||||||
|
IO(UdpFF) ! SimpleSender(List(SO.Broadcast(true)))
|
||||||
|
|
||||||
|
The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
case SimpleSendReady =>
|
||||||
|
simpleSender = sender
|
||||||
|
|
||||||
|
After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple
|
||||||
|
message send:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
simpleSender ! Send(data, serverAddress)
|
||||||
|
|
||||||
|
|
||||||
|
Bind (and Send)
|
||||||
|
...............
|
||||||
|
|
||||||
|
To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP
|
||||||
|
manager
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
IO(UdpFF) ! Bind(handler, localAddress)
|
||||||
|
|
||||||
|
After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of
|
||||||
|
this message is the worker for the UDP channel bound to the local address.
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
case Bound =>
|
||||||
|
udpWorker = sender // Save the worker ref for later use
|
||||||
|
|
||||||
|
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
case Received(dataByteString, remoteAddress) => // Do something with the data
|
||||||
|
|
||||||
|
The ``Received`` message contains the payload of the datagram and the address of the sender.
|
||||||
|
|
||||||
|
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
udpWorker ! Send(data, serverAddress)
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case
|
||||||
|
the sender field of the UDP datagram will be the bound local address, while in the latter it will be an undetermined
|
||||||
|
ephemeral port.
|
||||||
|
|
||||||
|
Connection based UDP
|
||||||
|
^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
The service provided by the connection based UDP API is similar to the bind-and-send service we have seen earlier, but
|
||||||
|
the main difference is that a connection is only able to send to the remoteAddress it was connected to, and will
|
||||||
|
receive datagrams only from that address.
|
||||||
|
|
||||||
|
Connecting is similar to what we have seen in the previous section:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
IO(UdpConn) ! Connect(handler, remoteAddress)
|
||||||
|
// or, with more options:
|
||||||
|
IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true)))
|
||||||
|
|
||||||
|
After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of
|
||||||
|
this message is the worker for the UDP connection.
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
case Connected =>
|
||||||
|
udpConnectionActor = sender // Save the worker ref for later use
|
||||||
|
|
||||||
|
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
case Received(dataByteString) => // Do something with the data
|
||||||
|
|
||||||
|
The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address
|
||||||
|
will be provided, as an UDP connection only receives messages from the endpoint it has been connected to.
|
||||||
|
|
||||||
|
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``:
|
||||||
|
|
||||||
|
.. code-block:: scala
|
||||||
|
|
||||||
|
udpConnectionActor ! Send(data)
|
||||||
|
|
||||||
|
Again, the send does not contain a remote address, as it is always the endpoint we have been connected to.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
There is a small performance benefit in using connection based UDP API over the connectionless one.
|
||||||
|
If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security
|
||||||
|
check, while in the case of connection-based UDP the security check is cached after connect, thus writes does
|
||||||
|
not suffer an additional performance penalty.
|
||||||
|
|
||||||
|
Throttling Reads and Writes
|
||||||
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
*This section is not yet ready. More coming soon*
|
||||||
|
|
||||||
|
|
||||||
|
Architecture in-depth
|
||||||
|
---------------------
|
||||||
|
|
||||||
|
For further details on the design and internal architecture see :ref:`io-layer`.
|
||||||
|
|
||||||
|
.. _spray.io: http://spray.io
|
||||||
|
|
@ -13,6 +13,10 @@ more general consumption as an actor-based service.
|
||||||
|
|
||||||
This documentation is in progress and some sections may be incomplete. More will be coming.
|
This documentation is in progress and some sections may be incomplete. More will be coming.
|
||||||
|
|
||||||
|
.. toctree::
|
||||||
|
|
||||||
|
io-old
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
The old I/O implementation has been deprecated and its documentation has been moved: :ref:`io-scala-old`
|
The old I/O implementation has been deprecated and its documentation has been moved: :ref:`io-scala-old`
|
||||||
|
|
||||||
|
|
@ -378,4 +382,4 @@ Architecture in-depth
|
||||||
|
|
||||||
For further details on the design and internal architecture see :ref:`io-layer`.
|
For further details on the design and internal architecture see :ref:`io-layer`.
|
||||||
|
|
||||||
.. _spray.io: http://spray.io
|
.. _spray.io: http://spray.io
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue