From d71a541596102d4ec1dc07f99c264cc4611e09a9 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 15 Feb 2013 09:13:39 +0100 Subject: [PATCH 1/6] IO layer: add Java API and docs for TCP --- akka-actor/src/main/scala/akka/io/Inet.scala | 8 + akka-actor/src/main/scala/akka/io/Tcp.scala | 80 ++++- .../rst/java/code/docs/io/IODocTest.java | 99 ++++++ akka-docs/rst/java/index.rst | 1 + akka-docs/rst/java/io.rst | 316 ++++++++++++++++++ akka-docs/rst/scala/io.rst | 6 +- 6 files changed, 502 insertions(+), 8 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/io/IODocTest.java create mode 100644 akka-docs/rst/java/io.rst diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala index 0b9fb4ca0c..18c96594d6 100644 --- a/akka-actor/src/main/scala/akka/io/Inet.scala +++ b/akka-actor/src/main/scala/akka/io/Inet.scala @@ -86,4 +86,12 @@ object Inet { val TrafficClass = SO.TrafficClass } + trait SoJavaFactories { + import SO._ + def receiveBufferSize(size: Int) = ReceiveBufferSize(size) + def reuseAddress(on: Boolean) = ReuseAddress(on) + def sendBufferSize(size: Int) = SendBufferSize(size) + def trafficClass(tc: Int) = TrafficClass(tc) + } + } diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 97076477cf..a95e26867a 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ import scala.collection.immutable import akka.util.ByteString import akka.actor._ +import java.lang.{ Iterable ⇒ JIterable } object Tcp extends ExtensionKey[TcpExt] { @@ -69,6 +70,7 @@ object Tcp extends ExtensionKey[TcpExt] { endpoint: InetSocketAddress, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil) extends Command + case class Register(handler: ActorRef) extends Command case object Unbind extends Command @@ -77,7 +79,8 @@ object Tcp extends ExtensionKey[TcpExt] { case object ConfirmedClose extends CloseCommand case object Abort extends CloseCommand - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) /** * Write data to the TCP connection. If no ack is needed use the special @@ -86,7 +89,7 @@ object Tcp extends ExtensionKey[TcpExt] { case class Write(data: ByteString, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Write { val Empty: Write = Write(ByteString.empty, NoAck) @@ -106,12 +109,27 @@ object Tcp extends ExtensionKey[TcpExt] { case object Bound extends Event case object Unbound extends Event - sealed trait ConnectionClosed extends Event + sealed trait ConnectionClosed extends Event { + def isAborted: Boolean = false + def isConfirmed: Boolean = false + def isPeerClosed: Boolean = false + def isErrorClosed: Boolean = false + def getErrorCause: String = null + } case object Closed extends ConnectionClosed - case object Aborted extends ConnectionClosed - case object ConfirmedClosed extends ConnectionClosed - case object PeerClosed extends ConnectionClosed - case class ErrorClosed(cause: String) extends ConnectionClosed + case object Aborted extends ConnectionClosed { + override def isAborted = true + } + case object ConfirmedClosed extends ConnectionClosed { + override def isConfirmed = true + } + case object PeerClosed extends ConnectionClosed { + override def isPeerClosed = true + } + case class ErrorClosed(cause: String) extends ConnectionClosed { + override def isErrorClosed = true + override def getErrorCause = cause + } } class TcpExt(system: ExtendedActorSystem) extends IO.Extension { @@ -158,3 +176,51 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize) } + +object TcpSO extends SoJavaFactories { + import Tcp.SO._ + def keepAlive(on: Boolean) = KeepAlive(on) + def oobInline(on: Boolean) = OOBInline(on) + def tcpNoDelay(on: Boolean) = TcpNoDelay(on) +} + +object TcpMessage { + import language.implicitConversions + import Tcp._ + + def connect(remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + options: JIterable[SocketOption]) = Connect(remoteAddress, Some(localAddress), options) + def connect(remoteAddress: InetSocketAddress, + options: JIterable[SocketOption]) = Connect(remoteAddress, None, options) + def connect(remoteAddress: InetSocketAddress) = Connect(remoteAddress, None, Nil) + + def bind(handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int, + options: JIterable[SocketOption]) = Bind(handler, endpoint, backlog, options) + def bind(handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int) = Bind(handler, endpoint, backlog, Nil) + + def register(handler: ActorRef) = Register(handler) + def unbind = Unbind + + def close = Close + def confirmedClose = ConfirmedClose + def abort = Abort + + def noAck = NoAck + def noAck(token: AnyRef) = NoAck(token) + + def write(data: ByteString) = Write(data) + def write(data: ByteString, ack: AnyRef) = Write(data, ack) + + def stopReading = StopReading + def resumeReading = ResumeReading + + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { + import scala.collection.JavaConverters._ + coll.asScala.to + } +} diff --git a/akka-docs/rst/java/code/docs/io/IODocTest.java b/akka-docs/rst/java/code/docs/io/IODocTest.java new file mode 100644 index 0000000000..583254e44d --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/IODocTest.java @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +import org.junit.BeforeClass; +import org.junit.Test; + +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +//#imports +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import akka.actor.ActorRef; +import akka.io.Inet; +import akka.io.Tcp; +import akka.io.TcpExt; +import akka.io.TcpMessage; +import akka.io.TcpSO; +import akka.util.ByteString; +//#imports + +public class IODocTest { + + static public class Demo extends UntypedActor { + ActorRef connectionActor = null; + ActorRef listener = getSelf(); + + @Override + public void onReceive(Object msg) { + if ("connect".equals(msg)) { + //#manager + final ActorRef tcp = Tcp.get(system).manager(); + //#manager + //#connect + final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.1", + 12345); + tcp.tell(TcpMessage.connect(remoteAddr), getSelf()); + // or with socket options + final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", + 1234); + final List options = new ArrayList(); + options.add(TcpSO.keepAlive(true)); + tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options), getSelf()); + //#connect + } else + //#connected + if (msg instanceof Tcp.Connected) { + final Tcp.Connected conn = (Tcp.Connected) msg; + connectionActor = getSender(); + connectionActor.tell(TcpMessage.register(listener), getSelf()); + } + //#connected + else + //#received + if (msg instanceof Tcp.Received) { + final Tcp.Received recv = (Tcp.Received) msg; + final ByteString data = recv.data(); + // and do something with the received data ... + } else if (msg instanceof Tcp.CommandFailed) { + final Tcp.CommandFailed failed = (Tcp.CommandFailed) msg; + final Tcp.Command command = failed.cmd(); + // react to failed connect, bind, write, etc. + } else if (msg instanceof Tcp.ConnectionClosed) { + final Tcp.ConnectionClosed closed = (Tcp.ConnectionClosed) msg; + if (closed.isAborted()) { + // handle close reasons like this + } + } + //#received + else + if ("bind".equals(msg)) { + final ActorRef handler = getSelf(); + //#bind + final ActorRef tcp = Tcp.get(system).manager(); + final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", + 1234); + final List options = new ArrayList(); + options.add(TcpSO.reuseAddress(true)); + tcp.tell(TcpMessage.bind(handler, localAddr, 10, options), getSelf()); + //#bind + } + } + } + + static ActorSystem system; + + @BeforeClass + static public void setup() { + system = ActorSystem.create("IODocTest"); + } + + @Test + public void demonstrateConnect() { + } + +} diff --git a/akka-docs/rst/java/index.rst b/akka-docs/rst/java/index.rst index b0f29c27ef..81404409cb 100644 --- a/akka-docs/rst/java/index.rst +++ b/akka-docs/rst/java/index.rst @@ -20,6 +20,7 @@ Java API stm agents transactors + io fsm extending-akka zeromq diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst new file mode 100644 index 0000000000..aef55caba2 --- /dev/null +++ b/akka-docs/rst/java/io.rst @@ -0,0 +1,316 @@ +.. _io-java: + +I/O (Java) +========== + +Introduction +------------ + +The ``akka.io`` package has been developed in collaboration between the Akka +and `spray.io`_ teams. Its design incorporates the experiences with the +``spray-io`` module along with improvements that were jointly developed for +more general consumption as an actor-based service. + +This documentation is in progress and some sections may be incomplete. More will be coming. + +Terminology, Concepts +--------------------- +The I/O API is completely actor based, meaning that all operations are implemented as message passing instead of +direct method calls. Every I/O driver (TCP, UDP) has a special actor, called *manager* that serves +as the entry point for the API. The manager is accessible through an extension, for example the following code +looks up the TCP manager and returns its ``ActorRef``: + +.. includecode:: code/docs/io/IODocTest.java#manager + +For various I/O commands the manager instantiates worker actors that will expose themselves to the user of the +API by replying to the command. For example after a ``Connect`` command sent to the TCP manager the manager creates +an actor representing the TCP connection. All operations related to the given TCP connections can be invoked by sending +messages to the connection actor which announces itself by sending a ``Connected`` message. + +DeathWatch and Resource Management +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Worker actors usually need a user-side counterpart actor listening for events (such events could be inbound connections, +incoming bytes or acknowledgements for writes). These worker actors *watch* their listener counterparts, therefore the +resources assigned to them are automatically released when the listener stops. This design makes the API more robust +against resource leaks. + +Thanks to the completely actor based approach of the I/O API the opposite direction works as well: a user actor +responsible for handling a connection might watch the connection actor to be notified if it unexpectedly terminates. + +Write models (Ack, Nack) +^^^^^^^^^^^^^^^^^^^^^^^^ + +Basically all of the I/O devices have a maximum throughput which limits the frequency and size of writes. When an +application tries to push more data then a device can handle, the driver has to buffer all bytes that the device has +not yet been able to write. With this approach it is possible to handle short bursts of intensive writes --- but no buffer is infinite. +Therefore, the driver has to notify the writer (a user-side actor) either that no further writes are possible, or by +explicitly notifying it when the next chunk is possible to be written or buffered. + +Both of these models are available in the TCP and UDP implementations of Akka I/O. Ack based flow control can be enabled +by providing an ack object in the write message (``Write`` in the case of TCP and ``Send`` for UDP) that will be used by +the worker to notify the writer about the success. + +If a write (or any other command) fails, the driver notifies the commander with a special message (``CommandFailed`` in +the case of UDP and TCP). This message also serves as a means to notify the writer of a failed write. Please note, that +in a Nack based flow-control setting the writer has to buffer some of the writes as the failure notification for a +write ``W1`` might arrive after additional write commands ``W2`` ``W3`` has been sent. + +.. warning:: + An acknowledged write does not mean acknowledged delivery or storage. The Ack/Nack + protocol described here is a means of flow control not error handling: receiving an Ack for a write signals that the + I/O driver is ready to accept a new one. + +ByteString +^^^^^^^^^^ + +A primary goal of Akka's IO support is to only communicate between actors with immutable objects. When dealing with network I/O on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this I/O support, so ``ByteString`` was developed. + +``ByteString`` is a `Rope-like `_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If you require a ``ByteString`` that only blocks a much memory as necessary for it's content, use the ``compact`` method to get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will result in copying all bytes in that slice. + +``ByteString`` inherits all methods from ``IndexedSeq``, and it also has some new ones. For more information, look up the ``akka.util.ByteString`` class and it's companion object in the ScalaDoc. + +``ByteString`` also comes with it's own optimized builder and iterator classes ``ByteStringBuilder`` and ``ByteIterator`` which provides special features in addition to the standard builder / iterator methods: + +Compatibility with java.io +.......................... + +A ``ByteStringBuilder`` can be wrapped in a `java.io.OutputStream` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can we wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams. + +Using TCP +--------- + +The following imports are assumed throughout this section: + +.. includecode:: code/docs/io/IODocTest.java#imports + +As with all of the Akka I/O APIs, everything starts with acquiring a reference to the appropriate manager: + +.. includecode:: code/docs/io/IODocTest.java#manager + +This is an actor that handles the underlying low level I/O resources (Selectors, channels) and instantiates workers for +specific tasks, like listening to incoming connections. + +Connecting +^^^^^^^^^^ + +The first step of connecting to a remote address is sending a ``Connect`` message to the TCP manager: + +.. includecode:: code/docs/io/IODocTest.java#connect + +After issuing the Connect command the TCP manager spawns a worker actor that will handle commands related to the +connection. This worker actor will reveal itself by replying with a ``Connected`` message to the actor who sent the +``Connect`` command. + +.. includecode:: code/docs/io/IODocTest.java#connected + +When receiving the :class:`Connected` message there is still no listener +associated with the connection. To finish the connection setup a ``Register`` +has to be sent to the connection actor with the listener ``ActorRef`` as a +parameter, which therefore done in the last line above. + +After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor. +If the listener stops, the connection is closed, and all resources allocated for the connection released. During the +lifetime the listener may receive various event notifications: + +.. includecode:: code/docs/io/IODocTest.java#received + +The last line handles all connection close events in the same way. It is possible to listen for more fine-grained +connection events, see the appropriate section below. + + +Accepting connections +^^^^^^^^^^^^^^^^^^^^^ + +To create a TCP server and listen for inbound connection, a ``Bind`` command has to be sent to the TCP manager: + +.. includecode:: code/docs/io/IODocTest.java#bind + +The actor sending the ``Bind`` message will receive a ``Bound`` message signalling that the server is ready to accept +incoming connections. Accepting connections is very similar to the last two steps of opening outbound connections: when +an incoming connection is established, the actor provided in ``handler`` will receive a ``Connected`` message whose +sender is the connection actor: + +.. includecode:: code/docs/io/IODocTest.java#connected + +When receiving the :class:`Connected` message there is still no listener +associated with the connection. To finish the connection setup a ``Register`` +has to be sent to the connection actor with the listener ``ActorRef`` as a +parameter, which therefore done in the last line above. + +After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor. +If the listener stops, the connection is closed, and all resources allocated for the connection released. During the +lifetime the listener will receive various event notifications in the same way as we has seen in the outbound +connection case. + +Closing connections +^^^^^^^^^^^^^^^^^^^ + +A connection can be closed by sending one of the commands ``Close``, ``ConfirmedClose`` or ``Abort`` to the connection +actor. + +``Close`` will close the connection by sending a ``FIN`` message, but without waiting for confirmation from +the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with +``Closed`` + +``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but receives +will continue until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is +successful, the listener will be notified with ``ConfirmedClosed`` + +``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending +writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted`` + +``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint. + +``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed. + +All close notifications are subclasses of ``ConnectionClosed`` so listeners who do not need fine-grained close events +may handle all close events in the same way. + +Throttling Reads and Writes +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +*This section is not yet ready. More coming soon* + +Using UDP +--------- + +UDP support comes in two flavors: connectionless, and connection based: + +.. code-block:: scala + + import akka.io.IO + import akka.io.UdpFF + val connectionLessUdp = IO(UdpFF) + // ... or ... + import akka.io.UdpConn + val connectionBasedUdp = IO(UdpConn) + +UDP servers can be only implemented by the connectionless API, but clients can use both. + +Connectionless UDP +^^^^^^^^^^^^^^^^^^ + +Simple Send +............ + +To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the +manager: + +.. code-block:: scala + + IO(UdpFF) ! SimpleSender + // or with socket options: + import akka.io.Udp._ + IO(UdpFF) ! SimpleSender(List(SO.Broadcast(true))) + +The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: + +.. code-block:: scala + + case SimpleSendReady => + simpleSender = sender + +After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple +message send: + +.. code-block:: scala + + simpleSender ! Send(data, serverAddress) + + +Bind (and Send) +............... + +To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP +manager + +.. code-block:: scala + + IO(UdpFF) ! Bind(handler, localAddress) + +After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of +this message is the worker for the UDP channel bound to the local address. + +.. code-block:: scala + + case Bound => + udpWorker = sender // Save the worker ref for later use + +The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: + +.. code-block:: scala + + case Received(dataByteString, remoteAddress) => // Do something with the data + +The ``Received`` message contains the payload of the datagram and the address of the sender. + +It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: + +.. code-block:: scala + + udpWorker ! Send(data, serverAddress) + +.. note:: + The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case + the sender field of the UDP datagram will be the bound local address, while in the latter it will be an undetermined + ephemeral port. + +Connection based UDP +^^^^^^^^^^^^^^^^^^^^ + +The service provided by the connection based UDP API is similar to the bind-and-send service we have seen earlier, but +the main difference is that a connection is only able to send to the remoteAddress it was connected to, and will +receive datagrams only from that address. + +Connecting is similar to what we have seen in the previous section: + +.. code-block:: scala + + IO(UdpConn) ! Connect(handler, remoteAddress) + // or, with more options: + IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true))) + +After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of +this message is the worker for the UDP connection. + +.. code-block:: scala + + case Connected => + udpConnectionActor = sender // Save the worker ref for later use + +The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: + +.. code-block:: scala + + case Received(dataByteString) => // Do something with the data + +The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address +will be provided, as an UDP connection only receives messages from the endpoint it has been connected to. + +It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: + +.. code-block:: scala + + udpConnectionActor ! Send(data) + +Again, the send does not contain a remote address, as it is always the endpoint we have been connected to. + +.. note:: + There is a small performance benefit in using connection based UDP API over the connectionless one. + If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security + check, while in the case of connection-based UDP the security check is cached after connect, thus writes does + not suffer an additional performance penalty. + +Throttling Reads and Writes +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +*This section is not yet ready. More coming soon* + + +Architecture in-depth +--------------------- + +For further details on the design and internal architecture see :ref:`io-layer`. + +.. _spray.io: http://spray.io diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index 2ebb6d2b49..31dfa6e4ee 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -13,6 +13,10 @@ more general consumption as an actor-based service. This documentation is in progress and some sections may be incomplete. More will be coming. +.. toctree:: + + io-old + .. note:: The old I/O implementation has been deprecated and its documentation has been moved: :ref:`io-scala-old` @@ -378,4 +382,4 @@ Architecture in-depth For further details on the design and internal architecture see :ref:`io-layer`. -.. _spray.io: http://spray.io \ No newline at end of file +.. _spray.io: http://spray.io From 933c93c05b8d866ab78d816de039cee4f670c414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 15 Feb 2013 11:47:46 +0100 Subject: [PATCH 2/6] Added Java API for UdpFF --- akka-actor/src/main/scala/akka/io/Udp.scala | 7 +- akka-actor/src/main/scala/akka/io/UdpFF.scala | 41 ++++++- .../rst/java/code/docs/io/IOUdpFFDocTest.java | 101 ++++++++++++++++++ akka-docs/rst/java/io.rst | 45 +++----- 4 files changed, 155 insertions(+), 39 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 840dda666d..b52c684bae 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -4,7 +4,7 @@ package akka.io import java.net.DatagramSocket -import akka.io.Inet.SocketOption +import akka.io.Inet.{ SoJavaFactories, SocketOption } import com.typesafe.config.Config import akka.actor.{ Props, ActorSystemImpl } @@ -46,3 +46,8 @@ object Udp { } } + +object UdpSO extends SoJavaFactories { + import Udp.SO._ + def broadcast(on: Boolean) = Broadcast(on) +} diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala index 838f53a88d..82c51a7df4 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFF.scala @@ -19,11 +19,13 @@ object UdpFF extends ExtensionKey[UdpFFExt] { def failureMessage = CommandFailed(this) } - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) + case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Send { def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck) @@ -44,14 +46,43 @@ object UdpFF extends ExtensionKey[UdpFFExt] { case class Received(data: ByteString, sender: InetSocketAddress) extends Event case class CommandFailed(cmd: Command) extends Event - case object Bound extends Event - case object SimpleSendReady extends Event - case object Unbound extends Event + + sealed trait Bound extends Event + case object Bound extends Bound + + sealed trait SimpleSendReady extends Event + case object SimpleSendReady extends SimpleSendReady + + sealed trait Unbound + case object Unbound extends Unbound case class SendFailed(cause: Throwable) extends Event } +object UdpFFMessage { + import UdpFF._ + import java.lang.{ Iterable ⇒ JIterable } + import scala.collection.JavaConverters._ + import language.implicitConversions + + def send(payload: ByteString, target: InetSocketAddress) = Send(payload, target) + def send(payload: ByteString, target: InetSocketAddress, ack: Any) = Send(payload, target, ack) + + def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]) = + Bind(handler, endpoint, options.asScala.to) + + def bind(handler: ActorRef, endpoint: InetSocketAddress) = Bind(handler, endpoint, Nil) + + def simpleSender(options: JIterable[SocketOption]) = SimpleSender(options.asScala.to) + def simpleSender = SimpleSender + + def unbind = Unbind + + def stopReading = StopReading + def resumeReading = ResumeReading +} + class UdpFFExt(system: ExtendedActorSystem) extends IO.Extension { val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) diff --git a/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java b/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java new file mode 100644 index 0000000000..6cbcf6ecd2 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +//#imports +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +import akka.io.Inet; +import akka.io.UdpFF; +import akka.io.UdpFFMessage; +import akka.io.UdpSO; +import akka.util.ByteString; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +//#imports + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class IOUdpFFDocTest { + static public class Demo extends UntypedActor { + public void onReceive(Object message) { + //#manager + final ActorRef udpFF = UdpFF.get(system).manager(); + //#manager + + //#simplesend + udpFF.tell(UdpFFMessage.simpleSender(), getSelf()); + + // ... or with socket options: + final List options = new ArrayList(); + options.add(UdpSO.broadcast(true)); + udpFF.tell(UdpFFMessage.simpleSender(), getSelf()); + //#simplesend + + ActorRef simpleSender = null; + + //#simplesend-finish + if (message instanceof UdpFF.SimpleSendReady) { + simpleSender = getSender(); + } + //#simplesend-finish + + final ByteString data = ByteString.empty(); + + //#simplesend-send + simpleSender.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); + //#simplesend-send + + final ActorRef handler = getSelf(); + + //#bind + udpFF.tell(UdpFFMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf()); + //#bind + + ActorRef udpWorker = null; + + //#bind-finish + if (message instanceof UdpFF.Bound) { + udpWorker = getSender(); + } + //#bind-finish + + //#bind-receive + if (message instanceof UdpFF.Received) { + final UdpFF.Received rcvd = (UdpFF.Received) message; + final ByteString payload = rcvd.data(); + final InetSocketAddress sender = rcvd.sender(); + } + //#bind-receive + + //#bind-send + udpWorker.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); + //#bind-send + } + } + + static ActorSystem system; + + @BeforeClass + static public void setup() { + system = ActorSystem.create("IODocTest"); + } + + @AfterClass + static public void teardown() { + system.shutdown(); + } + + @Test + public void demonstrateConnect() { + } + +} diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index aef55caba2..e2ac83545e 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -177,46 +177,33 @@ Using UDP UDP support comes in two flavors: connectionless, and connection based: -.. code-block:: scala - - import akka.io.IO - import akka.io.UdpFF - val connectionLessUdp = IO(UdpFF) - // ... or ... - import akka.io.UdpConn - val connectionBasedUdp = IO(UdpConn) +.. includecode:: code/docs/io/IOUdpFFDocTest.java#manager UDP servers can be only implemented by the connectionless API, but clients can use both. Connectionless UDP ^^^^^^^^^^^^^^^^^^ +The following imports are assumed in the following sections: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#imports + Simple Send ............ To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the manager: -.. code-block:: scala - - IO(UdpFF) ! SimpleSender - // or with socket options: - import akka.io.Udp._ - IO(UdpFF) ! SimpleSender(List(SO.Broadcast(true))) +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: -.. code-block:: scala - - case SimpleSendReady => - simpleSender = sender +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-finish After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple message send: -.. code-block:: scala - - simpleSender ! Send(data, serverAddress) +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-send Bind (and Send) @@ -225,31 +212,23 @@ Bind (and Send) To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP manager -.. code-block:: scala - - IO(UdpFF) ! Bind(handler, localAddress) +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of this message is the worker for the UDP channel bound to the local address. -.. code-block:: scala - - case Bound => - udpWorker = sender // Save the worker ref for later use +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-finish The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: -.. code-block:: scala - - case Received(dataByteString, remoteAddress) => // Do something with the data +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-receive The ``Received`` message contains the payload of the datagram and the address of the sender. It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: -.. code-block:: scala +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-send - udpWorker ! Send(data, serverAddress) .. note:: The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case From 6d61a59a0fde6606fbe1332ae8bbe44e598061ff Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 15 Feb 2013 11:59:01 +0100 Subject: [PATCH 3/6] add INTERNAL API markers and explicit return types --- .../main/scala/akka/io/SelectionHandler.scala | 2 +- akka-actor/src/main/scala/akka/io/Tcp.scala | 39 ++++++++++--------- .../main/scala/akka/io/TcpConnection.scala | 5 +++ .../scala/akka/io/TcpIncomingConnection.scala | 2 + .../src/main/scala/akka/io/TcpListener.scala | 6 +++ .../src/main/scala/akka/io/TcpManager.scala | 2 + .../scala/akka/io/TcpOutgoingConnection.scala | 7 +++- .../main/scala/akka/io/UdpConnManager.scala | 5 ++- .../main/scala/akka/io/UdpConnection.scala | 3 ++ .../main/scala/akka/io/UdpFFListener.scala | 3 ++ .../src/main/scala/akka/io/UdpFFManager.scala | 2 + .../src/main/scala/akka/io/UdpFFSender.scala | 2 + .../main/scala/akka/io/WithUdpFFSend.scala | 3 ++ .../rst/java/code/docs/io/IODocTest.java | 10 +---- 14 files changed, 61 insertions(+), 30 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 0d1c0d439e..b041b6d44e 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2012 Typesafe Inc. + * Copyright (C) 2009-2013 Typesafe Inc. */ package akka.io diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index a95e26867a..989c1e05f1 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -106,8 +106,11 @@ object Tcp extends ExtensionKey[TcpExt] { case class Received(data: ByteString) extends Event case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event case class CommandFailed(cmd: Command) extends Event - case object Bound extends Event - case object Unbound extends Event + + sealed trait Bound extends Event + case object Bound extends Bound + sealed trait Unbound extends Event + case object Unbound extends Unbound sealed trait ConnectionClosed extends Event { def isAborted: Boolean = false @@ -190,34 +193,34 @@ object TcpMessage { def connect(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress, - options: JIterable[SocketOption]) = Connect(remoteAddress, Some(localAddress), options) + options: JIterable[SocketOption]): Command = Connect(remoteAddress, Some(localAddress), options) def connect(remoteAddress: InetSocketAddress, - options: JIterable[SocketOption]) = Connect(remoteAddress, None, options) - def connect(remoteAddress: InetSocketAddress) = Connect(remoteAddress, None, Nil) + options: JIterable[SocketOption]): Command = Connect(remoteAddress, None, options) + def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil) def bind(handler: ActorRef, endpoint: InetSocketAddress, backlog: Int, - options: JIterable[SocketOption]) = Bind(handler, endpoint, backlog, options) + options: JIterable[SocketOption]): Command = Bind(handler, endpoint, backlog, options) def bind(handler: ActorRef, endpoint: InetSocketAddress, - backlog: Int) = Bind(handler, endpoint, backlog, Nil) + backlog: Int): Command = Bind(handler, endpoint, backlog, Nil) - def register(handler: ActorRef) = Register(handler) - def unbind = Unbind + def register(handler: ActorRef): Command = Register(handler) + def unbind: Command = Unbind - def close = Close - def confirmedClose = ConfirmedClose - def abort = Abort + def close: Command = Close + def confirmedClose: Command = ConfirmedClose + def abort: Command = Abort - def noAck = NoAck - def noAck(token: AnyRef) = NoAck(token) + def noAck: NoAck = NoAck + def noAck(token: AnyRef): NoAck = NoAck(token) - def write(data: ByteString) = Write(data) - def write(data: ByteString, ack: AnyRef) = Write(data, ack) + def write(data: ByteString): Command = Write(data) + def write(data: ByteString, ack: AnyRef): Command = Write(data, ack) - def stopReading = StopReading - def resumeReading = ResumeReading + def stopReading: Command = StopReading + def resumeReading: Command = ResumeReading implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { import scala.collection.JavaConverters._ diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 12e73bdaa1..bd98c84fcc 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -20,6 +20,8 @@ import akka.io.SelectionHandler._ /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. + * + * INTERNAL API */ private[io] abstract class TcpConnection(val channel: SocketChannel, val tcp: TcpExt) extends Actor with ActorLogging { @@ -298,6 +300,9 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, } } +/** + * INTERNAL API + */ private[io] object TcpConnection { sealed trait ReadResult object NoData extends ReadResult diff --git a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala index 2f7cf9c5fa..e1bcc0e399 100644 --- a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala @@ -13,6 +13,8 @@ import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel } /** * An actor handling the connection state machine for an incoming, already connected * SocketChannel. + * + * INTERNAL API */ private[io] class TcpIncomingConnection(_channel: SocketChannel, _tcp: TcpExt, diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index d806fe8490..8631b67c42 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -15,6 +15,9 @@ import akka.io.Inet.SocketOption import akka.io.Tcp._ import akka.io.IO.HasFailureMessage +/** + * INTERNAL API + */ private[io] object TcpListener { case class RegisterIncoming(channel: SocketChannel) extends HasFailureMessage { @@ -25,6 +28,9 @@ private[io] object TcpListener { } +/** + * INTERNAL API + */ private[io] class TcpListener(val selectorRouter: ActorRef, val tcp: TcpExt, val bindCommander: ActorRef, diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index aa80e96c10..4dd1d4466f 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -9,6 +9,8 @@ import akka.actor.{ ActorLogging, Props } import akka.io.IO.SelectorBasedManager /** + * INTERNAL API + * * TcpManager is a facade for accepting commands ([[akka.io.Tcp.Command]]) to open client or server TCP connections. * * TcpManager is obtainable by calling {{{ IO(Tcp) }}} (see [[akka.io.IO]] and [[akka.io.Tcp]]) diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 03d978293e..098ab69b43 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -15,6 +15,8 @@ import scala.collection.immutable /** * An actor handling the connection state machine for an outgoing connection * to be established. + * + * INTERNAL API */ private[io] class TcpOutgoingConnection(_tcp: TcpExt, commander: ActorRef, @@ -53,7 +55,10 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, } -object TcpOutgoingConnection { +/** + * INTERNAL API + */ +private[io] object TcpOutgoingConnection { private def newSocketChannel() = { val channel = SocketChannel.open() channel.configureBlocking(false) diff --git a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala b/akka-actor/src/main/scala/akka/io/UdpConnManager.scala index 3868289c6b..a362a02f38 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnManager.scala @@ -7,7 +7,10 @@ import akka.actor.Props import akka.io.IO.SelectorBasedManager import akka.io.UdpConn.Connect -class UdpConnManager(udpConn: UdpConnExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { +/** + * INTERNAL API + */ +private[io] class UdpConnManager(udpConn: UdpConnExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { def receive = workerForCommandHandler { case c: Connect ⇒ diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 6d52adfb3b..06d4f6d523 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -13,6 +13,9 @@ import java.nio.channels.SelectionKey._ import scala.annotation.tailrec import scala.util.control.NonFatal +/** + * INTERNAL API + */ private[io] class UdpConnection(val udpConn: UdpConnExt, val commander: ActorRef, val connect: Connect) extends Actor with ActorLogging { diff --git a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala index add5775832..25a5551b4c 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala @@ -14,6 +14,9 @@ import java.nio.channels.SelectionKey._ import scala.annotation.tailrec import scala.util.control.NonFatal +/** + * INTERNAL API + */ private[io] class UdpFFListener(val udpFF: UdpFFExt, val bindCommander: ActorRef, val bind: Bind) diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala index 16d835ae49..679e3253de 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala @@ -8,6 +8,8 @@ import akka.io.IO.SelectorBasedManager import akka.io.UdpFF._ /** + * INTERNAL API + * * UdpFFManager is a facade for simple fire-and-forget style UDP operations * * UdpFFManager is obtainable by calling {{{ IO(UdpFF) }}} (see [[akka.io.IO]] and [[akka.io.UdpFF]]) diff --git a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala index 1120efba33..dc327a1039 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala @@ -13,6 +13,8 @@ import scala.util.control.NonFatal /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. + * + * INTERNAL API */ private[io] class UdpFFSender(val udpFF: UdpFFExt, options: immutable.Traversable[SocketOption], val commander: ActorRef) extends Actor with ActorLogging with WithUdpFFSend { diff --git a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala b/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala index 99ed9393e2..841eb690b2 100644 --- a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala +++ b/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala @@ -8,6 +8,9 @@ import akka.io.UdpFF.{ CommandFailed, Send } import akka.io.SelectionHandler._ import java.nio.channels.DatagramChannel +/** + * INTERNAL API + */ private[io] trait WithUdpFFSend { me: Actor with ActorLogging ⇒ diff --git a/akka-docs/rst/java/code/docs/io/IODocTest.java b/akka-docs/rst/java/code/docs/io/IODocTest.java index 583254e44d..76e6efb76e 100644 --- a/akka-docs/rst/java/code/docs/io/IODocTest.java +++ b/akka-docs/rst/java/code/docs/io/IODocTest.java @@ -87,13 +87,5 @@ public class IODocTest { static ActorSystem system; - @BeforeClass - static public void setup() { - system = ActorSystem.create("IODocTest"); - } - - @Test - public void demonstrateConnect() { - } - + // This is currently only a compilation test, nothing is run } From cd2b499b792dd32df503fc051fd2286b931288e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Fri, 15 Feb 2013 11:22:39 +0100 Subject: [PATCH 4/6] Java API for UdpConn --- .../akka/io/UdpConnIntegrationSpec.scala | 2 +- akka-actor/src/main/scala/akka/io/Udp.scala | 6 +- .../src/main/scala/akka/io/UdpConn.scala | 53 ++++++++-- .../rst/java/code/docs/io/UdpConnDocTest.java | 96 +++++++++++++++++++ akka-docs/rst/java/io.rst | 19 +--- 5 files changed, 153 insertions(+), 23 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/io/UdpConnDocTest.java diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala index 91742b8860..dfaffcd00d 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala @@ -21,7 +21,7 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() - commander.send(IO(UdpConn), UdpConn.Connect(handler, localAddress, remoteAddress, Nil)) + commander.send(IO(UdpConn), UdpConn.Connect(handler, remoteAddress, localAddress, Nil)) commander.expectMsg(UdpConn.Connected) commander.sender } diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index b52c684bae..59adbc4b06 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -44,7 +44,11 @@ object Udp { size.toInt } } - +} + +object UdpSO extends SoJavaFactories { + import Udp.SO._ + def broadcast(on: Boolean) = Broadcast(on) } object UdpSO extends SoJavaFactories { diff --git a/akka-actor/src/main/scala/akka/io/UdpConn.scala b/akka-actor/src/main/scala/akka/io/UdpConn.scala index aee429a716..6fff5a864f 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConn.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConn.scala @@ -9,6 +9,7 @@ import akka.io.Udp.UdpSettings import akka.util.ByteString import java.net.InetSocketAddress import scala.collection.immutable +import java.lang.{ Iterable ⇒ JIterable } object UdpConn extends ExtensionKey[UdpConnExt] { // Java API @@ -18,19 +19,21 @@ object UdpConn extends ExtensionKey[UdpConnExt] { def failureMessage = CommandFailed(this) } - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) + case class Send(payload: ByteString, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Send { def apply(data: ByteString): Send = Send(data, NoAck) } case class Connect(handler: ActorRef, - localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil) extends Command case object StopReading extends Command @@ -40,8 +43,12 @@ object UdpConn extends ExtensionKey[UdpConnExt] { case class Received(data: ByteString) extends Event case class CommandFailed(cmd: Command) extends Event - case object Connected extends Event - case object Disconnected extends Event + + sealed trait Connected extends Event + case object Connected extends Connected + + sealed trait Disconnected extends Event + case object Disconnected extends Disconnected case object Close extends Command @@ -61,4 +68,38 @@ class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension { val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) -} \ No newline at end of file +} + +/** + * Java API + */ +object UdpConnMessage { + import language.implicitConversions + import UdpConn._ + + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, Some(localAddress), options) + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress, + options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, None, options) + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress): Command = Connect(handler, remoteAddress, None, Nil) + + def send(data: ByteString): Command = Send(data) + def send(data: ByteString, ack: AnyRef): Command = Send(data, ack) + + def close: Command = Close + + def noAck: NoAck = NoAck + def noAck(token: AnyRef): NoAck = NoAck(token) + + def stopReading: Command = StopReading + def resumeReading: Command = ResumeReading + + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { + import scala.collection.JavaConverters._ + coll.asScala.to + } +} diff --git a/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java b/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java new file mode 100644 index 0000000000..11b0e3601f --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +//#imports +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import akka.actor.ActorRef; +import akka.io.Inet; +import akka.io.UdpConn; +import akka.io.UdpConnMessage; +import akka.io.UdpSO; +import akka.util.ByteString; +//#imports + +public class UdpConnDocTest { + + static public class Demo extends UntypedActor { + ActorRef connectionActor = null; + ActorRef handler = getSelf(); + + @Override + public void onReceive(Object msg) { + if ("connect".equals(msg)) { + //#manager + final ActorRef udp = UdpConn.get(system).manager(); + //#manager + //#connect + final InetSocketAddress remoteAddr = + new InetSocketAddress("127.0.0.1", 12345); + udp.tell(UdpConnMessage.connect(handler, remoteAddr), getSelf()); + // or with socket options + final InetSocketAddress localAddr = + new InetSocketAddress("127.0.0.1", 1234); + final List options = + new ArrayList(); + options.add(UdpSO.broadcast(true)); + udp.tell(UdpConnMessage.connect(handler, remoteAddr, localAddr, options), getSelf()); + //#connect + } else + //#connected + if (msg instanceof UdpConn.Connected) { + final UdpConn.Connected conn = (UdpConn.Connected) msg; + connectionActor = getSender(); // Save the worker ref for later use + } + //#connected + else + //#received + if (msg instanceof UdpConn.Received) { + final UdpConn.Received recv = (UdpConn.Received) msg; + final ByteString data = recv.data(); + // and do something with the received data ... + } else if (msg instanceof UdpConn.CommandFailed) { + final UdpConn.CommandFailed failed = (UdpConn.CommandFailed) msg; + final UdpConn.Command command = failed.cmd(); + // react to failed connect, etc. + } else if (msg instanceof UdpConn.Disconnected) { + // do something on disconnect + } + //#received + else + if ("send".equals(msg)) { + ByteString data = ByteString.empty(); + //#send + connectionActor.tell(UdpConnMessage.send(data), getSelf()); + //#send + } + } + } + + static ActorSystem system; + + @BeforeClass + static public void setup() { + system = ActorSystem.create("UdpConnDocTest"); + } + + @AfterClass + static public void teardown() { + system.shutdown(); + } + + @Test + public void demonstrateConnect() { + } + +} diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index e2ac83545e..dbcd9571cd 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -244,34 +244,23 @@ receive datagrams only from that address. Connecting is similar to what we have seen in the previous section: -.. code-block:: scala - - IO(UdpConn) ! Connect(handler, remoteAddress) - // or, with more options: - IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true))) +.. includecode:: code/docs/io/UdpConnDocTest.java#connect After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of this message is the worker for the UDP connection. -.. code-block:: scala - - case Connected => - udpConnectionActor = sender // Save the worker ref for later use +.. includecode:: code/docs/io/UdpConnDocTest.java#connected The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: -.. code-block:: scala - - case Received(dataByteString) => // Do something with the data +.. includecode:: code/docs/io/UdpConnDocTest.java#received The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address will be provided, as an UDP connection only receives messages from the endpoint it has been connected to. It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: -.. code-block:: scala - - udpConnectionActor ! Send(data) +.. includecode:: code/docs/io/UdpConnDocTest.java#send Again, the send does not contain a remote address, as it is always the endpoint we have been connected to. From acb927e8708ea675422fe854b977356d5ef3d6db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 15 Feb 2013 12:53:17 +0100 Subject: [PATCH 5/6] Added missing return types in UdpFFMessage --- akka-actor/src/main/scala/akka/io/Udp.scala | 5 ----- akka-actor/src/main/scala/akka/io/UdpFF.scala | 18 +++++++++--------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 59adbc4b06..bfbcbc587c 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -50,8 +50,3 @@ object UdpSO extends SoJavaFactories { import Udp.SO._ def broadcast(on: Boolean) = Broadcast(on) } - -object UdpSO extends SoJavaFactories { - import Udp.SO._ - def broadcast(on: Boolean) = Broadcast(on) -} diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala index 82c51a7df4..fc9b955b74 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFF.scala @@ -66,21 +66,21 @@ object UdpFFMessage { import scala.collection.JavaConverters._ import language.implicitConversions - def send(payload: ByteString, target: InetSocketAddress) = Send(payload, target) - def send(payload: ByteString, target: InetSocketAddress, ack: Any) = Send(payload, target, ack) + def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target) + def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack) - def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]) = + def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind = Bind(handler, endpoint, options.asScala.to) - def bind(handler: ActorRef, endpoint: InetSocketAddress) = Bind(handler, endpoint, Nil) + def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil) - def simpleSender(options: JIterable[SocketOption]) = SimpleSender(options.asScala.to) - def simpleSender = SimpleSender + def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to) + def simpleSender: SimpleSender = SimpleSender - def unbind = Unbind + def unbind: Unbind.type = Unbind - def stopReading = StopReading - def resumeReading = ResumeReading + def stopReading: StopReading.type = StopReading + def resumeReading: ResumeReading.type = ResumeReading } class UdpFFExt(system: ExtendedActorSystem) extends IO.Extension { From 80825d243339a4f78f5f2ecc05b5ada7fa302dcd Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 15 Feb 2013 14:29:37 +0100 Subject: [PATCH 6/6] =?UTF-8?q?fix=20TcpConnection=E2=80=99s=20PendingWrit?= =?UTF-8?q?e.wantsAck?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- akka-actor/src/main/scala/akka/io/TcpConnection.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index bd98c84fcc..0af6497668 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -289,7 +289,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, } else this def hasData = buffer.remaining() > 0 || remainingData.size > 0 - def wantsAck = ack != NoAck + def wantsAck = !ack.isInstanceOf[NoAck] } def createWrite(write: Write): PendingWrite = { val buffer = bufferPool.acquire()