diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala index 91742b8860..dfaffcd00d 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala @@ -21,7 +21,7 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() - commander.send(IO(UdpConn), UdpConn.Connect(handler, localAddress, remoteAddress, Nil)) + commander.send(IO(UdpConn), UdpConn.Connect(handler, remoteAddress, localAddress, Nil)) commander.expectMsg(UdpConn.Connected) commander.sender } diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index b52c684bae..59adbc4b06 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -44,7 +44,11 @@ object Udp { size.toInt } } - +} + +object UdpSO extends SoJavaFactories { + import Udp.SO._ + def broadcast(on: Boolean) = Broadcast(on) } object UdpSO extends SoJavaFactories { diff --git a/akka-actor/src/main/scala/akka/io/UdpConn.scala b/akka-actor/src/main/scala/akka/io/UdpConn.scala index aee429a716..6fff5a864f 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConn.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConn.scala @@ -9,6 +9,7 @@ import akka.io.Udp.UdpSettings import akka.util.ByteString import java.net.InetSocketAddress import scala.collection.immutable +import java.lang.{ Iterable ⇒ JIterable } object UdpConn extends ExtensionKey[UdpConnExt] { // Java API @@ -18,19 +19,21 @@ object UdpConn extends ExtensionKey[UdpConnExt] { def failureMessage = CommandFailed(this) } - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) + case class Send(payload: ByteString, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Send { def apply(data: ByteString): Send = Send(data, NoAck) } case class Connect(handler: ActorRef, - localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil) extends Command case object StopReading extends Command @@ -40,8 +43,12 @@ object UdpConn extends ExtensionKey[UdpConnExt] { case class Received(data: ByteString) extends Event case class CommandFailed(cmd: Command) extends Event - case object Connected extends Event - case object Disconnected extends Event + + sealed trait Connected extends Event + case object Connected extends Connected + + sealed trait Disconnected extends Event + case object Disconnected extends Disconnected case object Close extends Command @@ -61,4 +68,38 @@ class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension { val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) -} \ No newline at end of file +} + +/** + * Java API + */ +object UdpConnMessage { + import language.implicitConversions + import UdpConn._ + + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, Some(localAddress), options) + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress, + options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, None, options) + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress): Command = Connect(handler, remoteAddress, None, Nil) + + def send(data: ByteString): Command = Send(data) + def send(data: ByteString, ack: AnyRef): Command = Send(data, ack) + + def close: Command = Close + + def noAck: NoAck = NoAck + def noAck(token: AnyRef): NoAck = NoAck(token) + + def stopReading: Command = StopReading + def resumeReading: Command = ResumeReading + + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { + import scala.collection.JavaConverters._ + coll.asScala.to + } +} diff --git a/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java b/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java new file mode 100644 index 0000000000..11b0e3601f --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +//#imports +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import akka.actor.ActorRef; +import akka.io.Inet; +import akka.io.UdpConn; +import akka.io.UdpConnMessage; +import akka.io.UdpSO; +import akka.util.ByteString; +//#imports + +public class UdpConnDocTest { + + static public class Demo extends UntypedActor { + ActorRef connectionActor = null; + ActorRef handler = getSelf(); + + @Override + public void onReceive(Object msg) { + if ("connect".equals(msg)) { + //#manager + final ActorRef udp = UdpConn.get(system).manager(); + //#manager + //#connect + final InetSocketAddress remoteAddr = + new InetSocketAddress("127.0.0.1", 12345); + udp.tell(UdpConnMessage.connect(handler, remoteAddr), getSelf()); + // or with socket options + final InetSocketAddress localAddr = + new InetSocketAddress("127.0.0.1", 1234); + final List options = + new ArrayList(); + options.add(UdpSO.broadcast(true)); + udp.tell(UdpConnMessage.connect(handler, remoteAddr, localAddr, options), getSelf()); + //#connect + } else + //#connected + if (msg instanceof UdpConn.Connected) { + final UdpConn.Connected conn = (UdpConn.Connected) msg; + connectionActor = getSender(); // Save the worker ref for later use + } + //#connected + else + //#received + if (msg instanceof UdpConn.Received) { + final UdpConn.Received recv = (UdpConn.Received) msg; + final ByteString data = recv.data(); + // and do something with the received data ... + } else if (msg instanceof UdpConn.CommandFailed) { + final UdpConn.CommandFailed failed = (UdpConn.CommandFailed) msg; + final UdpConn.Command command = failed.cmd(); + // react to failed connect, etc. + } else if (msg instanceof UdpConn.Disconnected) { + // do something on disconnect + } + //#received + else + if ("send".equals(msg)) { + ByteString data = ByteString.empty(); + //#send + connectionActor.tell(UdpConnMessage.send(data), getSelf()); + //#send + } + } + } + + static ActorSystem system; + + @BeforeClass + static public void setup() { + system = ActorSystem.create("UdpConnDocTest"); + } + + @AfterClass + static public void teardown() { + system.shutdown(); + } + + @Test + public void demonstrateConnect() { + } + +} diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index e2ac83545e..dbcd9571cd 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -244,34 +244,23 @@ receive datagrams only from that address. Connecting is similar to what we have seen in the previous section: -.. code-block:: scala - - IO(UdpConn) ! Connect(handler, remoteAddress) - // or, with more options: - IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true))) +.. includecode:: code/docs/io/UdpConnDocTest.java#connect After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of this message is the worker for the UDP connection. -.. code-block:: scala - - case Connected => - udpConnectionActor = sender // Save the worker ref for later use +.. includecode:: code/docs/io/UdpConnDocTest.java#connected The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: -.. code-block:: scala - - case Received(dataByteString) => // Do something with the data +.. includecode:: code/docs/io/UdpConnDocTest.java#received The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address will be provided, as an UDP connection only receives messages from the endpoint it has been connected to. It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: -.. code-block:: scala - - udpConnectionActor ! Send(data) +.. includecode:: code/docs/io/UdpConnDocTest.java#send Again, the send does not contain a remote address, as it is always the endpoint we have been connected to.