From 309f2c2f912c53ea99cb956de270d2fc52cc4ef5 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 25 Mar 2013 10:02:50 +0100 Subject: [PATCH] rename UdpFF->Udp and the other UdpConnected, see #3058 --- ...cala => UdpConnectedIntegrationSpec.scala} | 26 ++--- ...ionSpec.scala => UdpIntegrationSpec.scala} | 11 +- akka-actor/src/main/resources/reference.conf | 4 +- akka-actor/src/main/scala/akka/io/Udp.scala | 98 ++++++++++++++++- .../io/{UdpConn.scala => UdpConnected.scala} | 18 ++-- ...anager.scala => UdpConnectedManager.scala} | 6 +- .../main/scala/akka/io/UdpConnection.scala | 8 +- akka-actor/src/main/scala/akka/io/UdpFF.scala | 102 ------------------ .../src/main/scala/akka/io/UdpFFManager.scala | 58 ---------- ...{UdpFFListener.scala => UdpListener.scala} | 14 +-- .../src/main/scala/akka/io/UdpManager.scala | 58 ++++++++++ .../io/{UdpFFSender.scala => UdpSender.scala} | 6 +- ...{WithUdpFFSend.scala => WithUdpSend.scala} | 12 +-- ...nDocTest.java => UdpConnectedDocTest.java} | 32 +++--- .../{IOUdpFFDocTest.java => UdpDocTest.java} | 26 ++--- akka-docs/rst/java/io.rst | 36 +++---- akka-docs/rst/scala/io.rst | 26 ++--- 17 files changed, 267 insertions(+), 274 deletions(-) rename akka-actor-tests/src/test/scala/akka/io/{UdpConnIntegrationSpec.scala => UdpConnectedIntegrationSpec.scala} (72%) rename akka-actor-tests/src/test/scala/akka/io/{UdpFFIntegrationSpec.scala => UdpIntegrationSpec.scala} (88%) rename akka-actor/src/main/scala/akka/io/{UdpConn.scala => UdpConnected.scala} (86%) rename akka-actor/src/main/scala/akka/io/{UdpConnManager.scala => UdpConnectedManager.scala} (54%) delete mode 100644 akka-actor/src/main/scala/akka/io/UdpFF.scala delete mode 100644 akka-actor/src/main/scala/akka/io/UdpFFManager.scala rename akka-actor/src/main/scala/akka/io/{UdpFFListener.scala => UdpListener.scala} (89%) create mode 100644 akka-actor/src/main/scala/akka/io/UdpManager.scala rename akka-actor/src/main/scala/akka/io/{UdpFFSender.scala => UdpSender.scala} (86%) rename akka-actor/src/main/scala/akka/io/{WithUdpFFSend.scala => WithUdpSend.scala} (90%) rename akka-docs/rst/java/code/docs/io/{UdpConnDocTest.java => UdpConnectedDocTest.java} (65%) rename akka-docs/rst/java/code/docs/io/{IOUdpFFDocTest.java => UdpDocTest.java} (69%) diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala similarity index 72% rename from akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala rename to akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala index 4651700ed1..aa1e871d00 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala @@ -10,21 +10,21 @@ import akka.util.ByteString import java.net.InetSocketAddress import akka.actor.ActorRef -class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender { +class UdpConnectedIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender { val addresses = temporaryServerAddresses(3) def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() - commander.send(IO(UdpFF), UdpFF.Bind(handler, address)) - commander.expectMsg(UdpFF.Bound) + commander.send(IO(Udp), Udp.Bind(handler, address)) + commander.expectMsg(Udp.Bound) commander.sender } def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() - commander.send(IO(UdpConn), UdpConn.Connect(handler, remoteAddress, localAddress, Nil)) - commander.expectMsg(UdpConn.Connected) + commander.send(IO(UdpConnected), UdpConnected.Connect(handler, remoteAddress, localAddress, Nil)) + commander.expectMsg(UdpConnected.Connected) commander.sender } @@ -35,19 +35,19 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli val server = bindUdp(serverAddress, testActor) val data1 = ByteString("To infinity and beyond!") val data2 = ByteString("All your datagram belong to us") - connectUdp(localAddress = None, serverAddress, testActor) ! UdpConn.Send(data1) + connectUdp(localAddress = None, serverAddress, testActor) ! UdpConnected.Send(data1) val clientAddress = expectMsgPF() { - case UdpFF.Received(d, a) ⇒ + case Udp.Received(d, a) ⇒ d must be === data1 a } - server ! UdpFF.Send(data2, clientAddress) + server ! Udp.Send(data2, clientAddress) // FIXME: Currently this line fails expectMsgPF() { - case UdpConn.Received(d) ⇒ d must be === data2 + case UdpConnected.Received(d) ⇒ d must be === data2 } } @@ -57,19 +57,19 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli val server = bindUdp(serverAddress, testActor) val data1 = ByteString("To infinity and beyond!") val data2 = ByteString("All your datagram belong to us") - connectUdp(Some(clientAddress), serverAddress, testActor) ! UdpConn.Send(data1) + connectUdp(Some(clientAddress), serverAddress, testActor) ! UdpConnected.Send(data1) expectMsgPF() { - case UdpFF.Received(d, a) ⇒ + case Udp.Received(d, a) ⇒ d must be === data1 a must be === clientAddress } - server ! UdpFF.Send(data2, clientAddress) + server ! Udp.Send(data2, clientAddress) // FIXME: Currently this line fails expectMsgPF() { - case UdpConn.Received(d) ⇒ d must be === data2 + case UdpConnected.Received(d) ⇒ d must be === data2 } } diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala similarity index 88% rename from akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala rename to akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala index 9bcf1d4400..6dba329661 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala @@ -4,27 +4,26 @@ package akka.io import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } -import akka.io.UdpFF._ -import akka.TestUtils -import TestUtils._ +import akka.io.Udp._ +import akka.TestUtils._ import akka.util.ByteString import java.net.InetSocketAddress import akka.actor.ActorRef -class UdpFFIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender { +class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender { val addresses = temporaryServerAddresses(3) def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() - commander.send(IO(UdpFF), Bind(handler, address)) + commander.send(IO(Udp), Bind(handler, address)) commander.expectMsg(Bound) commander.sender } val simpleSender: ActorRef = { val commander = TestProbe() - commander.send(IO(UdpFF), SimpleSender) + commander.send(IO(Udp), SimpleSender) commander.expectMsg(SimpleSendReady) commander.sender } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index bff860665e..5a044d941d 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -479,7 +479,7 @@ akka { management-dispatcher = "akka.actor.default-dispatcher" } - udp-fire-and-forget { + udp { # The number of selectors to stripe the served channels over; each of # these will use one select loop on the selector-dispatcher. @@ -540,7 +540,7 @@ akka { management-dispatcher = "akka.actor.default-dispatcher" } - udp-connection { + udp-connected { # The number of selectors to stripe the served channels over; each of # these will use one select loop on the selector-dispatcher. diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index bfbcbc587c..11c3402ed4 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -7,8 +7,63 @@ import java.net.DatagramSocket import akka.io.Inet.{ SoJavaFactories, SocketOption } import com.typesafe.config.Config import akka.actor.{ Props, ActorSystemImpl } +import akka.actor.ExtendedActorSystem +import akka.actor.ActorRef +import akka.actor.ExtensionKey +import akka.actor.ActorSystem +import akka.util.ByteString +import java.net.InetSocketAddress +import scala.collection.immutable -object Udp { +object Udp extends ExtensionKey[UdpExt] { + + /** + * Java API: retrieve the Udp extension for the given system. + */ + override def get(system: ActorSystem): UdpExt = super.get(system) + + trait Command extends IO.HasFailureMessage { + def failureMessage = CommandFailed(this) + } + + case class NoAck(token: Any) + object NoAck extends NoAck(null) + + case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command { + require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") + + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] + } + object Send { + def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck) + } + + case class Bind(handler: ActorRef, + endpoint: InetSocketAddress, + options: immutable.Traversable[SocketOption] = Nil) extends Command + case object Unbind extends Command + + case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command + object SimpleSender extends SimpleSender(Nil) + + case object StopReading extends Command + case object ResumeReading extends Command + + trait Event + + case class Received(data: ByteString, sender: InetSocketAddress) extends Event + case class CommandFailed(cmd: Command) extends Event + + sealed trait Bound extends Event + case object Bound extends Bound + + sealed trait SimpleSendReady extends Event + case object SimpleSendReady extends SimpleSendReady + + sealed trait Unbound + case object Unbound extends Unbound + + case class SendFailed(cause: Throwable) extends Event object SO extends Inet.SoForwarders { @@ -46,6 +101,47 @@ object Udp { } } +class UdpExt(system: ExtendedActorSystem) extends IO.Extension { + + import Udp.UdpSettings + + val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp")) + + val manager: ActorRef = { + system.asInstanceOf[ActorSystemImpl].systemActorOf( + props = Props(new UdpManager(this)), + name = "IO-UDP-FF") + } + + val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) +} + +/** + * Java API: factory methods for the message types used when communicating with the Udp service. + */ +object UdpMessage { + import Udp._ + import java.lang.{ Iterable ⇒ JIterable } + import scala.collection.JavaConverters._ + import language.implicitConversions + + def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target) + def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack) + + def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind = + Bind(handler, endpoint, options.asScala.to) + + def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil) + + def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to) + def simpleSender: SimpleSender = SimpleSender + + def unbind: Unbind.type = Unbind + + def stopReading: StopReading.type = StopReading + def resumeReading: ResumeReading.type = ResumeReading +} + object UdpSO extends SoJavaFactories { import Udp.SO._ def broadcast(on: Boolean) = Broadcast(on) diff --git a/akka-actor/src/main/scala/akka/io/UdpConn.scala b/akka-actor/src/main/scala/akka/io/UdpConnected.scala similarity index 86% rename from akka-actor/src/main/scala/akka/io/UdpConn.scala rename to akka-actor/src/main/scala/akka/io/UdpConnected.scala index ec2cee36dc..23d67fded6 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConn.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnected.scala @@ -11,11 +11,11 @@ import java.net.InetSocketAddress import scala.collection.immutable import java.lang.{ Iterable ⇒ JIterable } -object UdpConn extends ExtensionKey[UdpConnExt] { +object UdpConnected extends ExtensionKey[UdpConnectedExt] { /** - * Java API: retrieve the UdpConn extension for the given system. + * Java API: retrieve the UdpConnected extension for the given system. */ - override def get(system: ActorSystem): UdpConnExt = super.get(system) + override def get(system: ActorSystem): UdpConnectedExt = super.get(system) trait Command extends IO.HasFailureMessage { def failureMessage = CommandFailed(this) @@ -58,13 +58,13 @@ object UdpConn extends ExtensionKey[UdpConnExt] { } -class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension { +class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension { - val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) + val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-connected")) val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( - props = Props(new UdpConnManager(this)), + props = Props(new UdpConnectedManager(this)), name = "IO-UDP-CONN") } @@ -73,11 +73,11 @@ class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension { } /** - * Java API: factory methods for the message types used when communicating with the UdpConn service. + * Java API: factory methods for the message types used when communicating with the UdpConnected service. */ -object UdpConnMessage { +object UdpConnectedMessage { import language.implicitConversions - import UdpConn._ + import UdpConnected._ def connect(handler: ActorRef, remoteAddress: InetSocketAddress, diff --git a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala b/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala similarity index 54% rename from akka-actor/src/main/scala/akka/io/UdpConnManager.scala rename to akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala index a362a02f38..284d1b9679 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala @@ -5,17 +5,17 @@ package akka.io import akka.actor.Props import akka.io.IO.SelectorBasedManager -import akka.io.UdpConn.Connect +import akka.io.UdpConnected.Connect /** * INTERNAL API */ -private[io] class UdpConnManager(udpConn: UdpConnExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { +private[io] class UdpConnectedManager(udpConn: UdpConnectedExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { def receive = workerForCommandHandler { case c: Connect ⇒ val commander = sender - Props(new UdpConnection(udpConn, commander, c)) + Props(new UdpConnectedection(udpConn, commander, c)) } } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 06d4f6d523..8e7869a53c 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -5,7 +5,7 @@ package akka.io import akka.actor.{ Actor, ActorLogging, ActorRef } import akka.io.SelectionHandler._ -import akka.io.UdpConn._ +import akka.io.UdpConnected._ import akka.util.ByteString import java.nio.ByteBuffer import java.nio.channels.DatagramChannel @@ -16,9 +16,9 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -private[io] class UdpConnection(val udpConn: UdpConnExt, - val commander: ActorRef, - val connect: Connect) extends Actor with ActorLogging { +private[io] class UdpConnectedection(val udpConn: UdpConnectedExt, + val commander: ActorRef, + val connect: Connect) extends Actor with ActorLogging { def selector: ActorRef = context.parent diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala deleted file mode 100644 index 743df53d60..0000000000 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package akka.io - -import akka.actor._ -import akka.io.Inet.SocketOption -import akka.io.Udp.UdpSettings -import akka.util.ByteString -import java.net.InetSocketAddress -import scala.collection.immutable - -object UdpFF extends ExtensionKey[UdpFFExt] { - - /** - * Java API: retrieve the UdpFF extension for the given system. - */ - override def get(system: ActorSystem): UdpFFExt = super.get(system) - - trait Command extends IO.HasFailureMessage { - def failureMessage = CommandFailed(this) - } - - case class NoAck(token: Any) - object NoAck extends NoAck(null) - - case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command { - require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - - def wantsAck: Boolean = !ack.isInstanceOf[NoAck] - } - object Send { - def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck) - } - - case class Bind(handler: ActorRef, - endpoint: InetSocketAddress, - options: immutable.Traversable[SocketOption] = Nil) extends Command - case object Unbind extends Command - - case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command - object SimpleSender extends SimpleSender(Nil) - - case object StopReading extends Command - case object ResumeReading extends Command - - trait Event - - case class Received(data: ByteString, sender: InetSocketAddress) extends Event - case class CommandFailed(cmd: Command) extends Event - - sealed trait Bound extends Event - case object Bound extends Bound - - sealed trait SimpleSendReady extends Event - case object SimpleSendReady extends SimpleSendReady - - sealed trait Unbound - case object Unbound extends Unbound - - case class SendFailed(cause: Throwable) extends Event - -} - -/** - * Java API: factory methods for the message types used when communicating with the UdpConn service. - */ -object UdpFFMessage { - import UdpFF._ - import java.lang.{ Iterable ⇒ JIterable } - import scala.collection.JavaConverters._ - import language.implicitConversions - - def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target) - def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack) - - def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind = - Bind(handler, endpoint, options.asScala.to) - - def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil) - - def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to) - def simpleSender: SimpleSender = SimpleSender - - def unbind: Unbind.type = Unbind - - def stopReading: StopReading.type = StopReading - def resumeReading: ResumeReading.type = ResumeReading -} - -class UdpFFExt(system: ExtendedActorSystem) extends IO.Extension { - - val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) - - val manager: ActorRef = { - system.asInstanceOf[ActorSystemImpl].systemActorOf( - props = Props(new UdpFFManager(this)), - name = "IO-UDP-FF") - } - - val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) -} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala deleted file mode 100644 index 679e3253de..0000000000 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package akka.io - -import akka.actor.Props -import akka.io.IO.SelectorBasedManager -import akka.io.UdpFF._ - -/** - * INTERNAL API - * - * UdpFFManager is a facade for simple fire-and-forget style UDP operations - * - * UdpFFManager is obtainable by calling {{{ IO(UdpFF) }}} (see [[akka.io.IO]] and [[akka.io.UdpFF]]) - * - * *Warning!* UdpFF uses [[java.nio.channels.DatagramChannel#send]] to deliver datagrams, and as a consequence if a - * security manager has been installed then for each datagram it will verify if the target address and port number are - * permitted. If this performance overhead is undesirable use the connection style Udp extension. - * - * == Bind and send == - * - * To bind and listen to a local address, a [[akka.io.UdpFF..Bind]] command must be sent to this actor. If the binding - * was successful, the sender of the [[akka.io.UdpFF.Bind]] will be notified with a [[akka.io.UdpFF.Bound]] - * message. The sender of the [[akka.io.UdpFF.Bound]] message is the Listener actor (an internal actor responsible for - * listening to server events). To unbind the port an [[akka.io.Tcp.Unbind]] message must be sent to the Listener actor. - * - * If the bind request is rejected because the Udp system is not able to register more channels (see the nr-of-selectors - * and max-channels configuration options in the akka.io.udpFF section of the configuration) the sender will be notified - * with a [[akka.io.UdpFF.CommandFailed]] message. This message contains the original command for reference. - * - * The handler provided in the [[akka.io.UdpFF.Bind]] message will receive inbound datagrams to the bound port - * wrapped in [[akka.io.UdpFF.Received]] messages which contain the payload of the datagram and the sender address. - * - * UDP datagrams can be sent by sending [[akka.io.UdpFF.Send]] messages to the Listener actor. The sender port of the - * outbound datagram will be the port to which the Listener is bound. - * - * == Simple send == - * - * UdpFF provides a simple method of sending UDP datagrams if no reply is expected. To acquire the Sender actor - * a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSendReady - * message that the service is available. UDP datagrams can be sent by sending [[akka.io.UdpFF.Send]] messages to the - * sender of SimpleSendReady. All the datagrams will contain an ephemeral local port as sender and answers will be - * discarded. - * - */ -private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udpFF.settings, udpFF.settings.NrOfSelectors) { - - def receive = workerForCommandHandler { - case b: Bind ⇒ - val commander = sender - Props(new UdpFFListener(udpFF, commander, b)) - case SimpleSender(options) ⇒ - val commander = sender - Props(new UdpFFSender(udpFF, options, commander)) - } - -} diff --git a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala similarity index 89% rename from akka-actor/src/main/scala/akka/io/UdpFFListener.scala rename to akka-actor/src/main/scala/akka/io/UdpListener.scala index 25a5551b4c..827903a6d3 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -5,7 +5,7 @@ package akka.io import akka.actor.{ ActorLogging, Actor, ActorRef } import akka.io.SelectionHandler._ -import akka.io.UdpFF._ +import akka.io.Udp._ import akka.util.ByteString import java.net.InetSocketAddress import java.nio.ByteBuffer @@ -17,14 +17,14 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -private[io] class UdpFFListener(val udpFF: UdpFFExt, - val bindCommander: ActorRef, - val bind: Bind) - extends Actor with ActorLogging with WithUdpFFSend { +private[io] class UdpListener(val udp: UdpExt, + val bindCommander: ActorRef, + val bind: Bind) + extends Actor with ActorLogging with WithUdpSend { import bind._ - import udpFF.bufferPool - import udpFF.settings._ + import udp.bufferPool + import udp.settings._ def selector: ActorRef = context.parent diff --git a/akka-actor/src/main/scala/akka/io/UdpManager.scala b/akka-actor/src/main/scala/akka/io/UdpManager.scala new file mode 100644 index 0000000000..fcf9c5c97c --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/UdpManager.scala @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.io + +import akka.actor.Props +import akka.io.IO.SelectorBasedManager +import akka.io.Udp._ + +/** + * INTERNAL API + * + * UdpManager is a facade for simple fire-and-forget style UDP operations + * + * UdpManager is obtainable by calling {{{ IO(Udp) }}} (see [[akka.io.IO]] and [[akka.io.Udp]]) + * + * *Warning!* Udp uses [[java.nio.channels.DatagramChannel#send]] to deliver datagrams, and as a consequence if a + * security manager has been installed then for each datagram it will verify if the target address and port number are + * permitted. If this performance overhead is undesirable use the connection style Udp extension. + * + * == Bind and send == + * + * To bind and listen to a local address, a [[akka.io.Udp..Bind]] command must be sent to this actor. If the binding + * was successful, the sender of the [[akka.io.Udp.Bind]] will be notified with a [[akka.io.Udp.Bound]] + * message. The sender of the [[akka.io.Udp.Bound]] message is the Listener actor (an internal actor responsible for + * listening to server events). To unbind the port an [[akka.io.Tcp.Unbind]] message must be sent to the Listener actor. + * + * If the bind request is rejected because the Udp system is not able to register more channels (see the nr-of-selectors + * and max-channels configuration options in the akka.io.udp section of the configuration) the sender will be notified + * with a [[akka.io.Udp.CommandFailed]] message. This message contains the original command for reference. + * + * The handler provided in the [[akka.io.Udp.Bind]] message will receive inbound datagrams to the bound port + * wrapped in [[akka.io.Udp.Received]] messages which contain the payload of the datagram and the sender address. + * + * UDP datagrams can be sent by sending [[akka.io.Udp.Send]] messages to the Listener actor. The sender port of the + * outbound datagram will be the port to which the Listener is bound. + * + * == Simple send == + * + * Udp provides a simple method of sending UDP datagrams if no reply is expected. To acquire the Sender actor + * a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSendReady + * message that the service is available. UDP datagrams can be sent by sending [[akka.io.Udp.Send]] messages to the + * sender of SimpleSendReady. All the datagrams will contain an ephemeral local port as sender and answers will be + * discarded. + * + */ +private[io] class UdpManager(udp: UdpExt) extends SelectorBasedManager(udp.settings, udp.settings.NrOfSelectors) { + + def receive = workerForCommandHandler { + case b: Bind ⇒ + val commander = sender + Props(new UdpListener(udp, commander, b)) + case SimpleSender(options) ⇒ + val commander = sender + Props(new UdpSender(udp, options, commander)) + } + +} diff --git a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala b/akka-actor/src/main/scala/akka/io/UdpSender.scala similarity index 86% rename from akka-actor/src/main/scala/akka/io/UdpFFSender.scala rename to akka-actor/src/main/scala/akka/io/UdpSender.scala index dc327a1039..af3df740c5 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpSender.scala @@ -5,7 +5,7 @@ package akka.io import akka.actor._ import java.nio.channels.DatagramChannel -import akka.io.UdpFF._ +import akka.io.Udp._ import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel } import scala.collection.immutable import akka.io.Inet.SocketOption @@ -16,8 +16,8 @@ import scala.util.control.NonFatal * * INTERNAL API */ -private[io] class UdpFFSender(val udpFF: UdpFFExt, options: immutable.Traversable[SocketOption], val commander: ActorRef) - extends Actor with ActorLogging with WithUdpFFSend { +private[io] class UdpSender(val udp: UdpExt, options: immutable.Traversable[SocketOption], val commander: ActorRef) + extends Actor with ActorLogging with WithUdpSend { def selector: ActorRef = context.parent diff --git a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala b/akka-actor/src/main/scala/akka/io/WithUdpSend.scala similarity index 90% rename from akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala rename to akka-actor/src/main/scala/akka/io/WithUdpSend.scala index 0fe714b6b6..df39efa59c 100644 --- a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala +++ b/akka-actor/src/main/scala/akka/io/WithUdpSend.scala @@ -4,14 +4,14 @@ package akka.io import akka.actor.{ ActorRef, ActorLogging, Actor } -import akka.io.UdpFF.{ CommandFailed, Send } +import akka.io.Udp.{ CommandFailed, Send } import akka.io.SelectionHandler._ import java.nio.channels.DatagramChannel /** * INTERNAL API */ -private[io] trait WithUdpFFSend { +private[io] trait WithUdpSend { me: Actor with ActorLogging ⇒ var pendingSend: Send = null @@ -23,8 +23,8 @@ private[io] trait WithUdpFFSend { def selector: ActorRef def channel: DatagramChannel - def udpFF: UdpFFExt - val settings = udpFF.settings + def udp: UdpExt + val settings = udp.settings import settings._ @@ -49,7 +49,7 @@ private[io] trait WithUdpFFSend { final def doSend(): Unit = { - val buffer = udpFF.bufferPool.acquire() + val buffer = udp.bufferPool.acquire() try { buffer.clear() pendingSend.payload.copyToBuffer(buffer) @@ -76,7 +76,7 @@ private[io] trait WithUdpFFSend { } } finally { - udpFF.bufferPool.release(buffer) + udp.bufferPool.release(buffer) } } diff --git a/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java b/akka-docs/rst/java/code/docs/io/UdpConnectedDocTest.java similarity index 65% rename from akka-docs/rst/java/code/docs/io/UdpConnDocTest.java rename to akka-docs/rst/java/code/docs/io/UdpConnectedDocTest.java index 8827ff6820..c0aff89891 100644 --- a/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java +++ b/akka-docs/rst/java/code/docs/io/UdpConnectedDocTest.java @@ -16,13 +16,13 @@ import java.util.ArrayList; import java.util.List; import akka.actor.ActorRef; import akka.io.Inet; -import akka.io.UdpConn; -import akka.io.UdpConnMessage; +import akka.io.UdpConnected; +import akka.io.UdpConnectedMessage; import akka.io.UdpSO; import akka.util.ByteString; //#imports -public class UdpConnDocTest { +public class UdpConnectedDocTest { static public class Demo extends UntypedActor { ActorRef connectionActor = null; @@ -32,12 +32,12 @@ public class UdpConnDocTest { public void onReceive(Object msg) { if ("connect".equals(msg)) { //#manager - final ActorRef udp = UdpConn.get(system).manager(); + final ActorRef udp = UdpConnected.get(system).manager(); //#manager //#connect final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.1", 12345); - udp.tell(UdpConnMessage.connect(handler, remoteAddr), getSelf()); + udp.tell(UdpConnectedMessage.connect(handler, remoteAddr), getSelf()); //#connect //#connect-with-options final InetSocketAddress localAddr = @@ -45,26 +45,26 @@ public class UdpConnDocTest { final List options = new ArrayList(); options.add(UdpSO.broadcast(true)); - udp.tell(UdpConnMessage.connect(handler, remoteAddr, localAddr, options), getSelf()); + udp.tell(UdpConnectedMessage.connect(handler, remoteAddr, localAddr, options), getSelf()); //#connect-with-options } else //#connected - if (msg instanceof UdpConn.Connected) { - final UdpConn.Connected conn = (UdpConn.Connected) msg; + if (msg instanceof UdpConnected.Connected) { + final UdpConnected.Connected conn = (UdpConnected.Connected) msg; connectionActor = getSender(); // Save the worker ref for later use } //#connected else //#received - if (msg instanceof UdpConn.Received) { - final UdpConn.Received recv = (UdpConn.Received) msg; + if (msg instanceof UdpConnected.Received) { + final UdpConnected.Received recv = (UdpConnected.Received) msg; final ByteString data = recv.data(); // and do something with the received data ... - } else if (msg instanceof UdpConn.CommandFailed) { - final UdpConn.CommandFailed failed = (UdpConn.CommandFailed) msg; - final UdpConn.Command command = failed.cmd(); + } else if (msg instanceof UdpConnected.CommandFailed) { + final UdpConnected.CommandFailed failed = (UdpConnected.CommandFailed) msg; + final UdpConnected.Command command = failed.cmd(); // react to failed connect, etc. - } else if (msg instanceof UdpConn.Disconnected) { + } else if (msg instanceof UdpConnected.Disconnected) { // do something on disconnect } //#received @@ -72,7 +72,7 @@ public class UdpConnDocTest { if ("send".equals(msg)) { ByteString data = ByteString.empty(); //#send - connectionActor.tell(UdpConnMessage.send(data), getSelf()); + connectionActor.tell(UdpConnectedMessage.send(data), getSelf()); //#send } } @@ -82,7 +82,7 @@ public class UdpConnDocTest { @BeforeClass static public void setup() { - system = ActorSystem.create("UdpConnDocTest"); + system = ActorSystem.create("UdpConnectedDocTest"); } @AfterClass diff --git a/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java b/akka-docs/rst/java/code/docs/io/UdpDocTest.java similarity index 69% rename from akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java rename to akka-docs/rst/java/code/docs/io/UdpDocTest.java index 6cbcf6ecd2..d1a7cd91ba 100644 --- a/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java +++ b/akka-docs/rst/java/code/docs/io/UdpDocTest.java @@ -9,8 +9,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.UntypedActor; import akka.io.Inet; -import akka.io.UdpFF; -import akka.io.UdpFFMessage; +import akka.io.Udp; +import akka.io.UdpMessage; import akka.io.UdpSO; import akka.util.ByteString; @@ -24,26 +24,26 @@ import org.junit.BeforeClass; import org.junit.Test; -public class IOUdpFFDocTest { +public class UdpDocTest { static public class Demo extends UntypedActor { public void onReceive(Object message) { //#manager - final ActorRef udpFF = UdpFF.get(system).manager(); + final ActorRef udp = Udp.get(system).manager(); //#manager //#simplesend - udpFF.tell(UdpFFMessage.simpleSender(), getSelf()); + udp.tell(UdpMessage.simpleSender(), getSelf()); // ... or with socket options: final List options = new ArrayList(); options.add(UdpSO.broadcast(true)); - udpFF.tell(UdpFFMessage.simpleSender(), getSelf()); + udp.tell(UdpMessage.simpleSender(), getSelf()); //#simplesend ActorRef simpleSender = null; //#simplesend-finish - if (message instanceof UdpFF.SimpleSendReady) { + if (message instanceof Udp.SimpleSendReady) { simpleSender = getSender(); } //#simplesend-finish @@ -51,33 +51,33 @@ public class IOUdpFFDocTest { final ByteString data = ByteString.empty(); //#simplesend-send - simpleSender.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); + simpleSender.tell(UdpMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); //#simplesend-send final ActorRef handler = getSelf(); //#bind - udpFF.tell(UdpFFMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf()); + udp.tell(UdpMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf()); //#bind ActorRef udpWorker = null; //#bind-finish - if (message instanceof UdpFF.Bound) { + if (message instanceof Udp.Bound) { udpWorker = getSender(); } //#bind-finish //#bind-receive - if (message instanceof UdpFF.Received) { - final UdpFF.Received rcvd = (UdpFF.Received) message; + if (message instanceof Udp.Received) { + final Udp.Received rcvd = (Udp.Received) message; final ByteString payload = rcvd.data(); final InetSocketAddress sender = rcvd.sender(); } //#bind-receive //#bind-send - udpWorker.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); + udpWorker.tell(UdpMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); //#bind-send } } diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index ac78ee052e..2da259b0b1 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -216,14 +216,14 @@ Using UDP UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams to any remote address. Connection-based UDP workers are linked to a single remote address. -The connectionless UDP manager is accessed through ``UdpFF``. ``UdpFF`` refers to the "fire-and-forget" style of sending +The connectionless UDP manager is accessed through ``Udp``. ``Udp`` refers to the "fire-and-forget" style of sending UDP datagrams. -.. includecode:: code/docs/io/IOUdpFFDocTest.java#manager +.. includecode:: code/docs/io/UdpDocTest.java#manager -The connection-based UDP manager is accessed through ``UdpConn``. +The connection-based UDP manager is accessed through ``UdpConnected``. -.. includecode:: code/docs/io/UdpConnDocTest.java#manager +.. includecode:: code/docs/io/UdpConnectedDocTest.java#manager UDP servers can be only implemented by the connectionless API, but clients can use both. @@ -232,24 +232,24 @@ Connectionless UDP The following imports are assumed in the following sections: -.. includecode:: code/docs/io/IOUdpFFDocTest.java#imports +.. includecode:: code/docs/io/UdpDocTest.java#imports Simple Send ............ To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the -``UdpFF`` manager: +``Udp`` manager: -.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend +.. includecode:: code/docs/io/UdpDocTest.java#simplesend The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: -.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-finish +.. includecode:: code/docs/io/UdpDocTest.java#simplesend-finish After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple message send: -.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-send +.. includecode:: code/docs/io/UdpDocTest.java#simplesend-send Bind (and Send) @@ -258,22 +258,22 @@ Bind (and Send) To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP manager -.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind +.. includecode:: code/docs/io/UdpDocTest.java#bind After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of this message is the worker for the UDP channel bound to the local address. -.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-finish +.. includecode:: code/docs/io/UdpDocTest.java#bind-finish The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: -.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-receive +.. includecode:: code/docs/io/UdpDocTest.java#bind-receive The ``Received`` message contains the payload of the datagram and the address of the sender. It is also possible to send UDP datagrams using the ``ActorRef`` of the worker: -.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-send +.. includecode:: code/docs/io/UdpDocTest.java#bind-send .. note:: @@ -290,27 +290,27 @@ receive datagrams only from that address. Connecting is similar to what we have seen in the previous section: -.. includecode:: code/docs/io/UdpConnDocTest.java#connect +.. includecode:: code/docs/io/UdpConnectedDocTest.java#connect Or, with more options: -.. includecode:: code/docs/io/UdpConnDocTest.java#connect-with-options +.. includecode:: code/docs/io/UdpConnectedDocTest.java#connect-with-options After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of this message is the worker for the UDP connection. -.. includecode:: code/docs/io/UdpConnDocTest.java#connected +.. includecode:: code/docs/io/UdpConnectedDocTest.java#connected The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: -.. includecode:: code/docs/io/UdpConnDocTest.java#received +.. includecode:: code/docs/io/UdpConnectedDocTest.java#received The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address is provided, as a UDP connection only receives messages from the endpoint it has been connected to. It is also possible to send UDP datagrams using the ``ActorRef`` of the worker: -.. includecode:: code/docs/io/UdpConnDocTest.java#send +.. includecode:: code/docs/io/UdpConnectedDocTest.java#send Again, like the ``Received`` message, the ``Send`` message does not contain a remote address. This is because the address will always be the endpoint we originally connected to. diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index f4041be50d..7235abbb9e 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -136,7 +136,7 @@ the ``toSeq`` method. No bytes are copied. Because of immutability the underlyin .. includecode:: code/docs/io/BinaryCoding.scala :include: rest-to-seq -In general, conversions from ``ByteString`` to ``ByteIterator`` and vice versa are O(1) for non-chunked ``ByteString``s and (at worst) O(nChunks) for chunked ``ByteString``s. +In general, conversions from ``ByteString`` to ``ByteIterator`` and vice versa are O(1) for non-chunked ByteStrings and (at worst) O(nChunks) for chunked ByteStrings. Encoding of data also is very natural, using ``ByteStringBuilder`` @@ -283,21 +283,21 @@ Using UDP UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams to any remote address. Connection-based UDP workers are linked to a single remote address. -The connectionless UDP manager is accessed through ``UdpFF``. ``UdpFF`` refers to the "fire-and-forget" style of sending +The connectionless UDP manager is accessed through ``Udp``. ``Udp`` refers to the "fire-and-forget" style of sending UDP datagrams. .. code-block:: scala import akka.io.IO - import akka.io.UdpFF - val connectionLessUdp = IO(UdpFF) + import akka.io.Udp + val connectionLessUdp = IO(Udp) -The connection-based UDP manager is accessed through ``UdpConn``. +The connection-based UDP manager is accessed through ``UdpConnected``. .. code-block:: scala - import akka.io.UdpConn - val connectionBasedUdp = IO(UdpConn) + import akka.io.UdpConnected + val connectionBasedUdp = IO(UdpConnected) UDP servers can be only implemented by the connectionless API, but clients can use both. @@ -308,14 +308,14 @@ Simple Send ............ To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the -``UdpFF`` manager: +``Udp`` manager: .. code-block:: scala - IO(UdpFF) ! SimpleSender + IO(Udp) ! SimpleSender // or with socket options: import akka.io.Udp._ - IO(UdpFF) ! SimpleSender(List(SO.Broadcast(true))) + IO(Udp) ! SimpleSender(List(SO.Broadcast(true))) The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: @@ -340,7 +340,7 @@ manager .. code-block:: scala - IO(UdpFF) ! Bind(handler, localAddress) + IO(Udp) ! Bind(handler, localAddress) After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of this message is the worker for the UDP channel bound to the local address. @@ -380,13 +380,13 @@ Connecting is similar to what we have seen in the previous section: .. code-block:: scala - IO(UdpConn) ! Connect(handler, remoteAddress) + IO(UdpConnected) ! Connect(handler, remoteAddress) Or, with more options: .. code-block:: scala - IO(UdpConn) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true))) + IO(UdpConnected) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true))) After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of this message is the worker for the UDP connection.