Added Java API for UdpFF

This commit is contained in:
Endre Sándor Varga 2013-02-15 11:47:46 +01:00 committed by Roland
parent d71a541596
commit 933c93c05b
4 changed files with 155 additions and 39 deletions

View file

@ -4,7 +4,7 @@
package akka.io
import java.net.DatagramSocket
import akka.io.Inet.SocketOption
import akka.io.Inet.{ SoJavaFactories, SocketOption }
import com.typesafe.config.Config
import akka.actor.{ Props, ActorSystemImpl }
@ -46,3 +46,8 @@ object Udp {
}
}
object UdpSO extends SoJavaFactories {
import Udp.SO._
def broadcast(on: Boolean) = Broadcast(on)
}

View file

@ -19,11 +19,13 @@ object UdpFF extends ExtensionKey[UdpFFExt] {
def failureMessage = CommandFailed(this)
}
case object NoAck
case class NoAck(token: Any)
object NoAck extends NoAck(null)
case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = ack != NoAck
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
object Send {
def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck)
@ -44,14 +46,43 @@ object UdpFF extends ExtensionKey[UdpFFExt] {
case class Received(data: ByteString, sender: InetSocketAddress) extends Event
case class CommandFailed(cmd: Command) extends Event
case object Bound extends Event
case object SimpleSendReady extends Event
case object Unbound extends Event
sealed trait Bound extends Event
case object Bound extends Bound
sealed trait SimpleSendReady extends Event
case object SimpleSendReady extends SimpleSendReady
sealed trait Unbound
case object Unbound extends Unbound
case class SendFailed(cause: Throwable) extends Event
}
object UdpFFMessage {
import UdpFF._
import java.lang.{ Iterable JIterable }
import scala.collection.JavaConverters._
import language.implicitConversions
def send(payload: ByteString, target: InetSocketAddress) = Send(payload, target)
def send(payload: ByteString, target: InetSocketAddress, ack: Any) = Send(payload, target, ack)
def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]) =
Bind(handler, endpoint, options.asScala.to)
def bind(handler: ActorRef, endpoint: InetSocketAddress) = Bind(handler, endpoint, Nil)
def simpleSender(options: JIterable[SocketOption]) = SimpleSender(options.asScala.to)
def simpleSender = SimpleSender
def unbind = Unbind
def stopReading = StopReading
def resumeReading = ResumeReading
}
class UdpFFExt(system: ExtendedActorSystem) extends IO.Extension {
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget"))

View file

@ -0,0 +1,101 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io;
//#imports
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.io.Inet;
import akka.io.UdpFF;
import akka.io.UdpFFMessage;
import akka.io.UdpSO;
import akka.util.ByteString;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
//#imports
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class IOUdpFFDocTest {
static public class Demo extends UntypedActor {
public void onReceive(Object message) {
//#manager
final ActorRef udpFF = UdpFF.get(system).manager();
//#manager
//#simplesend
udpFF.tell(UdpFFMessage.simpleSender(), getSelf());
// ... or with socket options:
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
options.add(UdpSO.broadcast(true));
udpFF.tell(UdpFFMessage.simpleSender(), getSelf());
//#simplesend
ActorRef simpleSender = null;
//#simplesend-finish
if (message instanceof UdpFF.SimpleSendReady) {
simpleSender = getSender();
}
//#simplesend-finish
final ByteString data = ByteString.empty();
//#simplesend-send
simpleSender.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf());
//#simplesend-send
final ActorRef handler = getSelf();
//#bind
udpFF.tell(UdpFFMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf());
//#bind
ActorRef udpWorker = null;
//#bind-finish
if (message instanceof UdpFF.Bound) {
udpWorker = getSender();
}
//#bind-finish
//#bind-receive
if (message instanceof UdpFF.Received) {
final UdpFF.Received rcvd = (UdpFF.Received) message;
final ByteString payload = rcvd.data();
final InetSocketAddress sender = rcvd.sender();
}
//#bind-receive
//#bind-send
udpWorker.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf());
//#bind-send
}
}
static ActorSystem system;
@BeforeClass
static public void setup() {
system = ActorSystem.create("IODocTest");
}
@AfterClass
static public void teardown() {
system.shutdown();
}
@Test
public void demonstrateConnect() {
}
}

View file

@ -177,46 +177,33 @@ Using UDP
UDP support comes in two flavors: connectionless, and connection based:
.. code-block:: scala
import akka.io.IO
import akka.io.UdpFF
val connectionLessUdp = IO(UdpFF)
// ... or ...
import akka.io.UdpConn
val connectionBasedUdp = IO(UdpConn)
.. includecode:: code/docs/io/IOUdpFFDocTest.java#manager
UDP servers can be only implemented by the connectionless API, but clients can use both.
Connectionless UDP
^^^^^^^^^^^^^^^^^^
The following imports are assumed in the following sections:
.. includecode:: code/docs/io/IOUdpFFDocTest.java#imports
Simple Send
............
To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the
manager:
.. code-block:: scala
IO(UdpFF) ! SimpleSender
// or with socket options:
import akka.io.Udp._
IO(UdpFF) ! SimpleSender(List(SO.Broadcast(true)))
.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend
The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message:
.. code-block:: scala
case SimpleSendReady =>
simpleSender = sender
.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-finish
After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple
message send:
.. code-block:: scala
simpleSender ! Send(data, serverAddress)
.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-send
Bind (and Send)
@ -225,31 +212,23 @@ Bind (and Send)
To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP
manager
.. code-block:: scala
IO(UdpFF) ! Bind(handler, localAddress)
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind
After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of
this message is the worker for the UDP channel bound to the local address.
.. code-block:: scala
case Bound =>
udpWorker = sender // Save the worker ref for later use
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-finish
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
.. code-block:: scala
case Received(dataByteString, remoteAddress) => // Do something with the data
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-receive
The ``Received`` message contains the payload of the datagram and the address of the sender.
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``:
.. code-block:: scala
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-send
udpWorker ! Send(data, serverAddress)
.. note::
The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case