From b6efd467e87ad125a38fd3d6bad452f6a3e864b0 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 26 May 2013 18:29:23 +0200 Subject: [PATCH] add ScalaDocs and correct UDP docs, see 3268 --- .../scala/akka/io/UdpIntegrationSpec.scala | 2 +- akka-actor/src/main/scala/akka/io/IO.scala | 49 +- akka-actor/src/main/scala/akka/io/Inet.scala | 50 ++ .../main/scala/akka/io/SelectionHandler.scala | 35 +- .../main/scala/akka/io/SslTlsSupport.scala | 5 +- akka-actor/src/main/scala/akka/io/Tcp.scala | 437 +++++++++++++++++- .../src/main/scala/akka/io/TcpListener.scala | 3 +- .../src/main/scala/akka/io/TcpManager.scala | 4 +- akka-actor/src/main/scala/akka/io/Udp.scala | 228 ++++++++- .../src/main/scala/akka/io/UdpConnected.scala | 164 ++++++- .../scala/akka/io/UdpConnectedManager.scala | 3 +- .../main/scala/akka/io/UdpConnection.scala | 4 +- .../src/main/scala/akka/io/UdpListener.scala | 2 +- .../src/main/scala/akka/io/UdpManager.scala | 7 +- .../src/main/scala/akka/io/UdpSender.scala | 4 +- .../src/main/scala/akka/io/WithUdpSend.scala | 10 +- .../rst/java/code/docs/io/UdpDocTest.java | 215 ++++++--- akka-docs/rst/java/io.rst | 138 +++--- .../rst/scala/code/docs/io/UdpDocSpec.scala | 167 +++++++ akka-docs/rst/scala/io.rst | 170 +++---- .../scala/akka/io/ssl/SslTlsSupportSpec.scala | 2 +- 21 files changed, 1331 insertions(+), 368 deletions(-) create mode 100644 akka-docs/rst/scala/code/docs/io/UdpDocSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala index 31e103a82f..4b278b4eeb 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala @@ -24,7 +24,7 @@ class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitS val simpleSender: ActorRef = { val commander = TestProbe() commander.send(IO(Udp), SimpleSender) - commander.expectMsg(SimpleSendReady) + commander.expectMsg(SimpleSenderReady) commander.sender } diff --git a/akka-actor/src/main/scala/akka/io/IO.scala b/akka-actor/src/main/scala/akka/io/IO.scala index f876b8ba9d..b87c93b9b3 100644 --- a/akka-actor/src/main/scala/akka/io/IO.scala +++ b/akka-actor/src/main/scala/akka/io/IO.scala @@ -10,44 +10,29 @@ import akka.routing.RandomRouter import akka.io.SelectionHandler.WorkerForCommand import akka.event.Logging +/** + * Entry point to Akka’s IO layer. + * + * All contents of the `akka.io` package is marked “experimental”. + * + * This marker signifies that APIs may still change in response to user feedback + * through-out the 2.2 release cycle. The implementation itself is considered + * stable and ready for production use. + * + * @see the Akka online documentation + */ object IO { trait Extension extends akka.actor.Extension { def manager: ActorRef } + /** + * Scala API: obtain a reference to the manager actor for the given IO extension, + * for example [[Tcp]] or [[Udp]]. + * + * For the Java API please refer to the individual extensions directly. + */ def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager - // What is this? It's public API so I think it deserves a mention - trait HasFailureMessage { - def failureMessage: Any - } - - abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor { - - override def supervisorStrategy = connectionSupervisorStrategy - - val selectorPool = context.actorOf( - props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)), - name = "selectors") - - final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry ⇒ Props]): Receive = { - case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! WorkerForCommand(cmd, sender, pf(cmd)) - } - } - - /** - * Special supervisor strategy for parents of TCP connection and listener actors. - * Stops the child on all errors and logs DeathPactExceptions only at debug level. - */ - private[io] final val connectionSupervisorStrategy: SupervisorStrategy = - new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) { - override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, - decision: SupervisorStrategy.Directive): Unit = - if (cause.isInstanceOf[DeathPactException]) { - try context.system.eventStream.publish { - Logging.Debug(child.path.toString, getClass, "Closed after handler termination") - } catch { case NonFatal(_) ⇒ } - } else super.logFailure(context, child, cause, decision) - } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala index 18c96594d6..e33b7ee6d1 100644 --- a/akka-actor/src/main/scala/akka/io/Inet.scala +++ b/akka-actor/src/main/scala/akka/io/Inet.scala @@ -80,17 +80,67 @@ object Inet { } trait SoForwarders { + /** + * [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option + * + * For more information see [[java.net.Socket.setReceiveBufferSize]] + */ val ReceiveBufferSize = SO.ReceiveBufferSize + + /** + * [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR + * + * For more information see [[java.net.Socket.setReuseAddress]] + */ val ReuseAddress = SO.ReuseAddress + + /** + * [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option. + * + * For more information see [[java.net.Socket.setSendBufferSize]] + */ val SendBufferSize = SO.SendBufferSize + + /** + * [[akka.io.Tcp.SocketOption]] to set the traffic class or + * type-of-service octet in the IP header for packets sent from this + * socket. + * + * For more information see [[java.net.Socket.setTrafficClass]] + */ val TrafficClass = SO.TrafficClass } trait SoJavaFactories { import SO._ + /** + * [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option + * + * For more information see [[java.net.Socket.setReceiveBufferSize]] + */ def receiveBufferSize(size: Int) = ReceiveBufferSize(size) + + /** + * [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR + * + * For more information see [[java.net.Socket.setReuseAddress]] + */ def reuseAddress(on: Boolean) = ReuseAddress(on) + + /** + * [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option. + * + * For more information see [[java.net.Socket.setSendBufferSize]] + */ def sendBufferSize(size: Int) = SendBufferSize(size) + + /** + * [[akka.io.Tcp.SocketOption]] to set the traffic class or + * type-of-service octet in the IP header for packets sent from this + * socket. + * + * For more information see [[java.net.Socket.setTrafficClass]] + */ def trafficClass(tc: Int) = TrafficClass(tc) } diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index dc522b657d..10852f4372 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -15,10 +15,11 @@ import scala.util.control.NonFatal import scala.concurrent.ExecutionContext import akka.event.LoggingAdapter import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } -import akka.io.IO.HasFailureMessage import akka.util.Helpers.Requiring import akka.util.SerializedSuspendableExecutionContext import akka.actor._ +import akka.routing.RandomRouter +import akka.event.Logging abstract class SelectionHandlerSettings(config: Config) { import config._ @@ -60,6 +61,10 @@ private[io] trait ChannelRegistration { private[io] object SelectionHandler { + trait HasFailureMessage { + def failureMessage: Any + } + case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: ChannelRegistry ⇒ Props) case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) } @@ -69,6 +74,34 @@ private[io] object SelectionHandler { case object ChannelReadable case object ChannelWritable + private[io] abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor { + + override def supervisorStrategy = connectionSupervisorStrategy + + val selectorPool = context.actorOf( + props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)), + name = "selectors") + + final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry ⇒ Props]): Receive = { + case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! WorkerForCommand(cmd, sender, pf(cmd)) + } + } + + /** + * Special supervisor strategy for parents of TCP connection and listener actors. + * Stops the child on all errors and logs DeathPactExceptions only at debug level. + */ + private[io] final val connectionSupervisorStrategy: SupervisorStrategy = + new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) { + override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, + decision: SupervisorStrategy.Directive): Unit = + if (cause.isInstanceOf[DeathPactException]) { + try context.system.eventStream.publish { + Logging.Debug(child.path.toString, getClass, "Closed after handler termination") + } catch { case NonFatal(_) ⇒ } + } else super.logFailure(context, child, cause, decision) + } + private class ChannelRegistryImpl(executionContext: ExecutionContext, log: LoggingAdapter) extends ChannelRegistry { private[this] val selector = SelectorProvider.provider.openSelector private[this] val wakeUp = new AtomicBoolean(false) diff --git a/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala b/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala index 1114c64cec..af7ac2c7cd 100644 --- a/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala +++ b/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala @@ -47,7 +47,10 @@ object SslTlsSupport { * This pipeline stage implements SSL / TLS support, using an externally * configured [[SSLEngine]]. It operates on the level of [[Tcp.Event]] and * [[Tcp.Command]] messages, which means that it will typically be one of - * the lowest stages in a protocol stack. + * the lowest stages in a protocol stack. Since SSLEngine relies on contiguous + * transmission of a data stream you will need to handle backpressure from + * the TCP connection actor, for example by using a [[BackpressureBuffer]] + * underneath the SSL stage. * * Each instance of this stage has a scratch [[ByteBuffer]] of approx. 18kiB * allocated which is used by the SSLEngine. diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index a333037fb9..1f462995f9 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -15,6 +15,20 @@ import akka.util.Helpers.Requiring import akka.actor._ import java.lang.{ Iterable ⇒ JIterable } +/** + * TCP Extension for Akka’s IO layer. + * + * For a full description of the design and philosophy behind this IO + * implementation please refer to {@see the Akka online documentation}. + * + * In order to open an outbound connection send a [[Tcp.Connect]] message + * to the [[TcpExt#manager]]. + * + * In order to start listening for inbound connetions send a [[Tcp.Bind]] + * message to the [[TcpExt#manager]]. + * + * The Java API for generating TCP commands is available at [[TcpMessage]]. + */ object Tcp extends ExtensionKey[TcpExt] { /** @@ -22,7 +36,11 @@ object Tcp extends ExtensionKey[TcpExt] { */ override def get(system: ActorSystem): TcpExt = super.get(system) - // shared socket options + /** + * Scala API: this object contains all applicable socket options for TCP. + * + * For the Java API see [[TcpSO]]. + */ object SO extends Inet.SoForwarders { // general socket options @@ -63,60 +81,180 @@ object Tcp extends ExtensionKey[TcpExt] { } - trait Message + /** + * The common interface for [[Command]] and [[Event]]. + */ + sealed trait Message /// COMMANDS /** * This is the common trait for all commands understood by TCP actors. */ - trait Command extends Message with IO.HasFailureMessage { + trait Command extends Message with SelectionHandler.HasFailureMessage { def failureMessage = CommandFailed(this) } /** - * The Connect message is sent to the [[TcpManager]], which is obtained via - * [[TcpExt#getManager]]. Either the manager replies with a [[CommandFailed]] + * The Connect message is sent to the TCP manager actor, which is obtained via + * [[TcpExt#manager]]. Either the manager replies with a [[CommandFailed]] * or the actor handling the new connection replies with a [[Connected]] * message. + * + * @param remoteAddress is the address to connect to + * @param localAddress optionally specifies a specific address to bind to + * @param options Please refer to the [[SO]] object for a list of all supported options. */ case class Connect(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil) extends Command + + /** + * The Bind message is send to the TCP manager actor, which is obtained via + * [[TcpExt#manager]] in order to bind to a listening socket. The manager + * replies either with a [[CommandFailed]] or the actor handling the listen + * socket replies with a [[Bound]] message. If the local port is set to 0 in + * the Bind message, then the [[Bound]] message should be inspected to find + * the actual port which was bound to. + * + * @param handler The actor which will receive all incoming connection requests + * in the form of [[Connected]] messages. + * + * @param localAddress The socket address to bind to; use port zero for + * automatic assignment (i.e. an ephemeral port, see [[Bound]]) + * + * @param backlog This specifies the number of unaccepted connections the O/S + * kernel will hold for this port before refusing connections. + * + * @param options Please refer to the [[SO]] object for a list of all supported options. + */ case class Bind(handler: ActorRef, localAddress: InetSocketAddress, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil) extends Command + /** + * This message must be sent to a TCP connection actor after receiving the + * [[Connected]] message. The connection will not read any data from the + * socket until this message is received, because this message defines the + * actor which will receive all inbound data. + * + * @param handler The actor which will receive all incoming data and which + * will be informed when the connection is closed. + * + * @param keepOpenOnPeerClosed If this is set to true then the connection + * is not automatically closed when the peer closes its half, + * requiring an explicit [[Closed]] from our side when finished. + * + * @param useResumeWriting If this is set to true then the connection actor + * will refuse all further writes after issuing a [[CommandFailed]] + * notification until [[ResumeWriting]] is received. This can + * be used to implement NACK-based write backpressure. + */ case class Register(handler: ActorRef, keepOpenOnPeerClosed: Boolean = false, useResumeWriting: Boolean = true) extends Command + + /** + * In order to close down a listening socket, send this message to that socket’s + * actor (that is the actor which previously had sent the [[Bound]] message). The + * listener socket actor will reply with a [[Unbound]] message. + */ case object Unbind extends Command + /** + * Common interface for all commands which aim to close down an open connection. + */ sealed trait CloseCommand extends Command { + /** + * The corresponding event which is sent as an acknowledgment once the + * close operation is finished. + */ def event: ConnectionClosed } + + /** + * A normal close operation will first flush pending writes and then close the + * socket. The sender of this command and the registered handler for incoming + * data will both be notified once the socket is closed using a [[Closed]] + * message. + */ case object Close extends CloseCommand { + /** + * The corresponding event which is sent as an acknowledgment once the + * close operation is finished. + */ override def event = Closed } + + /** + * A confirmed close operation will flush pending writes and half-close the + * connection, waiting for the peer to close the other half. The sender of this + * command and the registered handler for incoming data will both be notified + * once the socket is closed using a [[ConfirmedClosed]] message. + */ case object ConfirmedClose extends CloseCommand { + /** + * The corresponding event which is sent as an acknowledgment once the + * close operation is finished. + */ override def event = ConfirmedClosed } + + /** + * An abort operation will not flush pending writes and will issue a TCP ABORT + * command to the O/S kernel which should result in a TCP_RST packet being sent + * to the peer. The sender of this command and the registered handler for + * incoming data will both be notified once the socket is closed using a + * [[Aborted]] message. + */ case object Abort extends CloseCommand { + /** + * The corresponding event which is sent as an acknowledgment once the + * close operation is finished. + */ override def event = Aborted } + /** + * Each [[WriteCommand]] can optionally request a positive acknowledgment to be sent + * to the commanding actor. If such notification is not desired the [[WriteCommand#ack]] + * must be set to an instance of this class. The token contained within can be used + * to recognize which write failed when receiving a [[CommandFailed]] message. + */ case class NoAck(token: Any) extends Event + + /** + * Default [[NoAck]] instance which is used when no acknowledgment information is + * explicitly provided. Its “token” is `null`. + */ object NoAck extends NoAck(null) + /** + * Common interface for all write commands, currently [[Write]] and [[WriteFile]]. + */ sealed trait WriteCommand extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def ack: Any + /** + * The acknowledgment token associated with this write command. + */ + def ack: Event + + /** + * An acknowledgment is only sent if this write command “wants an ack”, which is + * equivalent to the [[#ack]] token not being a of type [[NoAck]]. + */ def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } /** * Write data to the TCP connection. If no ack is needed use the special - * `NoAck` object. + * `NoAck` object. The connection actor will reply with a [[CommandFailed]] + * message if the write could not be enqueued. If [[WriteCommand#wantsAck]] + * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]] + * token once the write has been successfully enqueued to the O/S kernel. + * Note that this does not in any way guarantee that the data will be + * or have been sent! Unfortunately there is no way to determine whether + * a particular write has been sent by the O/S. */ case class Write(data: ByteString, ack: Event) extends WriteCommand object Write { @@ -137,50 +275,147 @@ object Tcp extends ExtensionKey[TcpExt] { /** * Write `count` bytes starting at `position` from file at `filePath` to the connection. - * When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. The - * count must be > 0. + * The count must be > 0. The connection actor will reply with a [[CommandFailed]] + * message if the write could not be enqueued. If [[WriteCommand#wantsAck]] + * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]] + * token once the write has been successfully enqueued to the O/S kernel. + * Note that this does not in any way guarantee that the data will be + * or have been sent! Unfortunately there is no way to determine whether + * a particular write has been sent by the O/S. */ - case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand { + case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends WriteCommand { require(position >= 0, "WriteFile.position must be >= 0") require(count > 0, "WriteFile.count must be > 0") } + /** + * When `useResumeWriting` is in effect as was indicated in the [[Register]] message + * then this command needs to be sent to the connection actor in order to re-enable + * writing after a [[CommandFailed]] event. All [[WriteCommand]] processed by the + * connection actor between the first [[CommandFailed]] and subsequent reception of + * this message will also be rejected with [[CommandFailed]]. + */ case object ResumeWriting extends Command + /** + * Sending this command to the connection actor will disable reading from the TCP + * socket. TCP flow-control will then propagate backpressure to the sender side + * as buffers fill up on either end. To re-enable reading send [[ResumeReading]]. + */ case object SuspendReading extends Command + + /** + * This command needs to be sent to the connection actor after a [[SuspendReading]] + * command in order to resume reading from the socket. + */ case object ResumeReading extends Command /// EVENTS + /** + * Common interface for all events generated by the TCP layer actors. + */ trait Event extends Message + /** + * Whenever data are read from a socket they will be transferred within this + * class to the handler actor which was designated in the [[Register]] message. + */ case class Received(data: ByteString) extends Event + + /** + * The connection actor sends this message either to the sender of a [[Connect]] + * command (for outbound) or to the handler for incoming connections designated + * in the [[Bind]] message. The connection is characterized by the `remoteAddress` + * and `localAddress` TCP endpoints. + */ case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event + + /** + * Whenever a command cannot be completed, the queried actor will reply with + * this message, wrapping the original command which failed. + */ case class CommandFailed(cmd: Command) extends Event + /** + * When `useResumeWriting` is in effect as indicated in the [[Register]] message, + * the [[ResumeWriting]] command will be acknowledged by this message type, upon + * which it is safe to send at least one write. This means that all writes preceding + * the first [[CommandFailed]] message have been enqueued to the O/S kernel at this + * point. + */ sealed trait WritingResumed extends Event case object WritingResumed extends WritingResumed + /** + * The sender of a [[Bind]] command will—in case of success—receive confirmation + * in this form. If the bind address indicated a 0 port number, then the contained + * `localAddress` can be used to find out which port was automatically assigned. + */ case class Bound(localAddress: InetSocketAddress) extends Event + + /** + * The sender of an [[Unbind]] command will receive confirmation through this + * message once the listening socket has been closed. + */ sealed trait Unbound extends Event case object Unbound extends Unbound + /** + * This is the common interface for all events which indicate that a connection + * has been closed or half-closed. + */ sealed trait ConnectionClosed extends Event { + /** + * `true` iff the connection has been closed in response to an [[Abort]] command. + */ def isAborted: Boolean = false + /** + * `true` iff the connection has been fully closed in response to a + * [[ConfirmedClose]] command. + */ def isConfirmed: Boolean = false + /** + * `true` iff the connection has been closed by the peer; in case + * `keepOpenOnPeerClosed` is in effect as per the [[Register]] command, + * this connection’s reading half is now closed. + */ def isPeerClosed: Boolean = false + /** + * `true` iff the connection has been closed due to an IO error. + */ def isErrorClosed: Boolean = false + /** + * If `isErrorClosed` returns true, then the error condition can be + * retrieved by this method. + */ def getErrorCause: String = null } + /** + * The connection has been closed normally in response to a [[Close]] command. + */ case object Closed extends ConnectionClosed + /** + * The connection has been aborted in response to an [[Abort]] command. + */ case object Aborted extends ConnectionClosed { override def isAborted = true } + /** + * The connection has been half-closed by us and then half-close by the peer + * in response to a [[ConfirmedClose]] command. + */ case object ConfirmedClosed extends ConnectionClosed { override def isConfirmed = true } + /** + * The peer has closed its writing half of the connection. + */ case object PeerClosed extends ConnectionClosed { override def isPeerClosed = true } + /** + * The connection has been closed due to an IO error. + */ case class ErrorClosed(cause: String) extends ConnectionClosed { override def isErrorClosed = true override def getErrorCause = cause @@ -218,10 +453,14 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { private[this] def getIntBytes(path: String): Int = { val size = getBytes(path) require(size < Int.MaxValue, s"$path must be < 2 GiB") + require(size >= 0, s"$path must be non-negative") size.toInt } } + /** + * + */ val manager: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf( props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher), @@ -237,10 +476,36 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val fileIoDispatcher = system.dispatchers.lookup(Settings.FileIODispatcher) } +/** + * Java API for accessing socket options. + */ object TcpSO extends SoJavaFactories { import Tcp.SO._ + + /** + * [[akka.io.Inet.SocketOption]] to enable or disable SO_KEEPALIVE + * + * For more information see [[java.net.Socket.setKeepAlive]] + */ def keepAlive(on: Boolean) = KeepAlive(on) + + /** + * [[akka.io.Inet.SocketOption]] to enable or disable OOBINLINE (receipt + * of TCP urgent data) By default, this option is disabled and TCP urgent + * data is silently discarded. + * + * For more information see [[java.net.Socket.setOOBInline]] + */ def oobInline(on: Boolean) = OOBInline(on) + + /** + * [[akka.io.Inet.SocketOption]] to enable or disable TCP_NODELAY + * (disable or enable Nagle's algorithm) + * + * Please note, that TCP_NODELAY is enabled by default. + * + * For more information see [[java.net.Socket.setTcpNoDelay]] + */ def tcpNoDelay(on: Boolean) = TcpNoDelay(on) } @@ -248,43 +513,183 @@ object TcpMessage { import language.implicitConversions import Tcp._ + /** + * The Connect message is sent to the TCP manager actor, which is obtained via + * [[TcpExt#getManager]]. Either the manager replies with a [[CommandFailed]] + * or the actor handling the new connection replies with a [[Connected]] + * message. + * + * @param remoteAddress is the address to connect to + * @param localAddress optionally specifies a specific address to bind to + * @param options Please refer to [[TcpSO]] for a list of all supported options. + */ def connect(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress, options: JIterable[SocketOption]): Command = Connect(remoteAddress, Some(localAddress), options) + /** + * Connect to the given `remoteAddress` without binding to a local address. + */ def connect(remoteAddress: InetSocketAddress, options: JIterable[SocketOption]): Command = Connect(remoteAddress, None, options) + /** + * Connect to the given `remoteAddress` without binding to a local address and without + * specifying options. + */ def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil) + /** + * The Bind message is send to the TCP manager actor, which is obtained via + * [[TcpExt#getManager]] in order to bind to a listening socket. The manager + * replies either with a [[CommandFailed]] or the actor handling the listen + * socket replies with a [[Bound]] message. If the local port is set to 0 in + * the Bind message, then the [[Bound]] message should be inspected to find + * the actual port which was bound to. + * + * @param handler The actor which will receive all incoming connection requests + * in the form of [[Connected]] messages. + * + * @param localAddress The socket address to bind to; use port zero for + * automatic assignment (i.e. an ephemeral port, see [[Bound]]) + * + * @param backlog This specifies the number of unaccepted connections the O/S + * kernel will hold for this port before refusing connections. + * + * @param options Please refer to [[TcpSO]] for a list of all supported options. + */ def bind(handler: ActorRef, endpoint: InetSocketAddress, backlog: Int, options: JIterable[SocketOption]): Command = Bind(handler, endpoint, backlog, options) + /** + * Open a listening socket without specifying options. + */ def bind(handler: ActorRef, endpoint: InetSocketAddress, backlog: Int): Command = Bind(handler, endpoint, backlog, Nil) - def register(handler: ActorRef): Command = Register(handler) + /** + * This message must be sent to a TCP connection actor after receiving the + * [[Connected]] message. The connection will not read any data from the + * socket until this message is received, because this message defines the + * actor which will receive all inbound data. + * + * @param handler The actor which will receive all incoming data and which + * will be informed when the connection is closed. + * + * @param keepOpenOnPeerClosed If this is set to true then the connection + * is not automatically closed when the peer closes its half, + * requiring an explicit [[Closed]] from our side when finished. + * + * @param useResumeWriting If this is set to true then the connection actor + * will refuse all further writes after issuing a [[CommandFailed]] + * notification until [[ResumeWriting]] is received. This can + * be used to implement NACK-based write backpressure. + */ def register(handler: ActorRef, keepOpenOnPeerClosed: Boolean, useResumeWriting: Boolean): Command = Register(handler, keepOpenOnPeerClosed, useResumeWriting) + /** + * The same as `register(handler, false, false)`. + */ + def register(handler: ActorRef): Command = Register(handler) + + /** + * In order to close down a listening socket, send this message to that socket’s + * actor (that is the actor which previously had sent the [[Bound]] message). The + * listener socket actor will reply with a [[Unbound]] message. + */ def unbind: Command = Unbind + /** + * A normal close operation will first flush pending writes and then close the + * socket. The sender of this command and the registered handler for incoming + * data will both be notified once the socket is closed using a [[Closed]] + * message. + */ def close: Command = Close + + /** + * A confirmed close operation will flush pending writes and half-close the + * connection, waiting for the peer to close the other half. The sender of this + * command and the registered handler for incoming data will both be notified + * once the socket is closed using a [[ConfirmedClosed]] message. + */ def confirmedClose: Command = ConfirmedClose + + /** + * An abort operation will not flush pending writes and will issue a TCP ABORT + * command to the O/S kernel which should result in a TCP_RST packet being sent + * to the peer. The sender of this command and the registered handler for + * incoming data will both be notified once the socket is closed using a + * [[Aborted]] message. + */ def abort: Command = Abort - def noAck: NoAck = NoAck + /** + * Each [[WriteCommand]] can optionally request a positive acknowledgment to be sent + * to the commanding actor. If such notification is not desired the [[WriteCommand#ack]] + * must be set to an instance of this class. The token contained within can be used + * to recognize which write failed when receiving a [[CommandFailed]] message. + */ def noAck(token: AnyRef): NoAck = NoAck(token) + /** + * Default [[NoAck]] instance which is used when no acknowledgment information is + * explicitly provided. Its “token” is `null`. + */ + def noAck: NoAck = NoAck - def write(data: ByteString): Command = Write(data) + /** + * Write data to the TCP connection. If no ack is needed use the special + * `NoAck` object. The connection actor will reply with a [[CommandFailed]] + * message if the write could not be enqueued. If [[WriteCommand#wantsAck]] + * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]] + * token once the write has been successfully enqueued to the O/S kernel. + * Note that this does not in any way guarantee that the data will be + * or have been sent! Unfortunately there is no way to determine whether + * a particular write has been sent by the O/S. + */ def write(data: ByteString, ack: Event): Command = Write(data, ack) + /** + * The same as `write(data, noAck())`. + */ + def write(data: ByteString): Command = Write(data) - def suspendReading: Command = SuspendReading - def resumeReading: Command = ResumeReading + /** + * Write `count` bytes starting at `position` from file at `filePath` to the connection. + * The count must be > 0. The connection actor will reply with a [[CommandFailed]] + * message if the write could not be enqueued. If [[WriteCommand#wantsAck]] + * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]] + * token once the write has been successfully enqueued to the O/S kernel. + * Note that this does not in any way guarantee that the data will be + * or have been sent! Unfortunately there is no way to determine whether + * a particular write has been sent by the O/S. + */ + def writeFile(filePath: String, position: Long, count: Long, ack: Event): Command = + WriteFile(filePath, position, count, ack) + /** + * When `useResumeWriting` is in effect as was indicated in the [[Register]] message + * then this command needs to be sent to the connection actor in order to re-enable + * writing after a [[CommandFailed]] event. All [[WriteCommand]] processed by the + * connection actor between the first [[CommandFailed]] and subsequent reception of + * this message will also be rejected with [[CommandFailed]]. + */ def resumeWriting: Command = ResumeWriting + /** + * Sending this command to the connection actor will disable reading from the TCP + * socket. TCP flow-control will then propagate backpressure to the sender side + * as buffers fill up on either end. To re-enable reading send [[ResumeReading]]. + */ + def suspendReading: Command = SuspendReading + + /** + * This command needs to be sent to the connection actor after a [[SuspendReading]] + * command in order to resume reading from the socket. + */ + def resumeReading: Command = ResumeReading + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { import scala.collection.JavaConverters._ - coll.asScala.to + coll.asScala.to[immutable.Traversable] } } diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 946c63c07e..6bd93baa60 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -11,7 +11,6 @@ import scala.util.control.NonFatal import akka.actor.{ Props, ActorLogging, ActorRef, Actor } import akka.io.SelectionHandler._ import akka.io.Tcp._ -import akka.io.IO.HasFailureMessage import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** @@ -64,7 +63,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, context.stop(self) } - override def supervisorStrategy = IO.connectionSupervisorStrategy + override def supervisorStrategy = SelectionHandler.connectionSupervisorStrategy def receive: Receive = { case registration: ChannelRegistration ⇒ diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index d4dc1d25a3..237011c8f3 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -6,7 +6,6 @@ package akka.io import Tcp._ import akka.actor.{ ActorLogging, Props } -import akka.io.IO.SelectorBasedManager /** * INTERNAL API @@ -45,7 +44,8 @@ import akka.io.IO.SelectorBasedManager * with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference. * */ -private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging { +private[io] class TcpManager(tcp: TcpExt) + extends SelectionHandler.SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging { def receive = workerForCommandHandler { case c: Connect ⇒ diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 04779c47b7..4126f603ec 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -12,6 +12,18 @@ import akka.util.Helpers.Requiring import akka.util.ByteString import akka.actor._ +/** + * UDP Extension for Akka’s IO layer. + * + * This extension implements the connectionless UDP protocol without + * calling `connect` on the underlying sockets, i.e. without restricting + * from whom data can be received. For “connected” UDP mode see [[UdpConnected]]. + * + * For a full description of the design and philosophy behind this IO + * implementation please refer to {@see the Akka online documentation}. + * + * The Java API for generating UDP commands is available at [[UdpMessage]]. + */ object Udp extends ExtensionKey[UdpExt] { /** @@ -19,14 +31,49 @@ object Udp extends ExtensionKey[UdpExt] { */ override def get(system: ActorSystem): UdpExt = super.get(system) - trait Command extends IO.HasFailureMessage { + /** + * The common interface for [[Command]] and [[Event]]. + */ + sealed trait Message + + /** + * The common type of all commands supported by the UDP implementation. + */ + trait Command extends SelectionHandler.HasFailureMessage with Message { def failureMessage = CommandFailed(this) } - case class NoAck(token: Any) + /** + * Each [[Send]] can optionally request a positive acknowledgment to be sent + * to the commanding actor. If such notification is not desired the [[Send#ack]] + * must be set to an instance of this class. The token contained within can be used + * to recognize which write failed when receiving a [[CommandFailed]] message. + */ + case class NoAck(token: Any) extends Event + + /** + * Default [[NoAck]] instance which is used when no acknowledgment information is + * explicitly provided. Its “token” is `null`. + */ object NoAck extends NoAck(null) - case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command { + /** + * This message is understood by the “simple sender” which can be obtained by + * sending the [[SimpleSender]] query to the [[UdpExt#manager]] as well as by + * the listener actors which are created in response to [[Bind]]. It will send + * the given payload data as one UDP datagram to the given target address. The + * UDP actor will respond with [[CommandFailed]] if the send could not be + * enqueued to the O/S kernel because the send buffer was full. If the given + * `ack` is not of type [[NoAck]] the UDP actor will reply with the given + * object as soon as the datagram has been successfully enqueued to the O/S + * kernel. + * + * The sending UDP socket’s address belongs to the “simple sender” which does + * not handle inbound datagrams and sends from an ephemeral port; therefore + * sending using this mechanism is not suitable if replies are expected, use + * [[Bind]] in that case. + */ + case class Send(payload: ByteString, target: InetSocketAddress, ack: Event) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") def wantsAck: Boolean = !ack.isInstanceOf[NoAck] @@ -35,32 +82,92 @@ object Udp extends ExtensionKey[UdpExt] { def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck) } + /** + * Send this message to the [[UdpExt#manager]] in order to bind to the given + * local port (or an automatically assigned one if the port number is zero). + * The listener actor for the newly bound port will reply with a [[Bound]] + * message, or the manager will reply with a [[CommandFailed]] message. + */ case class Bind(handler: ActorRef, localAddress: InetSocketAddress, options: immutable.Traversable[SocketOption] = Nil) extends Command + + /** + * Send this message to the listener actor that previously sent a [[Bound]] + * message in order to close the listening socket. The recipient will reply + * with an [[Unbound]] message. + */ case object Unbind extends Command + /** + * Retrieve a reference to a “simple sender” actor of the UDP extension. + * The newly created “simple sender” will reply with the [[SimpleSenderReady]] notification. + * + * The “simple sender” is a convenient service for being able to send datagrams + * when the originating address is meaningless, i.e. when no reply is expected. + * + * The “simple sender” will not stop itself, you will have to send it a [[akka.actor.PoisonPill]] + * when you want to close the socket. + */ case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command object SimpleSender extends SimpleSender(Nil) - case object StopReading extends Command + /** + * Send this message to a listener actor (which sent a [[Bound]] message) to + * have it stop reading datagrams from the network. If the O/S kernel’s receive + * buffer runs full then subsequent datagrams will be silently discarded. + * Re-enable reading from the socket using the [[ResumeReading]] command. + */ + case object SuspendReading extends Command + + /** + * This message must be sent to the listener actor to re-enable reading from + * the socket after a [[SuspendReading]] command. + */ case object ResumeReading extends Command - trait Event + /** + * The common type of all events emitted by the UDP implementation. + */ + trait Event extends Message + /** + * When a listener actor receives a datagram from its socket it will send + * it to the handler designated in the [[Bind]] message using this message type. + */ case class Received(data: ByteString, sender: InetSocketAddress) extends Event + + /** + * When a command fails it will be replied to with this message type, + * wrapping the failing command object. + */ case class CommandFailed(cmd: Command) extends Event + /** + * This message is sent by the listener actor in response to a [[Bind]] command. + * If the address to bind to specified a port number of zero, then this message + * can be inspected to find out which port was automatically assigned. + */ case class Bound(localAddress: InetSocketAddress) extends Event - sealed trait SimpleSendReady extends Event - case object SimpleSendReady extends SimpleSendReady + /** + * The “simple sender” sends this message type in response to a [[SimpleSender]] query. + */ + sealed trait SimpleSenderReady extends Event + case object SimpleSenderReady extends SimpleSenderReady + /** + * This message is sent by the listener actor in response to an [[Unbind]] command + * after the socket has been closed. + */ sealed trait Unbound case object Unbound extends Unbound - case class SendFailed(cause: Throwable) extends Event - + /** + * Scala API: This object provides access to all socket options applicable to UDP sockets. + * + * For the Java API see [[UdpSO]]. + */ object SO extends Inet.SoForwarders { /** @@ -106,7 +213,15 @@ class UdpExt(system: ExtendedActorSystem) extends IO.Extension { name = "IO-UDP-FF") } - val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) + /** + * Java API: retrieve the UDP manager actor’s reference. + */ + def getManager: ActorRef = manager + + /** + * INTERNAL API + */ + private[io] val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) } /** @@ -118,24 +233,99 @@ object UdpMessage { import scala.collection.JavaConverters._ import language.implicitConversions - def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target) - def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack) + /** + * Each [[Send]] can optionally request a positive acknowledgment to be sent + * to the commanding actor. If such notification is not desired the [[Send#ack]] + * must be set to an instance of this class. The token contained within can be used + * to recognize which write failed when receiving a [[CommandFailed]] message. + */ + def noAck(token: AnyRef): NoAck = NoAck(token) + /** + * Default [[NoAck]] instance which is used when no acknowledgment information is + * explicitly provided. Its “token” is `null`. + */ + def noAck: NoAck = NoAck - def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind = + /** + * This message is understood by the “simple sender” which can be obtained by + * sending the [[SimpleSender]] query to the [[UdpExt#manager]] as well as by + * the listener actors which are created in response to [[Bind]]. It will send + * the given payload data as one UDP datagram to the given target address. The + * UDP actor will respond with [[CommandFailed]] if the send could not be + * enqueued to the O/S kernel because the send buffer was full. If the given + * `ack` is not of type [[NoAck]] the UDP actor will reply with the given + * object as soon as the datagram has been successfully enqueued to the O/S + * kernel. + * + * The sending UDP socket’s address belongs to the “simple sender” which does + * not handle inbound datagrams and sends from an ephemeral port; therefore + * sending using this mechanism is not suitable if replies are expected, use + * [[Bind]] in that case. + */ + def send(payload: ByteString, target: InetSocketAddress, ack: Event): Command = Send(payload, target, ack) + /** + * The same as `send(payload, target, noAck())`. + */ + def send(payload: ByteString, target: InetSocketAddress): Command = Send(payload, target) + + /** + * Send this message to the [[UdpExt#manager]] in order to bind to the given + * local port (or an automatically assigned one if the port number is zero). + * The listener actor for the newly bound port will reply with a [[Bound]] + * message, or the manager will reply with a [[CommandFailed]] message. + */ + def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Command = Bind(handler, endpoint, options.asScala.to) + /** + * Bind without specifying options. + */ + def bind(handler: ActorRef, endpoint: InetSocketAddress): Command = Bind(handler, endpoint, Nil) - def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil) + /** + * Send this message to the listener actor that previously sent a [[Bound]] + * message in order to close the listening socket. The recipient will reply + * with an [[Unbound]] message. + */ + def unbind: Command = Unbind - def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to) - def simpleSender: SimpleSender = SimpleSender + /** + * Retrieve a reference to a “simple sender” actor of the UDP extension. + * The newly created “simple sender” will reply with the [[SimpleSenderReady]] notification. + * + * The “simple sender” is a convenient service for being able to send datagrams + * when the originating address is meaningless, i.e. when no reply is expected. + * + * The “simple sender” will not stop itself, you will have to send it a [[akka.actor.PoisonPill]] + * when you want to close the socket. + */ + def simpleSender(options: JIterable[SocketOption]): Command = SimpleSender(options.asScala.to) + /** + * Retrieve a simple sender without specifying options. + */ + def simpleSender: Command = SimpleSender - def unbind: Unbind.type = Unbind + /** + * Send this message to a listener actor (which sent a [[Bound]] message) to + * have it stop reading datagrams from the network. If the O/S kernel’s receive + * buffer runs full then subsequent datagrams will be silently discarded. + * Re-enable reading from the socket using the [[ResumeReading]] command. + */ + def suspendReading: Command = SuspendReading - def stopReading: StopReading.type = StopReading - def resumeReading: ResumeReading.type = ResumeReading + /** + * This message must be sent to the listener actor to re-enable reading from + * the socket after a [[SuspendReading]] command. + */ + def resumeReading: Command = ResumeReading } object UdpSO extends SoJavaFactories { import Udp.SO._ + + /** + * [[akka.io.Inet.SocketOption]] to set the SO_BROADCAST option + * + * For more information see [[java.net.DatagramSocket#setBroadcast]] + */ def broadcast(on: Boolean) = Broadcast(on) } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnected.scala b/akka-actor/src/main/scala/akka/io/UdpConnected.scala index 600317a8b9..3498bce045 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnected.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnected.scala @@ -11,21 +11,61 @@ import akka.io.Udp.UdpSettings import akka.util.ByteString import akka.actor._ +/** + * UDP Extension for Akka’s IO layer. + * + * This extension implements the connectionless UDP protocol with + * calling `connect` on the underlying sockets, i.e. with restricting + * from whom data can be received. For “unconnected” UDP mode see [[Udp]]. + * + * For a full description of the design and philosophy behind this IO + * implementation please refer to {@see the Akka online documentation}. + * + * The Java API for generating UDP commands is available at [[UdpConnectedMessage]]. + */ object UdpConnected extends ExtensionKey[UdpConnectedExt] { /** * Java API: retrieve the UdpConnected extension for the given system. */ override def get(system: ActorSystem): UdpConnectedExt = super.get(system) - trait Command extends IO.HasFailureMessage { + /** + * The common interface for [[Command]] and [[Event]]. + */ + sealed trait Message + + /** + * The common type of all commands supported by the UDP implementation. + */ + trait Command extends SelectionHandler.HasFailureMessage with Message { def failureMessage = CommandFailed(this) } - case class NoAck(token: Any) + /** + * Each [[Send]] can optionally request a positive acknowledgment to be sent + * to the commanding actor. If such notification is not desired the [[Send#ack]] + * must be set to an instance of this class. The token contained within can be used + * to recognize which write failed when receiving a [[CommandFailed]] message. + */ + case class NoAck(token: Any) extends Event + + /** + * Default [[NoAck]] instance which is used when no acknowledgment information is + * explicitly provided. Its “token” is `null`. + */ object NoAck extends NoAck(null) + /** + * This message is understood by the connection actors to send data to their + * designated destination. The connection actor will respond with + * [[CommandFailed]] if the send could not be enqueued to the O/S kernel + * because the send buffer was full. If the given `ack` is not of type [[NoAck]] + * the connection actor will reply with the given object as soon as the datagram + * has been successfully enqueued to the O/S kernel. + */ case class Send(payload: ByteString, ack: Any) extends Command { - require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") + require(ack + != null, "ack must be non-null. Use NoAck if you don't want acks.") def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } @@ -33,29 +73,70 @@ object UdpConnected extends ExtensionKey[UdpConnectedExt] { def apply(data: ByteString): Send = Send(data, NoAck) } + /** + * Send this message to the [[UdpExt#manager]] in order to bind to a local + * port (optionally with the chosen `localAddress`) and create a UDP socket + * which is restricted to sending to and receiving from the given `remoteAddress`. + * All received datagrams will be sent to the designated `handler` actor. + */ case class Connect(handler: ActorRef, remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil) extends Command - case object StopReading extends Command + /** + * Send this message to a connection actor (which had previously sent the + * [[Connected]] message) in order to close the socket. The connection actor + * will reply with a [[Disconnected]] message. + */ + case object Disconnect extends Command + + /** + * Send this message to a listener actor (which sent a [[Bound]] message) to + * have it stop reading datagrams from the network. If the O/S kernel’s receive + * buffer runs full then subsequent datagrams will be silently discarded. + * Re-enable reading from the socket using the [[ResumeReading]] command. + */ + case object SuspendReading extends Command + + /** + * This message must be sent to the listener actor to re-enable reading from + * the socket after a [[SuspendReading]] command. + */ case object ResumeReading extends Command - trait Event + /** + * The common type of all events emitted by the UDP implementation. + */ + trait Event extends Message + /** + * When a connection actor receives a datagram from its socket it will send + * it to the handler designated in the [[Bind]] message using this message type. + */ case class Received(data: ByteString) extends Event + + /** + * When a command fails it will be replied to with this message type, + * wrapping the failing command object. + */ case class CommandFailed(cmd: Command) extends Event + /** + * This message is sent by the connection actor to the actor which sent the + * [[Connect]] message when the UDP socket has been bound to the local and + * remote addresses given. + */ sealed trait Connected extends Event case object Connected extends Connected + /** + * This message is sent by the connection actor to the actor which sent the + * [[Disconnect]] message when the UDP socket has been closed. + */ sealed trait Disconnected extends Event case object Disconnected extends Disconnected - case object Close extends Command - - case class SendFailed(cause: Throwable) extends Event - } class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension { @@ -68,6 +149,11 @@ class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension { name = "IO-UDP-CONN") } + /** + * Java API: retrieve the UDP manager actor’s reference. + */ + def getManager: ActorRef = manager + val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) } @@ -79,29 +165,79 @@ object UdpConnectedMessage { import language.implicitConversions import UdpConnected._ + /** + * Send this message to the [[UdpExt#manager]] in order to bind to a local + * port (optionally with the chosen `localAddress`) and create a UDP socket + * which is restricted to sending to and receiving from the given `remoteAddress`. + * All received datagrams will be sent to the designated `handler` actor. + */ def connect(handler: ActorRef, remoteAddress: InetSocketAddress, localAddress: InetSocketAddress, options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, Some(localAddress), options) + /** + * Connect without specifying the `localAddress`. + */ def connect(handler: ActorRef, remoteAddress: InetSocketAddress, options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, None, options) + /** + * Connect without specifying the `localAddress` or `options`. + */ def connect(handler: ActorRef, remoteAddress: InetSocketAddress): Command = Connect(handler, remoteAddress, None, Nil) - def send(data: ByteString): Command = Send(data) + /** + * This message is understood by the connection actors to send data to their + * designated destination. The connection actor will respond with + * [[CommandFailed]] if the send could not be enqueued to the O/S kernel + * because the send buffer was full. If the given `ack` is not of type [[NoAck]] + * the connection actor will reply with the given object as soon as the datagram + * has been successfully enqueued to the O/S kernel. + */ def send(data: ByteString, ack: AnyRef): Command = Send(data, ack) + /** + * Send without requesting acknowledgment. + */ + def send(data: ByteString): Command = Send(data) - def close: Command = Close + /** + * Send this message to a connection actor (which had previously sent the + * [[Connected]] message) in order to close the socket. The connection actor + * will reply with a [[Disconnected]] message. + */ + def disconnect: Command = Disconnect - def noAck: NoAck = NoAck + /** + * Each [[Send]] can optionally request a positive acknowledgment to be sent + * to the commanding actor. If such notification is not desired the [[Send#ack]] + * must be set to an instance of this class. The token contained within can be used + * to recognize which write failed when receiving a [[CommandFailed]] message. + */ def noAck(token: AnyRef): NoAck = NoAck(token) - def stopReading: Command = StopReading + /** + * Default [[NoAck]] instance which is used when no acknowledgment information is + * explicitly provided. Its “token” is `null`. + */ + def noAck: NoAck = NoAck + + /** + * Send this message to a listener actor (which sent a [[Bound]] message) to + * have it stop reading datagrams from the network. If the O/S kernel’s receive + * buffer runs full then subsequent datagrams will be silently discarded. + * Re-enable reading from the socket using the [[ResumeReading]] command. + */ + def suspendReading: Command = SuspendReading + + /** + * This message must be sent to the listener actor to re-enable reading from + * the socket after a [[SuspendReading]] command. + */ def resumeReading: Command = ResumeReading implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { import scala.collection.JavaConverters._ - coll.asScala.to + coll.asScala.to[immutable.Traversable] } } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala b/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala index f2328fdca6..4bacc4db44 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala @@ -4,14 +4,13 @@ package akka.io import akka.actor.Props -import akka.io.IO.SelectorBasedManager import akka.io.UdpConnected.Connect /** * INTERNAL API */ private[io] class UdpConnectedManager(udpConn: UdpConnectedExt) - extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { + extends SelectionHandler.SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { def receive = workerForCommandHandler { case c: Connect ⇒ diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 59bacc495d..3eb6778b25 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -58,11 +58,11 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt, } def connected(registration: ChannelRegistration): Receive = { - case StopReading ⇒ registration.disableInterest(OP_READ) + case SuspendReading ⇒ registration.disableInterest(OP_READ) case ResumeReading ⇒ registration.enableInterest(OP_READ) case ChannelReadable ⇒ doRead(registration, handler) - case Close ⇒ + case Disconnect ⇒ log.debug("Closing UDP connection to [{}]", remoteAddress) channel.close() sender ! Disconnected diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala index 667958e5b7..b4ab0e2caf 100644 --- a/akka-actor/src/main/scala/akka/io/UdpListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -60,7 +60,7 @@ private[io] class UdpListener(val udp: UdpExt, } def readHandlers(registration: ChannelRegistration): Receive = { - case StopReading ⇒ registration.disableInterest(OP_READ) + case SuspendReading ⇒ registration.disableInterest(OP_READ) case ResumeReading ⇒ registration.enableInterest(OP_READ) case ChannelReadable ⇒ doReceive(registration, bind.handler) diff --git a/akka-actor/src/main/scala/akka/io/UdpManager.scala b/akka-actor/src/main/scala/akka/io/UdpManager.scala index 092222a35e..b7e91d6ab6 100644 --- a/akka-actor/src/main/scala/akka/io/UdpManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpManager.scala @@ -4,7 +4,6 @@ package akka.io import akka.actor.Props -import akka.io.IO.SelectorBasedManager import akka.io.Udp._ /** @@ -38,13 +37,13 @@ import akka.io.Udp._ * == Simple send == * * Udp provides a simple method of sending UDP datagrams if no reply is expected. To acquire the Sender actor - * a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSendReady + * a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSenderReady * message that the service is available. UDP datagrams can be sent by sending [[akka.io.Udp.Send]] messages to the - * sender of SimpleSendReady. All the datagrams will contain an ephemeral local port as sender and answers will be + * sender of SimpleSenderReady. All the datagrams will contain an ephemeral local port as sender and answers will be * discarded. * */ -private[io] class UdpManager(udp: UdpExt) extends SelectorBasedManager(udp.settings, udp.settings.NrOfSelectors) { +private[io] class UdpManager(udp: UdpExt) extends SelectionHandler.SelectorBasedManager(udp.settings, udp.settings.NrOfSelectors) { def receive = workerForCommandHandler { case b: Bind ⇒ diff --git a/akka-actor/src/main/scala/akka/io/UdpSender.scala b/akka-actor/src/main/scala/akka/io/UdpSender.scala index 921414e6e9..925c6dbf92 100644 --- a/akka-actor/src/main/scala/akka/io/UdpSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpSender.scala @@ -33,8 +33,8 @@ private[io] class UdpSender(val udp: UdpExt, def receive: Receive = { case registration: ChannelRegistration ⇒ - context.become(sendHandlers(registration), discardOld = true) - commander ! SimpleSendReady + commander ! SimpleSenderReady + context.become(sendHandlers(registration)) } override def postStop(): Unit = if (channel.isOpen) { diff --git a/akka-actor/src/main/scala/akka/io/WithUdpSend.scala b/akka-actor/src/main/scala/akka/io/WithUdpSend.scala index aadfedd527..37be5c7a68 100644 --- a/akka-actor/src/main/scala/akka/io/WithUdpSend.scala +++ b/akka-actor/src/main/scala/akka/io/WithUdpSend.scala @@ -14,12 +14,12 @@ import akka.io.SelectionHandler._ private[io] trait WithUdpSend { me: Actor with ActorLogging ⇒ - var pendingSend: Send = null - var pendingCommander: ActorRef = null + private var pendingSend: Send = null + private var pendingCommander: ActorRef = null // If send fails first, we allow a second go after selected writable, but no more. This flag signals that // pending send was already tried once. - var retriedSend = false - def hasWritePending = pendingSend ne null + private var retriedSend = false + private def hasWritePending = pendingSend ne null def channel: DatagramChannel def udp: UdpExt @@ -44,7 +44,7 @@ private[io] trait WithUdpSend { case ChannelWritable ⇒ if (hasWritePending) doSend(registration) } - final def doSend(registration: ChannelRegistration): Unit = { + private def doSend(registration: ChannelRegistration): Unit = { val buffer = udp.bufferPool.acquire() try { buffer.clear() diff --git a/akka-docs/rst/java/code/docs/io/UdpDocTest.java b/akka-docs/rst/java/code/docs/io/UdpDocTest.java index 5b61d3b21f..fd03021357 100644 --- a/akka-docs/rst/java/code/docs/io/UdpDocTest.java +++ b/akka-docs/rst/java/code/docs/io/UdpDocTest.java @@ -6,86 +6,171 @@ package docs.io; //#imports import akka.actor.ActorRef; -import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import akka.actor.UntypedActor; -import akka.io.Inet; import akka.io.Udp; +import akka.io.UdpConnected; +import akka.io.UdpConnectedMessage; import akka.io.UdpMessage; -import akka.io.UdpSO; +import akka.japi.Procedure; import akka.util.ByteString; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; //#imports -import akka.testkit.AkkaJUnitActorSystemResource; -import org.junit.ClassRule; -import org.junit.Test; - - public class UdpDocTest { - static public class Demo extends UntypedActor { - ActorSystem system = context().system(); - public void onReceive(Object message) { - //#manager - final ActorRef udp = Udp.get(system).manager(); - //#manager + //#sender + public static class SimpleSender extends UntypedActor { + final InetSocketAddress remote; - //#simplesend - udp.tell(UdpMessage.simpleSender(), getSelf()); + public SimpleSender(InetSocketAddress remote) { + this.remote = remote; + + // request creation of a SimpleSender + final ActorRef mgr = Udp.get(getContext().system()).getManager(); + mgr.tell(UdpMessage.simpleSender(), getSelf()); + } + + @Override + public void onReceive(Object msg) { + if (msg instanceof Udp.SimpleSenderReady) { + getContext().become(ready(getSender())); + //#sender + getSender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), getSelf()); + //#sender + } else unhandled(msg); + } + + private Procedure ready(final ActorRef send) { + return new Procedure() { + @Override + public void apply(Object msg) throws Exception { + if (msg instanceof String) { + final String str = (String) msg; + send.tell(UdpMessage.send(ByteString.fromString(str), remote), getSelf()); + //#sender + if (str.equals("world")) { + send.tell(PoisonPill.getInstance(), getSelf()); + } + //#sender - // ... or with socket options: - final List options = new ArrayList(); - options.add(UdpSO.broadcast(true)); - udp.tell(UdpMessage.simpleSender(), getSelf()); - //#simplesend - - ActorRef simpleSender = null; - - //#simplesend-finish - if (message instanceof Udp.SimpleSendReady) { - simpleSender = getSender(); - } - //#simplesend-finish - - final ByteString data = ByteString.empty(); - - //#simplesend-send - simpleSender.tell(UdpMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); - //#simplesend-send - - final ActorRef handler = getSelf(); - - //#bind - udp.tell(UdpMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf()); - //#bind - - ActorRef udpWorker = null; - - //#bind-finish - if (message instanceof Udp.Bound) { - udpWorker = getSender(); - } - //#bind-finish - - //#bind-receive - if (message instanceof Udp.Received) { - final Udp.Received rcvd = (Udp.Received) message; - final ByteString payload = rcvd.data(); - final InetSocketAddress sender = rcvd.sender(); - } - //#bind-receive - - //#bind-send - udpWorker.tell(UdpMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); - //#bind-send + } else unhandled(msg); + } + }; } } + //#sender + + //#listener + public static class Listener extends UntypedActor { + final ActorRef nextActor; - @Test - public void demonstrateConnect() { + public Listener(ActorRef nextActor) { + this.nextActor = nextActor; + + // request creation of a bound listen socket + final ActorRef mgr = Udp.get(getContext().system()).getManager(); + mgr.tell( + UdpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0)), + getSelf()); + } + + @Override + public void onReceive(Object msg) { + if (msg instanceof Udp.Bound) { + final Udp.Bound b = (Udp.Bound) msg; + //#listener + nextActor.tell(b.localAddress(), getSender()); + //#listener + getContext().become(ready(getSender())); + } else unhandled(msg); + } + + private Procedure ready(final ActorRef socket) { + return new Procedure() { + @Override + public void apply(Object msg) throws Exception { + if (msg instanceof Udp.Received) { + final Udp.Received r = (Udp.Received) msg; + // echo server example: send back the data + socket.tell(UdpMessage.send(r.data(), r.sender()), getSelf()); + // or do some processing and forward it on + final Object processed = // parse data etc., e.g. using PipelineStage + //#listener + r.data().utf8String(); + //#listener + nextActor.tell(processed, getSelf()); + + } else if (msg.equals(UdpMessage.unbind())) { + socket.tell(msg, getSelf()); + + } else if (msg instanceof Udp.Unbound) { + getContext().stop(getSelf()); + + } else unhandled(msg); + } + }; + } } + //#listener + + //#connected + public static class Connected extends UntypedActor { + final InetSocketAddress remote; + + public Connected(InetSocketAddress remote) { + this.remote = remote; + + // create a restricted a.k.a. “connected” socket + final ActorRef mgr = UdpConnected.get(getContext().system()).getManager(); + mgr.tell(UdpConnectedMessage.connect(getSelf(), remote), getSelf()); + } + + @Override + public void onReceive(Object msg) { + if (msg instanceof UdpConnected.Connected) { + getContext().become(ready(getSender())); + //#connected + getSender() + .tell(UdpConnectedMessage.send(ByteString.fromString("hello")), + getSelf()); + //#connected + } else unhandled(msg); + } + + private Procedure ready(final ActorRef connection) { + return new Procedure() { + @Override + public void apply(Object msg) throws Exception { + if (msg instanceof UdpConnected.Received) { + final UdpConnected.Received r = (UdpConnected.Received) msg; + // process data, send it on, etc. + // #connected + if (r.data().utf8String().equals("hello")) { + connection.tell( + UdpConnectedMessage.send(ByteString.fromString("world")), + getSelf()); + } + // #connected + + } else if (msg instanceof String) { + final String str = (String) msg; + connection + .tell(UdpConnectedMessage.send(ByteString.fromString(str)), + getSelf()); + + } else if (msg.equals(UdpConnectedMessage.disconnect())) { + connection.tell(msg, getSelf()); + + } else if (msg instanceof UdpConnected.Disconnected) { + getContext().stop(getSelf()); + + } else unhandled(msg); + } + }; + } + } + //#connected } diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index 47fba4e5aa..f78c45f843 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -652,113 +652,85 @@ and forward any replies to some ``listener`` actor. Using UDP --------- -UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams -to any remote address. Connection-based UDP workers are linked to a single remote address. +UDP is a connectionless datagram protocol which offers two different ways of +communication on the JDK level: -The connectionless UDP manager is accessed through ``Udp``. ``Udp`` refers to the "fire-and-forget" style of sending -UDP datagrams. - -.. includecode:: code/docs/io/UdpDocTest.java#manager + * sockets which are free to send datagrams to any destination and receive + datagrams from any origin -The connection-based UDP manager is accessed through ``UdpConnected``. + * sockets which are restricted to communication with one specific remote + socket address -.. includecode:: code/docs/io/UdpConnectedDocTest.java#manager +In the low-level API the distinction is made—confusingly—by whether or not +:meth:`connect` has been called on the socket (even when connect has been +called the protocol is still connectionless). These two forms of UDP usage are +offered using distinct IO extensions described below. -UDP servers can be only implemented by the connectionless API, but clients can use both. - -Connectionless UDP -^^^^^^^^^^^^^^^^^^ - -The following imports are assumed in the following sections: - -.. includecode:: code/docs/io/UdpDocTest.java#imports +Unconnected UDP +^^^^^^^^^^^^^^^ Simple Send ............ -To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the -``Udp`` manager: +.. includecode:: code/docs/io/UdpDocTest.java#sender -.. includecode:: code/docs/io/UdpDocTest.java#simplesend +The simplest form of UDP usage is to just send datagrams without the need of +getting a reply. To this end a “simple sender” facility is provided as +demonstrated above. The UDP extension is queried using the +:meth:`simpleSender` message, which is answered by a :class:`SimpleSenderReady` +notification. The sender of this message is the newly created sender actor +which from this point onward can be used to send datagrams to arbitrary +destinations; in this example it will just send any UTF-8 encoded +:class:`String` it receives to a predefined remote address. -The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: - -.. includecode:: code/docs/io/UdpDocTest.java#simplesend-finish - -After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple -message send: - -.. includecode:: code/docs/io/UdpDocTest.java#simplesend-send +.. note:: + The simple sender will not shut itself down because it cannot know when you + are done with it. You will need to send it a :class:`PoisonPill` when you + want to close the ephemeral port the sender is bound to. Bind (and Send) ............... -To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP -manager +.. includecode:: code/docs/io/UdpDocTest.java#listener -.. includecode:: code/docs/io/UdpDocTest.java#bind +If you want to implement a UDP server which listens on a socket for incoming +datagrams then you need to use the :meth:`bind` command as shown above. The +local address specified may have a zero port in which case the operating system +will automatically choose a free port and assign it to the new socket. Which +port was actually bound can be found out by inspecting the :class:`Bound` +message. -After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of -this message is the worker for the UDP channel bound to the local address. +The sender of the :class:`Bound` message is the actor which manages the new +socket. Sending datagrams is achieved by using the :meth:`send` message type +and the socket can be closed by sending a :meth:`unbind` command, in which +case the socket actor will reply with a :class:`Unbound` notification. -.. includecode:: code/docs/io/UdpDocTest.java#bind-finish +Received datagrams are sent to the actor designated in the :meth:`bind` +message, whereas the :class:`Bound` message will be sent to the sender of the +:meth:`bind`. -The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: +Connected UDP +^^^^^^^^^^^^^ -.. includecode:: code/docs/io/UdpDocTest.java#bind-receive +The service provided by the connection based UDP API is similar to the +bind-and-send service we saw earlier, but the main difference is that a +connection is only able to send to the ``remoteAddress`` it was connected to, +and will receive datagrams only from that address. -The ``Received`` message contains the payload of the datagram and the address of the sender. - -It is also possible to send UDP datagrams using the ``ActorRef`` of the worker: - -.. includecode:: code/docs/io/UdpDocTest.java#bind-send +.. includecode:: code/docs/io/UdpDocTest.java#connected +Consequently the example shown here looks quite similar to the previous one, +the biggest difference is the absence of remote address information in +:meth:`send` and :class:`Received` messages. .. note:: - The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case - the sender field of the UDP datagram will be the bound local address, while in the latter it will be an undetermined - ephemeral port. - -Connection based UDP -^^^^^^^^^^^^^^^^^^^^ - -The service provided by the connection based UDP API is similar to the bind-and-send service we saw earlier, but -the main difference is that a connection is only able to send to the ``remoteAddress`` it was connected to, and will -receive datagrams only from that address. - -Connecting is similar to what we have seen in the previous section: - -.. includecode:: code/docs/io/UdpConnectedDocTest.java#connect - -Or, with more options: - -.. includecode:: code/docs/io/UdpConnectedDocTest.java#connect-with-options - -After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of -this message is the worker for the UDP connection. - -.. includecode:: code/docs/io/UdpConnectedDocTest.java#connected - -The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: - -.. includecode:: code/docs/io/UdpConnectedDocTest.java#received - -The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address -is provided, as a UDP connection only receives messages from the endpoint it has been connected to. - -It is also possible to send UDP datagrams using the ``ActorRef`` of the worker: - -.. includecode:: code/docs/io/UdpConnectedDocTest.java#send - -Again, like the ``Received`` message, the ``Send`` message does not contain a remote address. This is because the address -will always be the endpoint we originally connected to. - -.. note:: - There is a small performance benefit in using connection based UDP API over the connectionless one. - If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security - check, while in the case of connection-based UDP the security check is cached after connect, thus writes does - not suffer an additional performance penalty. + + There is a small performance benefit in using connection based UDP API over + the connectionless one. If there is a SecurityManager enabled on the system, + every connectionless message send has to go through a security check, while + in the case of connection-based UDP the security check is cached after + connect, thus writes do not suffer an additional performance penalty. Architecture in-depth --------------------- diff --git a/akka-docs/rst/scala/code/docs/io/UdpDocSpec.scala b/akka-docs/rst/scala/code/docs/io/UdpDocSpec.scala new file mode 100644 index 0000000000..ac0ee7d3d9 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/io/UdpDocSpec.scala @@ -0,0 +1,167 @@ +package docs.io + +import akka.testkit.AkkaSpec +import akka.actor.Actor +import akka.io.IO +import akka.io.Udp +import akka.actor.ActorRef +import java.net.InetSocketAddress +import akka.util.ByteString +import akka.testkit.TestProbe +import akka.actor.Props +import scala.concurrent.duration._ +import akka.actor.PoisonPill +import akka.io.UdpConnected + +object ScalaUdpDocSpec { + + //#sender + class SimpleSender(remote: InetSocketAddress) extends Actor { + import context.system + IO(Udp) ! Udp.SimpleSender + + def receive = { + case Udp.SimpleSenderReady ⇒ + context.become(ready(sender)) + //#sender + sender ! Udp.Send(ByteString("hello"), remote) + //#sender + } + + def ready(send: ActorRef): Receive = { + case msg: String ⇒ + send ! Udp.Send(ByteString(msg), remote) + //#sender + if (msg == "world") send ! PoisonPill + //#sender + } + } + //#sender + + //#listener + class Listener(nextActor: ActorRef) extends Actor { + import context.system + IO(Udp) ! Udp.Bind(self, new InetSocketAddress("localhost", 0)) + + def receive = { + case Udp.Bound(local) ⇒ + //#listener + nextActor forward local + //#listener + context.become(ready(sender)) + } + + def ready(socket: ActorRef): Receive = { + case Udp.Received(data, remote) ⇒ + val processed = // parse data etc., e.g. using PipelineStage + //#listener + data.utf8String + //#listener + socket ! Udp.Send(data, remote) // example server echoes back + nextActor ! processed + case Udp.Unbind ⇒ socket ! Udp.Unbind + case Udp.Unbound ⇒ context.stop(self) + } + } + //#listener + + //#connected + class Connected(remote: InetSocketAddress) extends Actor { + import context.system + IO(UdpConnected) ! UdpConnected.Connect(self, remote) + + def receive = { + case UdpConnected.Connected ⇒ + context.become(ready(sender)) + //#connected + sender ! UdpConnected.Send(ByteString("hello")) + //#connected + } + + def ready(connection: ActorRef): Receive = { + case UdpConnected.Received(data) ⇒ + // process data, send it on, etc. + //#connected + if (data.utf8String == "hello") + connection ! UdpConnected.Send(ByteString("world")) + //#connected + case msg: String ⇒ + connection ! UdpConnected.Send(ByteString(msg)) + case d @ UdpConnected.Disconnect ⇒ connection ! d + case UdpConnected.Disconnected ⇒ context.stop(self) + } + } + //#connected + +} + +abstract class UdpDocSpec extends AkkaSpec { + + def listenerProps(next: ActorRef): Props + def simpleSenderProps(remote: InetSocketAddress): Props + def connectedProps(remote: InetSocketAddress): Props + + "demonstrate Udp" in { + val probe = TestProbe() + val listen = watch(system.actorOf(listenerProps(probe.ref))) + val local = probe.expectMsgType[InetSocketAddress] + val listener = probe.lastSender + val send = system.actorOf(simpleSenderProps(local)) + probe.expectMsg("hello") + send ! "world" + probe.expectMsg("world") + listen ! Udp.Unbind + expectTerminated(listen) + } + + "demonstrate Udp suspend reading" in { + val probe = TestProbe() + val listen = watch(system.actorOf(listenerProps(probe.ref))) + val local = probe.expectMsgType[InetSocketAddress] + val listener = probe.lastSender + listener ! Udp.SuspendReading + Thread.sleep(1000) // no way to find out when the above is finished + val send = system.actorOf(simpleSenderProps(local)) + probe.expectNoMsg(500.millis) + listener ! Udp.ResumeReading + probe.expectMsg("hello") + send ! "world" + probe.expectMsg("world") + listen ! Udp.Unbind + expectTerminated(listen) + } + + "demonstrate UdpConnected" in { + val probe = TestProbe() + val listen = watch(system.actorOf(listenerProps(probe.ref))) + val local = probe.expectMsgType[InetSocketAddress] + val listener = probe.lastSender + val conn = watch(system.actorOf(connectedProps(local))) + probe.expectMsg("hello") + probe.expectMsg("world") + conn ! "hello" + probe.expectMsg("hello") + probe.expectMsg("world") + listen ! Udp.Unbind + expectTerminated(listen) + conn ! UdpConnected.Disconnect + expectTerminated(conn) + } + +} + +class ScalaUdpDocSpec extends UdpDocSpec { + import ScalaUdpDocSpec._ + + override def listenerProps(next: ActorRef) = Props(new Listener(next)) + override def simpleSenderProps(remote: InetSocketAddress) = Props(new SimpleSender(remote)) + override def connectedProps(remote: InetSocketAddress) = Props(new Connected(remote)) +} + +class JavaUdpDocSpec extends UdpDocSpec { + import UdpDocTest._ + + override def listenerProps(next: ActorRef) = Props(new Listener(next)) + override def simpleSenderProps(remote: InetSocketAddress) = Props(new SimpleSender(remote)) + override def connectedProps(remote: InetSocketAddress) = Props(new Connected(remote)) +} diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index 10ada05f38..94cad87a67 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -654,145 +654,85 @@ keep things well separated. Using UDP --------- -UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams -to any remote address. Connection-based UDP workers are linked to a single remote address. +UDP is a connectionless datagram protocol which offers two different ways of +communication on the JDK level: -The connectionless UDP manager is accessed through ``Udp``. ``Udp`` refers to the "fire-and-forget" style of sending -UDP datagrams. + * sockets which are free to send datagrams to any destination and receive + datagrams from any origin -.. code-block:: scala + * sockets which are restricted to communication with one specific remote + socket address - import akka.io.IO - import akka.io.Udp - val connectionLessUdp = IO(Udp) +In the low-level API the distinction is made—confusingly—by whether or not +:meth:`connect` has been called on the socket (even when connect has been +called the protocol is still connectionless). These two forms of UDP usage are +offered using distinct IO extensions described below. -The connection-based UDP manager is accessed through ``UdpConnected``. - -.. code-block:: scala - - import akka.io.UdpConnected - val connectionBasedUdp = IO(UdpConnected) - -UDP servers can be only implemented by the connectionless API, but clients can use both. - -Connectionless UDP -^^^^^^^^^^^^^^^^^^ +Unconnected UDP +^^^^^^^^^^^^^^^ Simple Send ............ -To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the -``Udp`` manager: +.. includecode:: code/docs/io/UdpDocSpec.scala#sender -.. code-block:: scala +The simplest form of UDP usage is to just send datagrams without the need of +getting a reply. To this end a “simple sender” facility is provided as +demonstrated above. The UDP extension is queried using the +:class:`SimpleSender` message, which is answered by a :class:`SimpleSenderReady` +notification. The sender of this message is the newly created sender actor +which from this point onward can be used to send datagrams to arbitrary +destinations; in this example it will just send any UTF-8 encoded +:class:`String` it receives to a predefined remote address. - IO(Udp) ! SimpleSender - // or with socket options: - import akka.io.Udp._ - IO(Udp) ! SimpleSender(List(SO.Broadcast(true))) - -The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: - -.. code-block:: scala - - case SimpleSendReady => - simpleSender = sender - -After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple -message send: - -.. code-block:: scala - - simpleSender ! Send(data, serverAddress) +.. note:: + The simple sender will not shut itself down because it cannot know when you + are done with it. You will need to send it a :class:`PoisonPill` when you + want to close the ephemeral port the sender is bound to. Bind (and Send) ............... -To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP -manager +.. includecode:: code/docs/io/UdpDocSpec.scala#listener -.. code-block:: scala +If you want to implement a UDP server which listens on a socket for incoming +datagrams then you need to use the :class:`Bind` command as shown above. The +local address specified may have a zero port in which case the operating system +will automatically choose a free port and assign it to the new socket. Which +port was actually bound can be found out by inspecting the :class:`Bound` +message. - IO(Udp) ! Bind(handler, localAddress) +The sender of the :class:`Bound` message is the actor which manages the new +socket. Sending datagrams is achieved by using the :class:`Send` message type +and the socket can be closed by sending a :class:`Unbind` command, in which +case the socket actor will reply with a :class:`Unbound` notification. -After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of -this message is the worker for the UDP channel bound to the local address. +Received datagrams are sent to the actor designated in the :class:`Bind` +message, whereas the :class:`Bound` message will be sent to the sender of the +:class:`Bind`. -.. code-block:: scala +Connected UDP +^^^^^^^^^^^^^ - case Bound => - udpWorker = sender // Save the worker ref for later use +The service provided by the connection based UDP API is similar to the +bind-and-send service we saw earlier, but the main difference is that a +connection is only able to send to the ``remoteAddress`` it was connected to, +and will receive datagrams only from that address. -The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: +.. includecode:: code/docs/io/UdpDocSpec.scala#connected -.. code-block:: scala - - case Received(dataByteString, remoteAddress) => // Do something with the data - -The ``Received`` message contains the payload of the datagram and the address of the sender. - -It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: - -.. code-block:: scala - - udpWorker ! Send(data, serverAddress) +Consequently the example shown here looks quite similar to the previous one, +the biggest difference is the absence of remote address information in +:class:`Send` and :class:`Received` messages. .. note:: - The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case - the sender field of the UDP datagram will be the bound local address, while in the latter it will be an undetermined - ephemeral port. - -Connection based UDP -^^^^^^^^^^^^^^^^^^^^ - -The service provided by the connection based UDP API is similar to the bind-and-send service we saw earlier, but -the main difference is that a connection is only able to send to the ``remoteAddress`` it was connected to, and will -receive datagrams only from that address. - -Connecting is similar to what we have seen in the previous section: - -.. code-block:: scala - - IO(UdpConnected) ! Connect(handler, remoteAddress) - -Or, with more options: - -.. code-block:: scala - - IO(UdpConnected) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true))) - -After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of -this message is the worker for the UDP connection. - -.. code-block:: scala - - case Connected => - udpConnectionActor = sender // Save the worker ref for later use - -The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: - -.. code-block:: scala - - case Received(dataByteString) => // Do something with the data - -The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address -is provided, as a UDP connection only receives messages from the endpoint it has been connected to. - -UDP datagrams can be sent by sending a ``Send`` message to the worker actor. - -.. code-block:: scala - - udpConnectionActor ! Send(data) - -Again, like the ``Received`` message, the ``Send`` message does not contain a remote address. This is because the address -will always be the endpoint we originally connected to. - -.. note:: - There is a small performance benefit in using connection based UDP API over the connectionless one. - If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security - check, while in the case of connection-based UDP the security check is cached after connect, thus writes do - not suffer an additional performance penalty. + + There is a small performance benefit in using connection based UDP API over + the connectionless one. If there is a SecurityManager enabled on the system, + every connectionless message send has to go through a security check, while + in the case of connection-based UDP the security check is cached after + connect, thus writes do not suffer an additional performance penalty. Architecture in-depth --------------------- diff --git a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala index 1f2dc7a344..6aa83b80a6 100644 --- a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala +++ b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala @@ -185,7 +185,7 @@ class SslTlsSupportSpec extends AkkaSpec { //#handler class AkkaSslHandler(init: Init[WithinActorContext, String, String]) extends Actor with ActorLogging { - + def receive = { case init.Event(data) ⇒ val input = data.dropRight(1)