From 933c93c05b8d866ab78d816de039cee4f670c414 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 15 Feb 2013 11:47:46 +0100 Subject: [PATCH] Added Java API for UdpFF --- akka-actor/src/main/scala/akka/io/Udp.scala | 7 +- akka-actor/src/main/scala/akka/io/UdpFF.scala | 41 ++++++- .../rst/java/code/docs/io/IOUdpFFDocTest.java | 101 ++++++++++++++++++ akka-docs/rst/java/io.rst | 45 +++----- 4 files changed, 155 insertions(+), 39 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 840dda666d..b52c684bae 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -4,7 +4,7 @@ package akka.io import java.net.DatagramSocket -import akka.io.Inet.SocketOption +import akka.io.Inet.{ SoJavaFactories, SocketOption } import com.typesafe.config.Config import akka.actor.{ Props, ActorSystemImpl } @@ -46,3 +46,8 @@ object Udp { } } + +object UdpSO extends SoJavaFactories { + import Udp.SO._ + def broadcast(on: Boolean) = Broadcast(on) +} diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala index 838f53a88d..82c51a7df4 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFF.scala @@ -19,11 +19,13 @@ object UdpFF extends ExtensionKey[UdpFFExt] { def failureMessage = CommandFailed(this) } - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) + case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Send { def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck) @@ -44,14 +46,43 @@ object UdpFF extends ExtensionKey[UdpFFExt] { case class Received(data: ByteString, sender: InetSocketAddress) extends Event case class CommandFailed(cmd: Command) extends Event - case object Bound extends Event - case object SimpleSendReady extends Event - case object Unbound extends Event + + sealed trait Bound extends Event + case object Bound extends Bound + + sealed trait SimpleSendReady extends Event + case object SimpleSendReady extends SimpleSendReady + + sealed trait Unbound + case object Unbound extends Unbound case class SendFailed(cause: Throwable) extends Event } +object UdpFFMessage { + import UdpFF._ + import java.lang.{ Iterable ⇒ JIterable } + import scala.collection.JavaConverters._ + import language.implicitConversions + + def send(payload: ByteString, target: InetSocketAddress) = Send(payload, target) + def send(payload: ByteString, target: InetSocketAddress, ack: Any) = Send(payload, target, ack) + + def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]) = + Bind(handler, endpoint, options.asScala.to) + + def bind(handler: ActorRef, endpoint: InetSocketAddress) = Bind(handler, endpoint, Nil) + + def simpleSender(options: JIterable[SocketOption]) = SimpleSender(options.asScala.to) + def simpleSender = SimpleSender + + def unbind = Unbind + + def stopReading = StopReading + def resumeReading = ResumeReading +} + class UdpFFExt(system: ExtendedActorSystem) extends IO.Extension { val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) diff --git a/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java b/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java new file mode 100644 index 0000000000..6cbcf6ecd2 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +//#imports +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +import akka.io.Inet; +import akka.io.UdpFF; +import akka.io.UdpFFMessage; +import akka.io.UdpSO; +import akka.util.ByteString; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +//#imports + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class IOUdpFFDocTest { + static public class Demo extends UntypedActor { + public void onReceive(Object message) { + //#manager + final ActorRef udpFF = UdpFF.get(system).manager(); + //#manager + + //#simplesend + udpFF.tell(UdpFFMessage.simpleSender(), getSelf()); + + // ... or with socket options: + final List options = new ArrayList(); + options.add(UdpSO.broadcast(true)); + udpFF.tell(UdpFFMessage.simpleSender(), getSelf()); + //#simplesend + + ActorRef simpleSender = null; + + //#simplesend-finish + if (message instanceof UdpFF.SimpleSendReady) { + simpleSender = getSender(); + } + //#simplesend-finish + + final ByteString data = ByteString.empty(); + + //#simplesend-send + simpleSender.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); + //#simplesend-send + + final ActorRef handler = getSelf(); + + //#bind + udpFF.tell(UdpFFMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf()); + //#bind + + ActorRef udpWorker = null; + + //#bind-finish + if (message instanceof UdpFF.Bound) { + udpWorker = getSender(); + } + //#bind-finish + + //#bind-receive + if (message instanceof UdpFF.Received) { + final UdpFF.Received rcvd = (UdpFF.Received) message; + final ByteString payload = rcvd.data(); + final InetSocketAddress sender = rcvd.sender(); + } + //#bind-receive + + //#bind-send + udpWorker.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); + //#bind-send + } + } + + static ActorSystem system; + + @BeforeClass + static public void setup() { + system = ActorSystem.create("IODocTest"); + } + + @AfterClass + static public void teardown() { + system.shutdown(); + } + + @Test + public void demonstrateConnect() { + } + +} diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index aef55caba2..e2ac83545e 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -177,46 +177,33 @@ Using UDP UDP support comes in two flavors: connectionless, and connection based: -.. code-block:: scala - - import akka.io.IO - import akka.io.UdpFF - val connectionLessUdp = IO(UdpFF) - // ... or ... - import akka.io.UdpConn - val connectionBasedUdp = IO(UdpConn) +.. includecode:: code/docs/io/IOUdpFFDocTest.java#manager UDP servers can be only implemented by the connectionless API, but clients can use both. Connectionless UDP ^^^^^^^^^^^^^^^^^^ +The following imports are assumed in the following sections: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#imports + Simple Send ............ To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the manager: -.. code-block:: scala - - IO(UdpFF) ! SimpleSender - // or with socket options: - import akka.io.Udp._ - IO(UdpFF) ! SimpleSender(List(SO.Broadcast(true))) +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: -.. code-block:: scala - - case SimpleSendReady => - simpleSender = sender +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-finish After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple message send: -.. code-block:: scala - - simpleSender ! Send(data, serverAddress) +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-send Bind (and Send) @@ -225,31 +212,23 @@ Bind (and Send) To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP manager -.. code-block:: scala - - IO(UdpFF) ! Bind(handler, localAddress) +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of this message is the worker for the UDP channel bound to the local address. -.. code-block:: scala - - case Bound => - udpWorker = sender // Save the worker ref for later use +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-finish The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: -.. code-block:: scala - - case Received(dataByteString, remoteAddress) => // Do something with the data +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-receive The ``Received`` message contains the payload of the datagram and the address of the sender. It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: -.. code-block:: scala +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-send - udpWorker ! Send(data, serverAddress) .. note:: The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case