Java API for UdpConn

This commit is contained in:
Björn Antonsson 2013-02-15 11:22:39 +01:00 committed by Roland
parent 6d61a59a0f
commit cd2b499b79
5 changed files with 153 additions and 23 deletions

View file

@ -21,7 +21,7 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli
def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpConn), UdpConn.Connect(handler, localAddress, remoteAddress, Nil))
commander.send(IO(UdpConn), UdpConn.Connect(handler, remoteAddress, localAddress, Nil))
commander.expectMsg(UdpConn.Connected)
commander.sender
}

View file

@ -44,7 +44,11 @@ object Udp {
size.toInt
}
}
}
object UdpSO extends SoJavaFactories {
import Udp.SO._
def broadcast(on: Boolean) = Broadcast(on)
}
object UdpSO extends SoJavaFactories {

View file

@ -9,6 +9,7 @@ import akka.io.Udp.UdpSettings
import akka.util.ByteString
import java.net.InetSocketAddress
import scala.collection.immutable
import java.lang.{ Iterable JIterable }
object UdpConn extends ExtensionKey[UdpConnExt] {
// Java API
@ -18,19 +19,21 @@ object UdpConn extends ExtensionKey[UdpConnExt] {
def failureMessage = CommandFailed(this)
}
case object NoAck
case class NoAck(token: Any)
object NoAck extends NoAck(null)
case class Send(payload: ByteString, ack: Any) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = ack != NoAck
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
object Send {
def apply(data: ByteString): Send = Send(data, NoAck)
}
case class Connect(handler: ActorRef,
localAddress: Option[InetSocketAddress],
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil) extends Command
case object StopReading extends Command
@ -40,8 +43,12 @@ object UdpConn extends ExtensionKey[UdpConnExt] {
case class Received(data: ByteString) extends Event
case class CommandFailed(cmd: Command) extends Event
case object Connected extends Event
case object Disconnected extends Event
sealed trait Connected extends Event
case object Connected extends Connected
sealed trait Disconnected extends Event
case object Disconnected extends Disconnected
case object Close extends Command
@ -61,4 +68,38 @@ class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension {
val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
}
}
/**
* Java API
*/
object UdpConnMessage {
import language.implicitConversions
import UdpConn._
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, Some(localAddress), options)
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, None, options)
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress): Command = Connect(handler, remoteAddress, None, Nil)
def send(data: ByteString): Command = Send(data)
def send(data: ByteString, ack: AnyRef): Command = Send(data, ack)
def close: Command = Close
def noAck: NoAck = NoAck
def noAck(token: AnyRef): NoAck = NoAck(token)
def stopReading: Command = StopReading
def resumeReading: Command = ResumeReading
implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
import scala.collection.JavaConverters._
coll.asScala.to
}
}

View file

@ -0,0 +1,96 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
//#imports
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import akka.actor.ActorRef;
import akka.io.Inet;
import akka.io.UdpConn;
import akka.io.UdpConnMessage;
import akka.io.UdpSO;
import akka.util.ByteString;
//#imports
public class UdpConnDocTest {
static public class Demo extends UntypedActor {
ActorRef connectionActor = null;
ActorRef handler = getSelf();
@Override
public void onReceive(Object msg) {
if ("connect".equals(msg)) {
//#manager
final ActorRef udp = UdpConn.get(system).manager();
//#manager
//#connect
final InetSocketAddress remoteAddr =
new InetSocketAddress("127.0.0.1", 12345);
udp.tell(UdpConnMessage.connect(handler, remoteAddr), getSelf());
// or with socket options
final InetSocketAddress localAddr =
new InetSocketAddress("127.0.0.1", 1234);
final List<Inet.SocketOption> options =
new ArrayList<Inet.SocketOption>();
options.add(UdpSO.broadcast(true));
udp.tell(UdpConnMessage.connect(handler, remoteAddr, localAddr, options), getSelf());
//#connect
} else
//#connected
if (msg instanceof UdpConn.Connected) {
final UdpConn.Connected conn = (UdpConn.Connected) msg;
connectionActor = getSender(); // Save the worker ref for later use
}
//#connected
else
//#received
if (msg instanceof UdpConn.Received) {
final UdpConn.Received recv = (UdpConn.Received) msg;
final ByteString data = recv.data();
// and do something with the received data ...
} else if (msg instanceof UdpConn.CommandFailed) {
final UdpConn.CommandFailed failed = (UdpConn.CommandFailed) msg;
final UdpConn.Command command = failed.cmd();
// react to failed connect, etc.
} else if (msg instanceof UdpConn.Disconnected) {
// do something on disconnect
}
//#received
else
if ("send".equals(msg)) {
ByteString data = ByteString.empty();
//#send
connectionActor.tell(UdpConnMessage.send(data), getSelf());
//#send
}
}
}
static ActorSystem system;
@BeforeClass
static public void setup() {
system = ActorSystem.create("UdpConnDocTest");
}
@AfterClass
static public void teardown() {
system.shutdown();
}
@Test
public void demonstrateConnect() {
}
}

View file

@ -244,34 +244,23 @@ receive datagrams only from that address.
Connecting is similar to what we have seen in the previous section:
.. code-block:: scala
IO(UdpConn) ! Connect(handler, remoteAddress)
// or, with more options:
IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true)))
.. includecode:: code/docs/io/UdpConnDocTest.java#connect
After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of
this message is the worker for the UDP connection.
.. code-block:: scala
case Connected =>
udpConnectionActor = sender // Save the worker ref for later use
.. includecode:: code/docs/io/UdpConnDocTest.java#connected
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
.. code-block:: scala
case Received(dataByteString) => // Do something with the data
.. includecode:: code/docs/io/UdpConnDocTest.java#received
The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address
will be provided, as an UDP connection only receives messages from the endpoint it has been connected to.
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``:
.. code-block:: scala
udpConnectionActor ! Send(data)
.. includecode:: code/docs/io/UdpConnDocTest.java#send
Again, the send does not contain a remote address, as it is always the endpoint we have been connected to.