rename UdpFF->Udp and the other UdpConnected, see #3058

This commit is contained in:
Roland 2013-03-25 10:02:50 +01:00
parent 0d510ff031
commit 309f2c2f91
17 changed files with 267 additions and 274 deletions

View file

@ -10,21 +10,21 @@ import akka.util.ByteString
import java.net.InetSocketAddress
import akka.actor.ActorRef
class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender {
class UdpConnectedIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender {
val addresses = temporaryServerAddresses(3)
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpFF), UdpFF.Bind(handler, address))
commander.expectMsg(UdpFF.Bound)
commander.send(IO(Udp), Udp.Bind(handler, address))
commander.expectMsg(Udp.Bound)
commander.sender
}
def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpConn), UdpConn.Connect(handler, remoteAddress, localAddress, Nil))
commander.expectMsg(UdpConn.Connected)
commander.send(IO(UdpConnected), UdpConnected.Connect(handler, remoteAddress, localAddress, Nil))
commander.expectMsg(UdpConnected.Connected)
commander.sender
}
@ -35,19 +35,19 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli
val server = bindUdp(serverAddress, testActor)
val data1 = ByteString("To infinity and beyond!")
val data2 = ByteString("All your datagram belong to us")
connectUdp(localAddress = None, serverAddress, testActor) ! UdpConn.Send(data1)
connectUdp(localAddress = None, serverAddress, testActor) ! UdpConnected.Send(data1)
val clientAddress = expectMsgPF() {
case UdpFF.Received(d, a)
case Udp.Received(d, a)
d must be === data1
a
}
server ! UdpFF.Send(data2, clientAddress)
server ! Udp.Send(data2, clientAddress)
// FIXME: Currently this line fails
expectMsgPF() {
case UdpConn.Received(d) d must be === data2
case UdpConnected.Received(d) d must be === data2
}
}
@ -57,19 +57,19 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli
val server = bindUdp(serverAddress, testActor)
val data1 = ByteString("To infinity and beyond!")
val data2 = ByteString("All your datagram belong to us")
connectUdp(Some(clientAddress), serverAddress, testActor) ! UdpConn.Send(data1)
connectUdp(Some(clientAddress), serverAddress, testActor) ! UdpConnected.Send(data1)
expectMsgPF() {
case UdpFF.Received(d, a)
case Udp.Received(d, a)
d must be === data1
a must be === clientAddress
}
server ! UdpFF.Send(data2, clientAddress)
server ! Udp.Send(data2, clientAddress)
// FIXME: Currently this line fails
expectMsgPF() {
case UdpConn.Received(d) d must be === data2
case UdpConnected.Received(d) d must be === data2
}
}

View file

@ -4,27 +4,26 @@
package akka.io
import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec }
import akka.io.UdpFF._
import akka.TestUtils
import TestUtils._
import akka.io.Udp._
import akka.TestUtils._
import akka.util.ByteString
import java.net.InetSocketAddress
import akka.actor.ActorRef
class UdpFFIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender {
class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender {
val addresses = temporaryServerAddresses(3)
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpFF), Bind(handler, address))
commander.send(IO(Udp), Bind(handler, address))
commander.expectMsg(Bound)
commander.sender
}
val simpleSender: ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpFF), SimpleSender)
commander.send(IO(Udp), SimpleSender)
commander.expectMsg(SimpleSendReady)
commander.sender
}

View file

@ -479,7 +479,7 @@ akka {
management-dispatcher = "akka.actor.default-dispatcher"
}
udp-fire-and-forget {
udp {
# The number of selectors to stripe the served channels over; each of
# these will use one select loop on the selector-dispatcher.
@ -540,7 +540,7 @@ akka {
management-dispatcher = "akka.actor.default-dispatcher"
}
udp-connection {
udp-connected {
# The number of selectors to stripe the served channels over; each of
# these will use one select loop on the selector-dispatcher.

View file

@ -7,8 +7,63 @@ import java.net.DatagramSocket
import akka.io.Inet.{ SoJavaFactories, SocketOption }
import com.typesafe.config.Config
import akka.actor.{ Props, ActorSystemImpl }
import akka.actor.ExtendedActorSystem
import akka.actor.ActorRef
import akka.actor.ExtensionKey
import akka.actor.ActorSystem
import akka.util.ByteString
import java.net.InetSocketAddress
import scala.collection.immutable
object Udp {
object Udp extends ExtensionKey[UdpExt] {
/**
* Java API: retrieve the Udp extension for the given system.
*/
override def get(system: ActorSystem): UdpExt = super.get(system)
trait Command extends IO.HasFailureMessage {
def failureMessage = CommandFailed(this)
}
case class NoAck(token: Any)
object NoAck extends NoAck(null)
case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
object Send {
def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck)
}
case class Bind(handler: ActorRef,
endpoint: InetSocketAddress,
options: immutable.Traversable[SocketOption] = Nil) extends Command
case object Unbind extends Command
case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command
object SimpleSender extends SimpleSender(Nil)
case object StopReading extends Command
case object ResumeReading extends Command
trait Event
case class Received(data: ByteString, sender: InetSocketAddress) extends Event
case class CommandFailed(cmd: Command) extends Event
sealed trait Bound extends Event
case object Bound extends Bound
sealed trait SimpleSendReady extends Event
case object SimpleSendReady extends SimpleSendReady
sealed trait Unbound
case object Unbound extends Unbound
case class SendFailed(cause: Throwable) extends Event
object SO extends Inet.SoForwarders {
@ -46,6 +101,47 @@ object Udp {
}
}
class UdpExt(system: ExtendedActorSystem) extends IO.Extension {
import Udp.UdpSettings
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp"))
val manager: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(
props = Props(new UdpManager(this)),
name = "IO-UDP-FF")
}
val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
}
/**
* Java API: factory methods for the message types used when communicating with the Udp service.
*/
object UdpMessage {
import Udp._
import java.lang.{ Iterable JIterable }
import scala.collection.JavaConverters._
import language.implicitConversions
def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target)
def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack)
def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind =
Bind(handler, endpoint, options.asScala.to)
def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil)
def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to)
def simpleSender: SimpleSender = SimpleSender
def unbind: Unbind.type = Unbind
def stopReading: StopReading.type = StopReading
def resumeReading: ResumeReading.type = ResumeReading
}
object UdpSO extends SoJavaFactories {
import Udp.SO._
def broadcast(on: Boolean) = Broadcast(on)

View file

@ -11,11 +11,11 @@ import java.net.InetSocketAddress
import scala.collection.immutable
import java.lang.{ Iterable JIterable }
object UdpConn extends ExtensionKey[UdpConnExt] {
object UdpConnected extends ExtensionKey[UdpConnectedExt] {
/**
* Java API: retrieve the UdpConn extension for the given system.
* Java API: retrieve the UdpConnected extension for the given system.
*/
override def get(system: ActorSystem): UdpConnExt = super.get(system)
override def get(system: ActorSystem): UdpConnectedExt = super.get(system)
trait Command extends IO.HasFailureMessage {
def failureMessage = CommandFailed(this)
@ -58,13 +58,13 @@ object UdpConn extends ExtensionKey[UdpConnExt] {
}
class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension {
class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension {
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget"))
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-connected"))
val manager: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(
props = Props(new UdpConnManager(this)),
props = Props(new UdpConnectedManager(this)),
name = "IO-UDP-CONN")
}
@ -73,11 +73,11 @@ class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension {
}
/**
* Java API: factory methods for the message types used when communicating with the UdpConn service.
* Java API: factory methods for the message types used when communicating with the UdpConnected service.
*/
object UdpConnMessage {
object UdpConnectedMessage {
import language.implicitConversions
import UdpConn._
import UdpConnected._
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress,

View file

@ -5,17 +5,17 @@ package akka.io
import akka.actor.Props
import akka.io.IO.SelectorBasedManager
import akka.io.UdpConn.Connect
import akka.io.UdpConnected.Connect
/**
* INTERNAL API
*/
private[io] class UdpConnManager(udpConn: UdpConnExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) {
private[io] class UdpConnectedManager(udpConn: UdpConnectedExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) {
def receive = workerForCommandHandler {
case c: Connect
val commander = sender
Props(new UdpConnection(udpConn, commander, c))
Props(new UdpConnectedection(udpConn, commander, c))
}
}

View file

@ -5,7 +5,7 @@ package akka.io
import akka.actor.{ Actor, ActorLogging, ActorRef }
import akka.io.SelectionHandler._
import akka.io.UdpConn._
import akka.io.UdpConnected._
import akka.util.ByteString
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel
@ -16,9 +16,9 @@ import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[io] class UdpConnection(val udpConn: UdpConnExt,
val commander: ActorRef,
val connect: Connect) extends Actor with ActorLogging {
private[io] class UdpConnectedection(val udpConn: UdpConnectedExt,
val commander: ActorRef,
val connect: Connect) extends Actor with ActorLogging {
def selector: ActorRef = context.parent

View file

@ -1,102 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import akka.actor._
import akka.io.Inet.SocketOption
import akka.io.Udp.UdpSettings
import akka.util.ByteString
import java.net.InetSocketAddress
import scala.collection.immutable
object UdpFF extends ExtensionKey[UdpFFExt] {
/**
* Java API: retrieve the UdpFF extension for the given system.
*/
override def get(system: ActorSystem): UdpFFExt = super.get(system)
trait Command extends IO.HasFailureMessage {
def failureMessage = CommandFailed(this)
}
case class NoAck(token: Any)
object NoAck extends NoAck(null)
case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
object Send {
def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck)
}
case class Bind(handler: ActorRef,
endpoint: InetSocketAddress,
options: immutable.Traversable[SocketOption] = Nil) extends Command
case object Unbind extends Command
case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command
object SimpleSender extends SimpleSender(Nil)
case object StopReading extends Command
case object ResumeReading extends Command
trait Event
case class Received(data: ByteString, sender: InetSocketAddress) extends Event
case class CommandFailed(cmd: Command) extends Event
sealed trait Bound extends Event
case object Bound extends Bound
sealed trait SimpleSendReady extends Event
case object SimpleSendReady extends SimpleSendReady
sealed trait Unbound
case object Unbound extends Unbound
case class SendFailed(cause: Throwable) extends Event
}
/**
* Java API: factory methods for the message types used when communicating with the UdpConn service.
*/
object UdpFFMessage {
import UdpFF._
import java.lang.{ Iterable JIterable }
import scala.collection.JavaConverters._
import language.implicitConversions
def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target)
def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack)
def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind =
Bind(handler, endpoint, options.asScala.to)
def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil)
def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to)
def simpleSender: SimpleSender = SimpleSender
def unbind: Unbind.type = Unbind
def stopReading: StopReading.type = StopReading
def resumeReading: ResumeReading.type = ResumeReading
}
class UdpFFExt(system: ExtendedActorSystem) extends IO.Extension {
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget"))
val manager: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(
props = Props(new UdpFFManager(this)),
name = "IO-UDP-FF")
}
val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
}

View file

@ -1,58 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import akka.actor.Props
import akka.io.IO.SelectorBasedManager
import akka.io.UdpFF._
/**
* INTERNAL API
*
* UdpFFManager is a facade for simple fire-and-forget style UDP operations
*
* UdpFFManager is obtainable by calling {{{ IO(UdpFF) }}} (see [[akka.io.IO]] and [[akka.io.UdpFF]])
*
* *Warning!* UdpFF uses [[java.nio.channels.DatagramChannel#send]] to deliver datagrams, and as a consequence if a
* security manager has been installed then for each datagram it will verify if the target address and port number are
* permitted. If this performance overhead is undesirable use the connection style Udp extension.
*
* == Bind and send ==
*
* To bind and listen to a local address, a [[akka.io.UdpFF..Bind]] command must be sent to this actor. If the binding
* was successful, the sender of the [[akka.io.UdpFF.Bind]] will be notified with a [[akka.io.UdpFF.Bound]]
* message. The sender of the [[akka.io.UdpFF.Bound]] message is the Listener actor (an internal actor responsible for
* listening to server events). To unbind the port an [[akka.io.Tcp.Unbind]] message must be sent to the Listener actor.
*
* If the bind request is rejected because the Udp system is not able to register more channels (see the nr-of-selectors
* and max-channels configuration options in the akka.io.udpFF section of the configuration) the sender will be notified
* with a [[akka.io.UdpFF.CommandFailed]] message. This message contains the original command for reference.
*
* The handler provided in the [[akka.io.UdpFF.Bind]] message will receive inbound datagrams to the bound port
* wrapped in [[akka.io.UdpFF.Received]] messages which contain the payload of the datagram and the sender address.
*
* UDP datagrams can be sent by sending [[akka.io.UdpFF.Send]] messages to the Listener actor. The sender port of the
* outbound datagram will be the port to which the Listener is bound.
*
* == Simple send ==
*
* UdpFF provides a simple method of sending UDP datagrams if no reply is expected. To acquire the Sender actor
* a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSendReady
* message that the service is available. UDP datagrams can be sent by sending [[akka.io.UdpFF.Send]] messages to the
* sender of SimpleSendReady. All the datagrams will contain an ephemeral local port as sender and answers will be
* discarded.
*
*/
private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udpFF.settings, udpFF.settings.NrOfSelectors) {
def receive = workerForCommandHandler {
case b: Bind
val commander = sender
Props(new UdpFFListener(udpFF, commander, b))
case SimpleSender(options)
val commander = sender
Props(new UdpFFSender(udpFF, options, commander))
}
}

View file

@ -5,7 +5,7 @@ package akka.io
import akka.actor.{ ActorLogging, Actor, ActorRef }
import akka.io.SelectionHandler._
import akka.io.UdpFF._
import akka.io.Udp._
import akka.util.ByteString
import java.net.InetSocketAddress
import java.nio.ByteBuffer
@ -17,14 +17,14 @@ import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[io] class UdpFFListener(val udpFF: UdpFFExt,
val bindCommander: ActorRef,
val bind: Bind)
extends Actor with ActorLogging with WithUdpFFSend {
private[io] class UdpListener(val udp: UdpExt,
val bindCommander: ActorRef,
val bind: Bind)
extends Actor with ActorLogging with WithUdpSend {
import bind._
import udpFF.bufferPool
import udpFF.settings._
import udp.bufferPool
import udp.settings._
def selector: ActorRef = context.parent

View file

@ -0,0 +1,58 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import akka.actor.Props
import akka.io.IO.SelectorBasedManager
import akka.io.Udp._
/**
* INTERNAL API
*
* UdpManager is a facade for simple fire-and-forget style UDP operations
*
* UdpManager is obtainable by calling {{{ IO(Udp) }}} (see [[akka.io.IO]] and [[akka.io.Udp]])
*
* *Warning!* Udp uses [[java.nio.channels.DatagramChannel#send]] to deliver datagrams, and as a consequence if a
* security manager has been installed then for each datagram it will verify if the target address and port number are
* permitted. If this performance overhead is undesirable use the connection style Udp extension.
*
* == Bind and send ==
*
* To bind and listen to a local address, a [[akka.io.Udp..Bind]] command must be sent to this actor. If the binding
* was successful, the sender of the [[akka.io.Udp.Bind]] will be notified with a [[akka.io.Udp.Bound]]
* message. The sender of the [[akka.io.Udp.Bound]] message is the Listener actor (an internal actor responsible for
* listening to server events). To unbind the port an [[akka.io.Tcp.Unbind]] message must be sent to the Listener actor.
*
* If the bind request is rejected because the Udp system is not able to register more channels (see the nr-of-selectors
* and max-channels configuration options in the akka.io.udp section of the configuration) the sender will be notified
* with a [[akka.io.Udp.CommandFailed]] message. This message contains the original command for reference.
*
* The handler provided in the [[akka.io.Udp.Bind]] message will receive inbound datagrams to the bound port
* wrapped in [[akka.io.Udp.Received]] messages which contain the payload of the datagram and the sender address.
*
* UDP datagrams can be sent by sending [[akka.io.Udp.Send]] messages to the Listener actor. The sender port of the
* outbound datagram will be the port to which the Listener is bound.
*
* == Simple send ==
*
* Udp provides a simple method of sending UDP datagrams if no reply is expected. To acquire the Sender actor
* a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSendReady
* message that the service is available. UDP datagrams can be sent by sending [[akka.io.Udp.Send]] messages to the
* sender of SimpleSendReady. All the datagrams will contain an ephemeral local port as sender and answers will be
* discarded.
*
*/
private[io] class UdpManager(udp: UdpExt) extends SelectorBasedManager(udp.settings, udp.settings.NrOfSelectors) {
def receive = workerForCommandHandler {
case b: Bind
val commander = sender
Props(new UdpListener(udp, commander, b))
case SimpleSender(options)
val commander = sender
Props(new UdpSender(udp, options, commander))
}
}

View file

@ -5,7 +5,7 @@ package akka.io
import akka.actor._
import java.nio.channels.DatagramChannel
import akka.io.UdpFF._
import akka.io.Udp._
import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel }
import scala.collection.immutable
import akka.io.Inet.SocketOption
@ -16,8 +16,8 @@ import scala.util.control.NonFatal
*
* INTERNAL API
*/
private[io] class UdpFFSender(val udpFF: UdpFFExt, options: immutable.Traversable[SocketOption], val commander: ActorRef)
extends Actor with ActorLogging with WithUdpFFSend {
private[io] class UdpSender(val udp: UdpExt, options: immutable.Traversable[SocketOption], val commander: ActorRef)
extends Actor with ActorLogging with WithUdpSend {
def selector: ActorRef = context.parent

View file

@ -4,14 +4,14 @@
package akka.io
import akka.actor.{ ActorRef, ActorLogging, Actor }
import akka.io.UdpFF.{ CommandFailed, Send }
import akka.io.Udp.{ CommandFailed, Send }
import akka.io.SelectionHandler._
import java.nio.channels.DatagramChannel
/**
* INTERNAL API
*/
private[io] trait WithUdpFFSend {
private[io] trait WithUdpSend {
me: Actor with ActorLogging
var pendingSend: Send = null
@ -23,8 +23,8 @@ private[io] trait WithUdpFFSend {
def selector: ActorRef
def channel: DatagramChannel
def udpFF: UdpFFExt
val settings = udpFF.settings
def udp: UdpExt
val settings = udp.settings
import settings._
@ -49,7 +49,7 @@ private[io] trait WithUdpFFSend {
final def doSend(): Unit = {
val buffer = udpFF.bufferPool.acquire()
val buffer = udp.bufferPool.acquire()
try {
buffer.clear()
pendingSend.payload.copyToBuffer(buffer)
@ -76,7 +76,7 @@ private[io] trait WithUdpFFSend {
}
} finally {
udpFF.bufferPool.release(buffer)
udp.bufferPool.release(buffer)
}
}

View file

@ -16,13 +16,13 @@ import java.util.ArrayList;
import java.util.List;
import akka.actor.ActorRef;
import akka.io.Inet;
import akka.io.UdpConn;
import akka.io.UdpConnMessage;
import akka.io.UdpConnected;
import akka.io.UdpConnectedMessage;
import akka.io.UdpSO;
import akka.util.ByteString;
//#imports
public class UdpConnDocTest {
public class UdpConnectedDocTest {
static public class Demo extends UntypedActor {
ActorRef connectionActor = null;
@ -32,12 +32,12 @@ public class UdpConnDocTest {
public void onReceive(Object msg) {
if ("connect".equals(msg)) {
//#manager
final ActorRef udp = UdpConn.get(system).manager();
final ActorRef udp = UdpConnected.get(system).manager();
//#manager
//#connect
final InetSocketAddress remoteAddr =
new InetSocketAddress("127.0.0.1", 12345);
udp.tell(UdpConnMessage.connect(handler, remoteAddr), getSelf());
udp.tell(UdpConnectedMessage.connect(handler, remoteAddr), getSelf());
//#connect
//#connect-with-options
final InetSocketAddress localAddr =
@ -45,26 +45,26 @@ public class UdpConnDocTest {
final List<Inet.SocketOption> options =
new ArrayList<Inet.SocketOption>();
options.add(UdpSO.broadcast(true));
udp.tell(UdpConnMessage.connect(handler, remoteAddr, localAddr, options), getSelf());
udp.tell(UdpConnectedMessage.connect(handler, remoteAddr, localAddr, options), getSelf());
//#connect-with-options
} else
//#connected
if (msg instanceof UdpConn.Connected) {
final UdpConn.Connected conn = (UdpConn.Connected) msg;
if (msg instanceof UdpConnected.Connected) {
final UdpConnected.Connected conn = (UdpConnected.Connected) msg;
connectionActor = getSender(); // Save the worker ref for later use
}
//#connected
else
//#received
if (msg instanceof UdpConn.Received) {
final UdpConn.Received recv = (UdpConn.Received) msg;
if (msg instanceof UdpConnected.Received) {
final UdpConnected.Received recv = (UdpConnected.Received) msg;
final ByteString data = recv.data();
// and do something with the received data ...
} else if (msg instanceof UdpConn.CommandFailed) {
final UdpConn.CommandFailed failed = (UdpConn.CommandFailed) msg;
final UdpConn.Command command = failed.cmd();
} else if (msg instanceof UdpConnected.CommandFailed) {
final UdpConnected.CommandFailed failed = (UdpConnected.CommandFailed) msg;
final UdpConnected.Command command = failed.cmd();
// react to failed connect, etc.
} else if (msg instanceof UdpConn.Disconnected) {
} else if (msg instanceof UdpConnected.Disconnected) {
// do something on disconnect
}
//#received
@ -72,7 +72,7 @@ public class UdpConnDocTest {
if ("send".equals(msg)) {
ByteString data = ByteString.empty();
//#send
connectionActor.tell(UdpConnMessage.send(data), getSelf());
connectionActor.tell(UdpConnectedMessage.send(data), getSelf());
//#send
}
}
@ -82,7 +82,7 @@ public class UdpConnDocTest {
@BeforeClass
static public void setup() {
system = ActorSystem.create("UdpConnDocTest");
system = ActorSystem.create("UdpConnectedDocTest");
}
@AfterClass

View file

@ -9,8 +9,8 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import akka.io.Inet;
import akka.io.UdpFF;
import akka.io.UdpFFMessage;
import akka.io.Udp;
import akka.io.UdpMessage;
import akka.io.UdpSO;
import akka.util.ByteString;
@ -24,26 +24,26 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class IOUdpFFDocTest {
public class UdpDocTest {
static public class Demo extends UntypedActor {
public void onReceive(Object message) {
//#manager
final ActorRef udpFF = UdpFF.get(system).manager();
final ActorRef udp = Udp.get(system).manager();
//#manager
//#simplesend
udpFF.tell(UdpFFMessage.simpleSender(), getSelf());
udp.tell(UdpMessage.simpleSender(), getSelf());
// ... or with socket options:
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
options.add(UdpSO.broadcast(true));
udpFF.tell(UdpFFMessage.simpleSender(), getSelf());
udp.tell(UdpMessage.simpleSender(), getSelf());
//#simplesend
ActorRef simpleSender = null;
//#simplesend-finish
if (message instanceof UdpFF.SimpleSendReady) {
if (message instanceof Udp.SimpleSendReady) {
simpleSender = getSender();
}
//#simplesend-finish
@ -51,33 +51,33 @@ public class IOUdpFFDocTest {
final ByteString data = ByteString.empty();
//#simplesend-send
simpleSender.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf());
simpleSender.tell(UdpMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf());
//#simplesend-send
final ActorRef handler = getSelf();
//#bind
udpFF.tell(UdpFFMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf());
udp.tell(UdpMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf());
//#bind
ActorRef udpWorker = null;
//#bind-finish
if (message instanceof UdpFF.Bound) {
if (message instanceof Udp.Bound) {
udpWorker = getSender();
}
//#bind-finish
//#bind-receive
if (message instanceof UdpFF.Received) {
final UdpFF.Received rcvd = (UdpFF.Received) message;
if (message instanceof Udp.Received) {
final Udp.Received rcvd = (Udp.Received) message;
final ByteString payload = rcvd.data();
final InetSocketAddress sender = rcvd.sender();
}
//#bind-receive
//#bind-send
udpWorker.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf());
udpWorker.tell(UdpMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf());
//#bind-send
}
}

View file

@ -216,14 +216,14 @@ Using UDP
UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams
to any remote address. Connection-based UDP workers are linked to a single remote address.
The connectionless UDP manager is accessed through ``UdpFF``. ``UdpFF`` refers to the "fire-and-forget" style of sending
The connectionless UDP manager is accessed through ``Udp``. ``Udp`` refers to the "fire-and-forget" style of sending
UDP datagrams.
.. includecode:: code/docs/io/IOUdpFFDocTest.java#manager
.. includecode:: code/docs/io/UdpDocTest.java#manager
The connection-based UDP manager is accessed through ``UdpConn``.
The connection-based UDP manager is accessed through ``UdpConnected``.
.. includecode:: code/docs/io/UdpConnDocTest.java#manager
.. includecode:: code/docs/io/UdpConnectedDocTest.java#manager
UDP servers can be only implemented by the connectionless API, but clients can use both.
@ -232,24 +232,24 @@ Connectionless UDP
The following imports are assumed in the following sections:
.. includecode:: code/docs/io/IOUdpFFDocTest.java#imports
.. includecode:: code/docs/io/UdpDocTest.java#imports
Simple Send
............
To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the
``UdpFF`` manager:
``Udp`` manager:
.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend
.. includecode:: code/docs/io/UdpDocTest.java#simplesend
The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message:
.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-finish
.. includecode:: code/docs/io/UdpDocTest.java#simplesend-finish
After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple
message send:
.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-send
.. includecode:: code/docs/io/UdpDocTest.java#simplesend-send
Bind (and Send)
@ -258,22 +258,22 @@ Bind (and Send)
To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP
manager
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind
.. includecode:: code/docs/io/UdpDocTest.java#bind
After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of
this message is the worker for the UDP channel bound to the local address.
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-finish
.. includecode:: code/docs/io/UdpDocTest.java#bind-finish
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-receive
.. includecode:: code/docs/io/UdpDocTest.java#bind-receive
The ``Received`` message contains the payload of the datagram and the address of the sender.
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker:
.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-send
.. includecode:: code/docs/io/UdpDocTest.java#bind-send
.. note::
@ -290,27 +290,27 @@ receive datagrams only from that address.
Connecting is similar to what we have seen in the previous section:
.. includecode:: code/docs/io/UdpConnDocTest.java#connect
.. includecode:: code/docs/io/UdpConnectedDocTest.java#connect
Or, with more options:
.. includecode:: code/docs/io/UdpConnDocTest.java#connect-with-options
.. includecode:: code/docs/io/UdpConnectedDocTest.java#connect-with-options
After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of
this message is the worker for the UDP connection.
.. includecode:: code/docs/io/UdpConnDocTest.java#connected
.. includecode:: code/docs/io/UdpConnectedDocTest.java#connected
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
.. includecode:: code/docs/io/UdpConnDocTest.java#received
.. includecode:: code/docs/io/UdpConnectedDocTest.java#received
The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address
is provided, as a UDP connection only receives messages from the endpoint it has been connected to.
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker:
.. includecode:: code/docs/io/UdpConnDocTest.java#send
.. includecode:: code/docs/io/UdpConnectedDocTest.java#send
Again, like the ``Received`` message, the ``Send`` message does not contain a remote address. This is because the address
will always be the endpoint we originally connected to.

View file

@ -136,7 +136,7 @@ the ``toSeq`` method. No bytes are copied. Because of immutability the underlyin
.. includecode:: code/docs/io/BinaryCoding.scala
:include: rest-to-seq
In general, conversions from ``ByteString`` to ``ByteIterator`` and vice versa are O(1) for non-chunked ``ByteString``s and (at worst) O(nChunks) for chunked ``ByteString``s.
In general, conversions from ``ByteString`` to ``ByteIterator`` and vice versa are O(1) for non-chunked ByteStrings and (at worst) O(nChunks) for chunked ByteStrings.
Encoding of data also is very natural, using ``ByteStringBuilder``
@ -283,21 +283,21 @@ Using UDP
UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams
to any remote address. Connection-based UDP workers are linked to a single remote address.
The connectionless UDP manager is accessed through ``UdpFF``. ``UdpFF`` refers to the "fire-and-forget" style of sending
The connectionless UDP manager is accessed through ``Udp``. ``Udp`` refers to the "fire-and-forget" style of sending
UDP datagrams.
.. code-block:: scala
import akka.io.IO
import akka.io.UdpFF
val connectionLessUdp = IO(UdpFF)
import akka.io.Udp
val connectionLessUdp = IO(Udp)
The connection-based UDP manager is accessed through ``UdpConn``.
The connection-based UDP manager is accessed through ``UdpConnected``.
.. code-block:: scala
import akka.io.UdpConn
val connectionBasedUdp = IO(UdpConn)
import akka.io.UdpConnected
val connectionBasedUdp = IO(UdpConnected)
UDP servers can be only implemented by the connectionless API, but clients can use both.
@ -308,14 +308,14 @@ Simple Send
............
To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the
``UdpFF`` manager:
``Udp`` manager:
.. code-block:: scala
IO(UdpFF) ! SimpleSender
IO(Udp) ! SimpleSender
// or with socket options:
import akka.io.Udp._
IO(UdpFF) ! SimpleSender(List(SO.Broadcast(true)))
IO(Udp) ! SimpleSender(List(SO.Broadcast(true)))
The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message:
@ -340,7 +340,7 @@ manager
.. code-block:: scala
IO(UdpFF) ! Bind(handler, localAddress)
IO(Udp) ! Bind(handler, localAddress)
After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of
this message is the worker for the UDP channel bound to the local address.
@ -380,13 +380,13 @@ Connecting is similar to what we have seen in the previous section:
.. code-block:: scala
IO(UdpConn) ! Connect(handler, remoteAddress)
IO(UdpConnected) ! Connect(handler, remoteAddress)
Or, with more options:
.. code-block:: scala
IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true)))
IO(UdpConnected) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true)))
After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of
this message is the worker for the UDP connection.