add ScalaDocs and correct UDP docs, see 3268

This commit is contained in:
Roland 2013-05-26 18:29:23 +02:00
parent ea5b79e562
commit b6efd467e8
21 changed files with 1331 additions and 368 deletions

View file

@ -24,7 +24,7 @@ class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitS
val simpleSender: ActorRef = {
val commander = TestProbe()
commander.send(IO(Udp), SimpleSender)
commander.expectMsg(SimpleSendReady)
commander.expectMsg(SimpleSenderReady)
commander.sender
}

View file

@ -10,44 +10,29 @@ import akka.routing.RandomRouter
import akka.io.SelectionHandler.WorkerForCommand
import akka.event.Logging
/**
* Entry point to Akkas IO layer.
*
* <b>All contents of the `akka.io` package is marked experimental.</b>
*
* This marker signifies that APIs may still change in response to user feedback
* through-out the 2.2 release cycle. The implementation itself is considered
* stable and ready for production use.
*
* @see <a href="http://doc.akka.io/">the Akka online documentation</a>
*/
object IO {
trait Extension extends akka.actor.Extension {
def manager: ActorRef
}
/**
* Scala API: obtain a reference to the manager actor for the given IO extension,
* for example [[Tcp]] or [[Udp]].
*
* For the Java API please refer to the individual extensions directly.
*/
def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager
// What is this? It's public API so I think it deserves a mention
trait HasFailureMessage {
def failureMessage: Any
}
abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor {
override def supervisorStrategy = connectionSupervisorStrategy
val selectorPool = context.actorOf(
props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)),
name = "selectors")
final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry Props]): Receive = {
case cmd: HasFailureMessage if pf.isDefinedAt(cmd) selectorPool ! WorkerForCommand(cmd, sender, pf(cmd))
}
}
/**
* Special supervisor strategy for parents of TCP connection and listener actors.
* Stops the child on all errors and logs DeathPactExceptions only at debug level.
*/
private[io] final val connectionSupervisorStrategy: SupervisorStrategy =
new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) {
override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable,
decision: SupervisorStrategy.Directive): Unit =
if (cause.isInstanceOf[DeathPactException]) {
try context.system.eventStream.publish {
Logging.Debug(child.path.toString, getClass, "Closed after handler termination")
} catch { case NonFatal(_) }
} else super.logFailure(context, child, cause, decision)
}
}

View file

@ -80,17 +80,67 @@ object Inet {
}
trait SoForwarders {
/**
* [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option
*
* For more information see [[java.net.Socket.setReceiveBufferSize]]
*/
val ReceiveBufferSize = SO.ReceiveBufferSize
/**
* [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR
*
* For more information see [[java.net.Socket.setReuseAddress]]
*/
val ReuseAddress = SO.ReuseAddress
/**
* [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option.
*
* For more information see [[java.net.Socket.setSendBufferSize]]
*/
val SendBufferSize = SO.SendBufferSize
/**
* [[akka.io.Tcp.SocketOption]] to set the traffic class or
* type-of-service octet in the IP header for packets sent from this
* socket.
*
* For more information see [[java.net.Socket.setTrafficClass]]
*/
val TrafficClass = SO.TrafficClass
}
trait SoJavaFactories {
import SO._
/**
* [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option
*
* For more information see [[java.net.Socket.setReceiveBufferSize]]
*/
def receiveBufferSize(size: Int) = ReceiveBufferSize(size)
/**
* [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR
*
* For more information see [[java.net.Socket.setReuseAddress]]
*/
def reuseAddress(on: Boolean) = ReuseAddress(on)
/**
* [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option.
*
* For more information see [[java.net.Socket.setSendBufferSize]]
*/
def sendBufferSize(size: Int) = SendBufferSize(size)
/**
* [[akka.io.Tcp.SocketOption]] to set the traffic class or
* type-of-service octet in the IP header for packets sent from this
* socket.
*
* For more information see [[java.net.Socket.setTrafficClass]]
*/
def trafficClass(tc: Int) = TrafficClass(tc)
}

View file

@ -15,10 +15,11 @@ import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext
import akka.event.LoggingAdapter
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.io.IO.HasFailureMessage
import akka.util.Helpers.Requiring
import akka.util.SerializedSuspendableExecutionContext
import akka.actor._
import akka.routing.RandomRouter
import akka.event.Logging
abstract class SelectionHandlerSettings(config: Config) {
import config._
@ -60,6 +61,10 @@ private[io] trait ChannelRegistration {
private[io] object SelectionHandler {
trait HasFailureMessage {
def failureMessage: Any
}
case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: ChannelRegistry Props)
case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) }
@ -69,6 +74,34 @@ private[io] object SelectionHandler {
case object ChannelReadable
case object ChannelWritable
private[io] abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor {
override def supervisorStrategy = connectionSupervisorStrategy
val selectorPool = context.actorOf(
props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)),
name = "selectors")
final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry Props]): Receive = {
case cmd: HasFailureMessage if pf.isDefinedAt(cmd) selectorPool ! WorkerForCommand(cmd, sender, pf(cmd))
}
}
/**
* Special supervisor strategy for parents of TCP connection and listener actors.
* Stops the child on all errors and logs DeathPactExceptions only at debug level.
*/
private[io] final val connectionSupervisorStrategy: SupervisorStrategy =
new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) {
override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable,
decision: SupervisorStrategy.Directive): Unit =
if (cause.isInstanceOf[DeathPactException]) {
try context.system.eventStream.publish {
Logging.Debug(child.path.toString, getClass, "Closed after handler termination")
} catch { case NonFatal(_) }
} else super.logFailure(context, child, cause, decision)
}
private class ChannelRegistryImpl(executionContext: ExecutionContext, log: LoggingAdapter) extends ChannelRegistry {
private[this] val selector = SelectorProvider.provider.openSelector
private[this] val wakeUp = new AtomicBoolean(false)

View file

@ -47,7 +47,10 @@ object SslTlsSupport {
* This pipeline stage implements SSL / TLS support, using an externally
* configured [[SSLEngine]]. It operates on the level of [[Tcp.Event]] and
* [[Tcp.Command]] messages, which means that it will typically be one of
* the lowest stages in a protocol stack.
* the lowest stages in a protocol stack. Since SSLEngine relies on contiguous
* transmission of a data stream you will need to handle backpressure from
* the TCP connection actor, for example by using a [[BackpressureBuffer]]
* underneath the SSL stage.
*
* Each instance of this stage has a scratch [[ByteBuffer]] of approx. 18kiB
* allocated which is used by the SSLEngine.

View file

@ -15,6 +15,20 @@ import akka.util.Helpers.Requiring
import akka.actor._
import java.lang.{ Iterable JIterable }
/**
* TCP Extension for Akkas IO layer.
*
* For a full description of the design and philosophy behind this IO
* implementation please refer to {@see <a href="http://doc.akka.io/">the Akka online documentation</a>}.
*
* In order to open an outbound connection send a [[Tcp.Connect]] message
* to the [[TcpExt#manager]].
*
* In order to start listening for inbound connetions send a [[Tcp.Bind]]
* message to the [[TcpExt#manager]].
*
* The Java API for generating TCP commands is available at [[TcpMessage]].
*/
object Tcp extends ExtensionKey[TcpExt] {
/**
@ -22,7 +36,11 @@ object Tcp extends ExtensionKey[TcpExt] {
*/
override def get(system: ActorSystem): TcpExt = super.get(system)
// shared socket options
/**
* Scala API: this object contains all applicable socket options for TCP.
*
* For the Java API see [[TcpSO]].
*/
object SO extends Inet.SoForwarders {
// general socket options
@ -63,60 +81,180 @@ object Tcp extends ExtensionKey[TcpExt] {
}
trait Message
/**
* The common interface for [[Command]] and [[Event]].
*/
sealed trait Message
/// COMMANDS
/**
* This is the common trait for all commands understood by TCP actors.
*/
trait Command extends Message with IO.HasFailureMessage {
trait Command extends Message with SelectionHandler.HasFailureMessage {
def failureMessage = CommandFailed(this)
}
/**
* The Connect message is sent to the [[TcpManager]], which is obtained via
* [[TcpExt#getManager]]. Either the manager replies with a [[CommandFailed]]
* The Connect message is sent to the TCP manager actor, which is obtained via
* [[TcpExt#manager]]. Either the manager replies with a [[CommandFailed]]
* or the actor handling the new connection replies with a [[Connected]]
* message.
*
* @param remoteAddress is the address to connect to
* @param localAddress optionally specifies a specific address to bind to
* @param options Please refer to the [[SO]] object for a list of all supported options.
*/
case class Connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil) extends Command
/**
* The Bind message is send to the TCP manager actor, which is obtained via
* [[TcpExt#manager]] in order to bind to a listening socket. The manager
* replies either with a [[CommandFailed]] or the actor handling the listen
* socket replies with a [[Bound]] message. If the local port is set to 0 in
* the Bind message, then the [[Bound]] message should be inspected to find
* the actual port which was bound to.
*
* @param handler The actor which will receive all incoming connection requests
* in the form of [[Connected]] messages.
*
* @param localAddress The socket address to bind to; use port zero for
* automatic assignment (i.e. an ephemeral port, see [[Bound]])
*
* @param backlog This specifies the number of unaccepted connections the O/S
* kernel will hold for this port before refusing connections.
*
* @param options Please refer to the [[SO]] object for a list of all supported options.
*/
case class Bind(handler: ActorRef,
localAddress: InetSocketAddress,
backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil) extends Command
/**
* This message must be sent to a TCP connection actor after receiving the
* [[Connected]] message. The connection will not read any data from the
* socket until this message is received, because this message defines the
* actor which will receive all inbound data.
*
* @param handler The actor which will receive all incoming data and which
* will be informed when the connection is closed.
*
* @param keepOpenOnPeerClosed If this is set to true then the connection
* is not automatically closed when the peer closes its half,
* requiring an explicit [[Closed]] from our side when finished.
*
* @param useResumeWriting If this is set to true then the connection actor
* will refuse all further writes after issuing a [[CommandFailed]]
* notification until [[ResumeWriting]] is received. This can
* be used to implement NACK-based write backpressure.
*/
case class Register(handler: ActorRef, keepOpenOnPeerClosed: Boolean = false, useResumeWriting: Boolean = true) extends Command
/**
* In order to close down a listening socket, send this message to that sockets
* actor (that is the actor which previously had sent the [[Bound]] message). The
* listener socket actor will reply with a [[Unbound]] message.
*/
case object Unbind extends Command
/**
* Common interface for all commands which aim to close down an open connection.
*/
sealed trait CloseCommand extends Command {
/**
* The corresponding event which is sent as an acknowledgment once the
* close operation is finished.
*/
def event: ConnectionClosed
}
/**
* A normal close operation will first flush pending writes and then close the
* socket. The sender of this command and the registered handler for incoming
* data will both be notified once the socket is closed using a [[Closed]]
* message.
*/
case object Close extends CloseCommand {
/**
* The corresponding event which is sent as an acknowledgment once the
* close operation is finished.
*/
override def event = Closed
}
/**
* A confirmed close operation will flush pending writes and half-close the
* connection, waiting for the peer to close the other half. The sender of this
* command and the registered handler for incoming data will both be notified
* once the socket is closed using a [[ConfirmedClosed]] message.
*/
case object ConfirmedClose extends CloseCommand {
/**
* The corresponding event which is sent as an acknowledgment once the
* close operation is finished.
*/
override def event = ConfirmedClosed
}
/**
* An abort operation will not flush pending writes and will issue a TCP ABORT
* command to the O/S kernel which should result in a TCP_RST packet being sent
* to the peer. The sender of this command and the registered handler for
* incoming data will both be notified once the socket is closed using a
* [[Aborted]] message.
*/
case object Abort extends CloseCommand {
/**
* The corresponding event which is sent as an acknowledgment once the
* close operation is finished.
*/
override def event = Aborted
}
/**
* Each [[WriteCommand]] can optionally request a positive acknowledgment to be sent
* to the commanding actor. If such notification is not desired the [[WriteCommand#ack]]
* must be set to an instance of this class. The token contained within can be used
* to recognize which write failed when receiving a [[CommandFailed]] message.
*/
case class NoAck(token: Any) extends Event
/**
* Default [[NoAck]] instance which is used when no acknowledgment information is
* explicitly provided. Its token is `null`.
*/
object NoAck extends NoAck(null)
/**
* Common interface for all write commands, currently [[Write]] and [[WriteFile]].
*/
sealed trait WriteCommand extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def ack: Any
/**
* The acknowledgment token associated with this write command.
*/
def ack: Event
/**
* An acknowledgment is only sent if this write command wants an ack, which is
* equivalent to the [[#ack]] token not being a of type [[NoAck]].
*/
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
/**
* Write data to the TCP connection. If no ack is needed use the special
* `NoAck` object.
* `NoAck` object. The connection actor will reply with a [[CommandFailed]]
* message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
* returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
* token once the write has been successfully enqueued to the O/S kernel.
* <b>Note that this does not in any way guarantee that the data will be
* or have been sent!</b> Unfortunately there is no way to determine whether
* a particular write has been sent by the O/S.
*/
case class Write(data: ByteString, ack: Event) extends WriteCommand
object Write {
@ -137,50 +275,147 @@ object Tcp extends ExtensionKey[TcpExt] {
/**
* Write `count` bytes starting at `position` from file at `filePath` to the connection.
* When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. The
* count must be > 0.
* The count must be > 0. The connection actor will reply with a [[CommandFailed]]
* message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
* returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
* token once the write has been successfully enqueued to the O/S kernel.
* <b>Note that this does not in any way guarantee that the data will be
* or have been sent!</b> Unfortunately there is no way to determine whether
* a particular write has been sent by the O/S.
*/
case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand {
case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends WriteCommand {
require(position >= 0, "WriteFile.position must be >= 0")
require(count > 0, "WriteFile.count must be > 0")
}
/**
* When `useResumeWriting` is in effect as was indicated in the [[Register]] message
* then this command needs to be sent to the connection actor in order to re-enable
* writing after a [[CommandFailed]] event. All [[WriteCommand]] processed by the
* connection actor between the first [[CommandFailed]] and subsequent reception of
* this message will also be rejected with [[CommandFailed]].
*/
case object ResumeWriting extends Command
/**
* Sending this command to the connection actor will disable reading from the TCP
* socket. TCP flow-control will then propagate backpressure to the sender side
* as buffers fill up on either end. To re-enable reading send [[ResumeReading]].
*/
case object SuspendReading extends Command
/**
* This command needs to be sent to the connection actor after a [[SuspendReading]]
* command in order to resume reading from the socket.
*/
case object ResumeReading extends Command
/// EVENTS
/**
* Common interface for all events generated by the TCP layer actors.
*/
trait Event extends Message
/**
* Whenever data are read from a socket they will be transferred within this
* class to the handler actor which was designated in the [[Register]] message.
*/
case class Received(data: ByteString) extends Event
/**
* The connection actor sends this message either to the sender of a [[Connect]]
* command (for outbound) or to the handler for incoming connections designated
* in the [[Bind]] message. The connection is characterized by the `remoteAddress`
* and `localAddress` TCP endpoints.
*/
case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event
/**
* Whenever a command cannot be completed, the queried actor will reply with
* this message, wrapping the original command which failed.
*/
case class CommandFailed(cmd: Command) extends Event
/**
* When `useResumeWriting` is in effect as indicated in the [[Register]] message,
* the [[ResumeWriting]] command will be acknowledged by this message type, upon
* which it is safe to send at least one write. This means that all writes preceding
* the first [[CommandFailed]] message have been enqueued to the O/S kernel at this
* point.
*/
sealed trait WritingResumed extends Event
case object WritingResumed extends WritingResumed
/**
* The sender of a [[Bind]] command willin case of successreceive confirmation
* in this form. If the bind address indicated a 0 port number, then the contained
* `localAddress` can be used to find out which port was automatically assigned.
*/
case class Bound(localAddress: InetSocketAddress) extends Event
/**
* The sender of an [[Unbind]] command will receive confirmation through this
* message once the listening socket has been closed.
*/
sealed trait Unbound extends Event
case object Unbound extends Unbound
/**
* This is the common interface for all events which indicate that a connection
* has been closed or half-closed.
*/
sealed trait ConnectionClosed extends Event {
/**
* `true` iff the connection has been closed in response to an [[Abort]] command.
*/
def isAborted: Boolean = false
/**
* `true` iff the connection has been fully closed in response to a
* [[ConfirmedClose]] command.
*/
def isConfirmed: Boolean = false
/**
* `true` iff the connection has been closed by the peer; in case
* `keepOpenOnPeerClosed` is in effect as per the [[Register]] command,
* this connections reading half is now closed.
*/
def isPeerClosed: Boolean = false
/**
* `true` iff the connection has been closed due to an IO error.
*/
def isErrorClosed: Boolean = false
/**
* If `isErrorClosed` returns true, then the error condition can be
* retrieved by this method.
*/
def getErrorCause: String = null
}
/**
* The connection has been closed normally in response to a [[Close]] command.
*/
case object Closed extends ConnectionClosed
/**
* The connection has been aborted in response to an [[Abort]] command.
*/
case object Aborted extends ConnectionClosed {
override def isAborted = true
}
/**
* The connection has been half-closed by us and then half-close by the peer
* in response to a [[ConfirmedClose]] command.
*/
case object ConfirmedClosed extends ConnectionClosed {
override def isConfirmed = true
}
/**
* The peer has closed its writing half of the connection.
*/
case object PeerClosed extends ConnectionClosed {
override def isPeerClosed = true
}
/**
* The connection has been closed due to an IO error.
*/
case class ErrorClosed(cause: String) extends ConnectionClosed {
override def isErrorClosed = true
override def getErrorCause = cause
@ -218,10 +453,14 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
private[this] def getIntBytes(path: String): Int = {
val size = getBytes(path)
require(size < Int.MaxValue, s"$path must be < 2 GiB")
require(size >= 0, s"$path must be non-negative")
size.toInt
}
}
/**
*
*/
val manager: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(
props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher),
@ -237,10 +476,36 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
val fileIoDispatcher = system.dispatchers.lookup(Settings.FileIODispatcher)
}
/**
* Java API for accessing socket options.
*/
object TcpSO extends SoJavaFactories {
import Tcp.SO._
/**
* [[akka.io.Inet.SocketOption]] to enable or disable SO_KEEPALIVE
*
* For more information see [[java.net.Socket.setKeepAlive]]
*/
def keepAlive(on: Boolean) = KeepAlive(on)
/**
* [[akka.io.Inet.SocketOption]] to enable or disable OOBINLINE (receipt
* of TCP urgent data) By default, this option is disabled and TCP urgent
* data is silently discarded.
*
* For more information see [[java.net.Socket.setOOBInline]]
*/
def oobInline(on: Boolean) = OOBInline(on)
/**
* [[akka.io.Inet.SocketOption]] to enable or disable TCP_NODELAY
* (disable or enable Nagle's algorithm)
*
* Please note, that TCP_NODELAY is enabled by default.
*
* For more information see [[java.net.Socket.setTcpNoDelay]]
*/
def tcpNoDelay(on: Boolean) = TcpNoDelay(on)
}
@ -248,43 +513,183 @@ object TcpMessage {
import language.implicitConversions
import Tcp._
/**
* The Connect message is sent to the TCP manager actor, which is obtained via
* [[TcpExt#getManager]]. Either the manager replies with a [[CommandFailed]]
* or the actor handling the new connection replies with a [[Connected]]
* message.
*
* @param remoteAddress is the address to connect to
* @param localAddress optionally specifies a specific address to bind to
* @param options Please refer to [[TcpSO]] for a list of all supported options.
*/
def connect(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(remoteAddress, Some(localAddress), options)
/**
* Connect to the given `remoteAddress` without binding to a local address.
*/
def connect(remoteAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(remoteAddress, None, options)
/**
* Connect to the given `remoteAddress` without binding to a local address and without
* specifying options.
*/
def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil)
/**
* The Bind message is send to the TCP manager actor, which is obtained via
* [[TcpExt#getManager]] in order to bind to a listening socket. The manager
* replies either with a [[CommandFailed]] or the actor handling the listen
* socket replies with a [[Bound]] message. If the local port is set to 0 in
* the Bind message, then the [[Bound]] message should be inspected to find
* the actual port which was bound to.
*
* @param handler The actor which will receive all incoming connection requests
* in the form of [[Connected]] messages.
*
* @param localAddress The socket address to bind to; use port zero for
* automatic assignment (i.e. an ephemeral port, see [[Bound]])
*
* @param backlog This specifies the number of unaccepted connections the O/S
* kernel will hold for this port before refusing connections.
*
* @param options Please refer to [[TcpSO]] for a list of all supported options.
*/
def bind(handler: ActorRef,
endpoint: InetSocketAddress,
backlog: Int,
options: JIterable[SocketOption]): Command = Bind(handler, endpoint, backlog, options)
/**
* Open a listening socket without specifying options.
*/
def bind(handler: ActorRef,
endpoint: InetSocketAddress,
backlog: Int): Command = Bind(handler, endpoint, backlog, Nil)
def register(handler: ActorRef): Command = Register(handler)
/**
* This message must be sent to a TCP connection actor after receiving the
* [[Connected]] message. The connection will not read any data from the
* socket until this message is received, because this message defines the
* actor which will receive all inbound data.
*
* @param handler The actor which will receive all incoming data and which
* will be informed when the connection is closed.
*
* @param keepOpenOnPeerClosed If this is set to true then the connection
* is not automatically closed when the peer closes its half,
* requiring an explicit [[Closed]] from our side when finished.
*
* @param useResumeWriting If this is set to true then the connection actor
* will refuse all further writes after issuing a [[CommandFailed]]
* notification until [[ResumeWriting]] is received. This can
* be used to implement NACK-based write backpressure.
*/
def register(handler: ActorRef, keepOpenOnPeerClosed: Boolean, useResumeWriting: Boolean): Command =
Register(handler, keepOpenOnPeerClosed, useResumeWriting)
/**
* The same as `register(handler, false, false)`.
*/
def register(handler: ActorRef): Command = Register(handler)
/**
* In order to close down a listening socket, send this message to that sockets
* actor (that is the actor which previously had sent the [[Bound]] message). The
* listener socket actor will reply with a [[Unbound]] message.
*/
def unbind: Command = Unbind
/**
* A normal close operation will first flush pending writes and then close the
* socket. The sender of this command and the registered handler for incoming
* data will both be notified once the socket is closed using a [[Closed]]
* message.
*/
def close: Command = Close
/**
* A confirmed close operation will flush pending writes and half-close the
* connection, waiting for the peer to close the other half. The sender of this
* command and the registered handler for incoming data will both be notified
* once the socket is closed using a [[ConfirmedClosed]] message.
*/
def confirmedClose: Command = ConfirmedClose
/**
* An abort operation will not flush pending writes and will issue a TCP ABORT
* command to the O/S kernel which should result in a TCP_RST packet being sent
* to the peer. The sender of this command and the registered handler for
* incoming data will both be notified once the socket is closed using a
* [[Aborted]] message.
*/
def abort: Command = Abort
def noAck: NoAck = NoAck
/**
* Each [[WriteCommand]] can optionally request a positive acknowledgment to be sent
* to the commanding actor. If such notification is not desired the [[WriteCommand#ack]]
* must be set to an instance of this class. The token contained within can be used
* to recognize which write failed when receiving a [[CommandFailed]] message.
*/
def noAck(token: AnyRef): NoAck = NoAck(token)
/**
* Default [[NoAck]] instance which is used when no acknowledgment information is
* explicitly provided. Its token is `null`.
*/
def noAck: NoAck = NoAck
def write(data: ByteString): Command = Write(data)
/**
* Write data to the TCP connection. If no ack is needed use the special
* `NoAck` object. The connection actor will reply with a [[CommandFailed]]
* message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
* returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
* token once the write has been successfully enqueued to the O/S kernel.
* <b>Note that this does not in any way guarantee that the data will be
* or have been sent!</b> Unfortunately there is no way to determine whether
* a particular write has been sent by the O/S.
*/
def write(data: ByteString, ack: Event): Command = Write(data, ack)
/**
* The same as `write(data, noAck())`.
*/
def write(data: ByteString): Command = Write(data)
def suspendReading: Command = SuspendReading
def resumeReading: Command = ResumeReading
/**
* Write `count` bytes starting at `position` from file at `filePath` to the connection.
* The count must be > 0. The connection actor will reply with a [[CommandFailed]]
* message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
* returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
* token once the write has been successfully enqueued to the O/S kernel.
* <b>Note that this does not in any way guarantee that the data will be
* or have been sent!</b> Unfortunately there is no way to determine whether
* a particular write has been sent by the O/S.
*/
def writeFile(filePath: String, position: Long, count: Long, ack: Event): Command =
WriteFile(filePath, position, count, ack)
/**
* When `useResumeWriting` is in effect as was indicated in the [[Register]] message
* then this command needs to be sent to the connection actor in order to re-enable
* writing after a [[CommandFailed]] event. All [[WriteCommand]] processed by the
* connection actor between the first [[CommandFailed]] and subsequent reception of
* this message will also be rejected with [[CommandFailed]].
*/
def resumeWriting: Command = ResumeWriting
/**
* Sending this command to the connection actor will disable reading from the TCP
* socket. TCP flow-control will then propagate backpressure to the sender side
* as buffers fill up on either end. To re-enable reading send [[ResumeReading]].
*/
def suspendReading: Command = SuspendReading
/**
* This command needs to be sent to the connection actor after a [[SuspendReading]]
* command in order to resume reading from the socket.
*/
def resumeReading: Command = ResumeReading
implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
import scala.collection.JavaConverters._
coll.asScala.to
coll.asScala.to[immutable.Traversable]
}
}

View file

@ -11,7 +11,6 @@ import scala.util.control.NonFatal
import akka.actor.{ Props, ActorLogging, ActorRef, Actor }
import akka.io.SelectionHandler._
import akka.io.Tcp._
import akka.io.IO.HasFailureMessage
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
/**
@ -64,7 +63,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
context.stop(self)
}
override def supervisorStrategy = IO.connectionSupervisorStrategy
override def supervisorStrategy = SelectionHandler.connectionSupervisorStrategy
def receive: Receive = {
case registration: ChannelRegistration

View file

@ -6,7 +6,6 @@ package akka.io
import Tcp._
import akka.actor.{ ActorLogging, Props }
import akka.io.IO.SelectorBasedManager
/**
* INTERNAL API
@ -45,7 +44,8 @@ import akka.io.IO.SelectorBasedManager
* with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference.
*
*/
private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging {
private[io] class TcpManager(tcp: TcpExt)
extends SelectionHandler.SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging {
def receive = workerForCommandHandler {
case c: Connect

View file

@ -12,6 +12,18 @@ import akka.util.Helpers.Requiring
import akka.util.ByteString
import akka.actor._
/**
* UDP Extension for Akkas IO layer.
*
* This extension implements the connectionless UDP protocol without
* calling `connect` on the underlying sockets, i.e. without restricting
* from whom data can be received. For connected UDP mode see [[UdpConnected]].
*
* For a full description of the design and philosophy behind this IO
* implementation please refer to {@see <a href="http://doc.akka.io/">the Akka online documentation</a>}.
*
* The Java API for generating UDP commands is available at [[UdpMessage]].
*/
object Udp extends ExtensionKey[UdpExt] {
/**
@ -19,14 +31,49 @@ object Udp extends ExtensionKey[UdpExt] {
*/
override def get(system: ActorSystem): UdpExt = super.get(system)
trait Command extends IO.HasFailureMessage {
/**
* The common interface for [[Command]] and [[Event]].
*/
sealed trait Message
/**
* The common type of all commands supported by the UDP implementation.
*/
trait Command extends SelectionHandler.HasFailureMessage with Message {
def failureMessage = CommandFailed(this)
}
case class NoAck(token: Any)
/**
* Each [[Send]] can optionally request a positive acknowledgment to be sent
* to the commanding actor. If such notification is not desired the [[Send#ack]]
* must be set to an instance of this class. The token contained within can be used
* to recognize which write failed when receiving a [[CommandFailed]] message.
*/
case class NoAck(token: Any) extends Event
/**
* Default [[NoAck]] instance which is used when no acknowledgment information is
* explicitly provided. Its token is `null`.
*/
object NoAck extends NoAck(null)
case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command {
/**
* This message is understood by the simple sender which can be obtained by
* sending the [[SimpleSender]] query to the [[UdpExt#manager]] as well as by
* the listener actors which are created in response to [[Bind]]. It will send
* the given payload data as one UDP datagram to the given target address. The
* UDP actor will respond with [[CommandFailed]] if the send could not be
* enqueued to the O/S kernel because the send buffer was full. If the given
* `ack` is not of type [[NoAck]] the UDP actor will reply with the given
* object as soon as the datagram has been successfully enqueued to the O/S
* kernel.
*
* The sending UDP sockets address belongs to the simple sender which does
* not handle inbound datagrams and sends from an ephemeral port; therefore
* sending using this mechanism is not suitable if replies are expected, use
* [[Bind]] in that case.
*/
case class Send(payload: ByteString, target: InetSocketAddress, ack: Event) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
@ -35,32 +82,92 @@ object Udp extends ExtensionKey[UdpExt] {
def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck)
}
/**
* Send this message to the [[UdpExt#manager]] in order to bind to the given
* local port (or an automatically assigned one if the port number is zero).
* The listener actor for the newly bound port will reply with a [[Bound]]
* message, or the manager will reply with a [[CommandFailed]] message.
*/
case class Bind(handler: ActorRef,
localAddress: InetSocketAddress,
options: immutable.Traversable[SocketOption] = Nil) extends Command
/**
* Send this message to the listener actor that previously sent a [[Bound]]
* message in order to close the listening socket. The recipient will reply
* with an [[Unbound]] message.
*/
case object Unbind extends Command
/**
* Retrieve a reference to a simple sender actor of the UDP extension.
* The newly created simple sender will reply with the [[SimpleSenderReady]] notification.
*
* The simple sender is a convenient service for being able to send datagrams
* when the originating address is meaningless, i.e. when no reply is expected.
*
* The simple sender will not stop itself, you will have to send it a [[akka.actor.PoisonPill]]
* when you want to close the socket.
*/
case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command
object SimpleSender extends SimpleSender(Nil)
case object StopReading extends Command
/**
* Send this message to a listener actor (which sent a [[Bound]] message) to
* have it stop reading datagrams from the network. If the O/S kernels receive
* buffer runs full then subsequent datagrams will be silently discarded.
* Re-enable reading from the socket using the [[ResumeReading]] command.
*/
case object SuspendReading extends Command
/**
* This message must be sent to the listener actor to re-enable reading from
* the socket after a [[SuspendReading]] command.
*/
case object ResumeReading extends Command
trait Event
/**
* The common type of all events emitted by the UDP implementation.
*/
trait Event extends Message
/**
* When a listener actor receives a datagram from its socket it will send
* it to the handler designated in the [[Bind]] message using this message type.
*/
case class Received(data: ByteString, sender: InetSocketAddress) extends Event
/**
* When a command fails it will be replied to with this message type,
* wrapping the failing command object.
*/
case class CommandFailed(cmd: Command) extends Event
/**
* This message is sent by the listener actor in response to a [[Bind]] command.
* If the address to bind to specified a port number of zero, then this message
* can be inspected to find out which port was automatically assigned.
*/
case class Bound(localAddress: InetSocketAddress) extends Event
sealed trait SimpleSendReady extends Event
case object SimpleSendReady extends SimpleSendReady
/**
* The simple sender sends this message type in response to a [[SimpleSender]] query.
*/
sealed trait SimpleSenderReady extends Event
case object SimpleSenderReady extends SimpleSenderReady
/**
* This message is sent by the listener actor in response to an [[Unbind]] command
* after the socket has been closed.
*/
sealed trait Unbound
case object Unbound extends Unbound
case class SendFailed(cause: Throwable) extends Event
/**
* Scala API: This object provides access to all socket options applicable to UDP sockets.
*
* For the Java API see [[UdpSO]].
*/
object SO extends Inet.SoForwarders {
/**
@ -106,7 +213,15 @@ class UdpExt(system: ExtendedActorSystem) extends IO.Extension {
name = "IO-UDP-FF")
}
val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
/**
* Java API: retrieve the UDP manager actors reference.
*/
def getManager: ActorRef = manager
/**
* INTERNAL API
*/
private[io] val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
}
/**
@ -118,24 +233,99 @@ object UdpMessage {
import scala.collection.JavaConverters._
import language.implicitConversions
def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target)
def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack)
/**
* Each [[Send]] can optionally request a positive acknowledgment to be sent
* to the commanding actor. If such notification is not desired the [[Send#ack]]
* must be set to an instance of this class. The token contained within can be used
* to recognize which write failed when receiving a [[CommandFailed]] message.
*/
def noAck(token: AnyRef): NoAck = NoAck(token)
/**
* Default [[NoAck]] instance which is used when no acknowledgment information is
* explicitly provided. Its token is `null`.
*/
def noAck: NoAck = NoAck
def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind =
/**
* This message is understood by the simple sender which can be obtained by
* sending the [[SimpleSender]] query to the [[UdpExt#manager]] as well as by
* the listener actors which are created in response to [[Bind]]. It will send
* the given payload data as one UDP datagram to the given target address. The
* UDP actor will respond with [[CommandFailed]] if the send could not be
* enqueued to the O/S kernel because the send buffer was full. If the given
* `ack` is not of type [[NoAck]] the UDP actor will reply with the given
* object as soon as the datagram has been successfully enqueued to the O/S
* kernel.
*
* The sending UDP sockets address belongs to the simple sender which does
* not handle inbound datagrams and sends from an ephemeral port; therefore
* sending using this mechanism is not suitable if replies are expected, use
* [[Bind]] in that case.
*/
def send(payload: ByteString, target: InetSocketAddress, ack: Event): Command = Send(payload, target, ack)
/**
* The same as `send(payload, target, noAck())`.
*/
def send(payload: ByteString, target: InetSocketAddress): Command = Send(payload, target)
/**
* Send this message to the [[UdpExt#manager]] in order to bind to the given
* local port (or an automatically assigned one if the port number is zero).
* The listener actor for the newly bound port will reply with a [[Bound]]
* message, or the manager will reply with a [[CommandFailed]] message.
*/
def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Command =
Bind(handler, endpoint, options.asScala.to)
/**
* Bind without specifying options.
*/
def bind(handler: ActorRef, endpoint: InetSocketAddress): Command = Bind(handler, endpoint, Nil)
def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil)
/**
* Send this message to the listener actor that previously sent a [[Bound]]
* message in order to close the listening socket. The recipient will reply
* with an [[Unbound]] message.
*/
def unbind: Command = Unbind
def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to)
def simpleSender: SimpleSender = SimpleSender
/**
* Retrieve a reference to a simple sender actor of the UDP extension.
* The newly created simple sender will reply with the [[SimpleSenderReady]] notification.
*
* The simple sender is a convenient service for being able to send datagrams
* when the originating address is meaningless, i.e. when no reply is expected.
*
* The simple sender will not stop itself, you will have to send it a [[akka.actor.PoisonPill]]
* when you want to close the socket.
*/
def simpleSender(options: JIterable[SocketOption]): Command = SimpleSender(options.asScala.to)
/**
* Retrieve a simple sender without specifying options.
*/
def simpleSender: Command = SimpleSender
def unbind: Unbind.type = Unbind
/**
* Send this message to a listener actor (which sent a [[Bound]] message) to
* have it stop reading datagrams from the network. If the O/S kernels receive
* buffer runs full then subsequent datagrams will be silently discarded.
* Re-enable reading from the socket using the [[ResumeReading]] command.
*/
def suspendReading: Command = SuspendReading
def stopReading: StopReading.type = StopReading
def resumeReading: ResumeReading.type = ResumeReading
/**
* This message must be sent to the listener actor to re-enable reading from
* the socket after a [[SuspendReading]] command.
*/
def resumeReading: Command = ResumeReading
}
object UdpSO extends SoJavaFactories {
import Udp.SO._
/**
* [[akka.io.Inet.SocketOption]] to set the SO_BROADCAST option
*
* For more information see [[java.net.DatagramSocket#setBroadcast]]
*/
def broadcast(on: Boolean) = Broadcast(on)
}

View file

@ -11,21 +11,61 @@ import akka.io.Udp.UdpSettings
import akka.util.ByteString
import akka.actor._
/**
* UDP Extension for Akkas IO layer.
*
* This extension implements the connectionless UDP protocol with
* calling `connect` on the underlying sockets, i.e. with restricting
* from whom data can be received. For unconnected UDP mode see [[Udp]].
*
* For a full description of the design and philosophy behind this IO
* implementation please refer to {@see <a href="http://doc.akka.io/">the Akka online documentation</a>}.
*
* The Java API for generating UDP commands is available at [[UdpConnectedMessage]].
*/
object UdpConnected extends ExtensionKey[UdpConnectedExt] {
/**
* Java API: retrieve the UdpConnected extension for the given system.
*/
override def get(system: ActorSystem): UdpConnectedExt = super.get(system)
trait Command extends IO.HasFailureMessage {
/**
* The common interface for [[Command]] and [[Event]].
*/
sealed trait Message
/**
* The common type of all commands supported by the UDP implementation.
*/
trait Command extends SelectionHandler.HasFailureMessage with Message {
def failureMessage = CommandFailed(this)
}
case class NoAck(token: Any)
/**
* Each [[Send]] can optionally request a positive acknowledgment to be sent
* to the commanding actor. If such notification is not desired the [[Send#ack]]
* must be set to an instance of this class. The token contained within can be used
* to recognize which write failed when receiving a [[CommandFailed]] message.
*/
case class NoAck(token: Any) extends Event
/**
* Default [[NoAck]] instance which is used when no acknowledgment information is
* explicitly provided. Its token is `null`.
*/
object NoAck extends NoAck(null)
/**
* This message is understood by the connection actors to send data to their
* designated destination. The connection actor will respond with
* [[CommandFailed]] if the send could not be enqueued to the O/S kernel
* because the send buffer was full. If the given `ack` is not of type [[NoAck]]
* the connection actor will reply with the given object as soon as the datagram
* has been successfully enqueued to the O/S kernel.
*/
case class Send(payload: ByteString, ack: Any) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
require(ack
!= null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
@ -33,29 +73,70 @@ object UdpConnected extends ExtensionKey[UdpConnectedExt] {
def apply(data: ByteString): Send = Send(data, NoAck)
}
/**
* Send this message to the [[UdpExt#manager]] in order to bind to a local
* port (optionally with the chosen `localAddress`) and create a UDP socket
* which is restricted to sending to and receiving from the given `remoteAddress`.
* All received datagrams will be sent to the designated `handler` actor.
*/
case class Connect(handler: ActorRef,
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil) extends Command
case object StopReading extends Command
/**
* Send this message to a connection actor (which had previously sent the
* [[Connected]] message) in order to close the socket. The connection actor
* will reply with a [[Disconnected]] message.
*/
case object Disconnect extends Command
/**
* Send this message to a listener actor (which sent a [[Bound]] message) to
* have it stop reading datagrams from the network. If the O/S kernels receive
* buffer runs full then subsequent datagrams will be silently discarded.
* Re-enable reading from the socket using the [[ResumeReading]] command.
*/
case object SuspendReading extends Command
/**
* This message must be sent to the listener actor to re-enable reading from
* the socket after a [[SuspendReading]] command.
*/
case object ResumeReading extends Command
trait Event
/**
* The common type of all events emitted by the UDP implementation.
*/
trait Event extends Message
/**
* When a connection actor receives a datagram from its socket it will send
* it to the handler designated in the [[Bind]] message using this message type.
*/
case class Received(data: ByteString) extends Event
/**
* When a command fails it will be replied to with this message type,
* wrapping the failing command object.
*/
case class CommandFailed(cmd: Command) extends Event
/**
* This message is sent by the connection actor to the actor which sent the
* [[Connect]] message when the UDP socket has been bound to the local and
* remote addresses given.
*/
sealed trait Connected extends Event
case object Connected extends Connected
/**
* This message is sent by the connection actor to the actor which sent the
* [[Disconnect]] message when the UDP socket has been closed.
*/
sealed trait Disconnected extends Event
case object Disconnected extends Disconnected
case object Close extends Command
case class SendFailed(cause: Throwable) extends Event
}
class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension {
@ -68,6 +149,11 @@ class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension {
name = "IO-UDP-CONN")
}
/**
* Java API: retrieve the UDP manager actors reference.
*/
def getManager: ActorRef = manager
val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
}
@ -79,29 +165,79 @@ object UdpConnectedMessage {
import language.implicitConversions
import UdpConnected._
/**
* Send this message to the [[UdpExt#manager]] in order to bind to a local
* port (optionally with the chosen `localAddress`) and create a UDP socket
* which is restricted to sending to and receiving from the given `remoteAddress`.
* All received datagrams will be sent to the designated `handler` actor.
*/
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, Some(localAddress), options)
/**
* Connect without specifying the `localAddress`.
*/
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, None, options)
/**
* Connect without specifying the `localAddress` or `options`.
*/
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress): Command = Connect(handler, remoteAddress, None, Nil)
def send(data: ByteString): Command = Send(data)
/**
* This message is understood by the connection actors to send data to their
* designated destination. The connection actor will respond with
* [[CommandFailed]] if the send could not be enqueued to the O/S kernel
* because the send buffer was full. If the given `ack` is not of type [[NoAck]]
* the connection actor will reply with the given object as soon as the datagram
* has been successfully enqueued to the O/S kernel.
*/
def send(data: ByteString, ack: AnyRef): Command = Send(data, ack)
/**
* Send without requesting acknowledgment.
*/
def send(data: ByteString): Command = Send(data)
def close: Command = Close
/**
* Send this message to a connection actor (which had previously sent the
* [[Connected]] message) in order to close the socket. The connection actor
* will reply with a [[Disconnected]] message.
*/
def disconnect: Command = Disconnect
def noAck: NoAck = NoAck
/**
* Each [[Send]] can optionally request a positive acknowledgment to be sent
* to the commanding actor. If such notification is not desired the [[Send#ack]]
* must be set to an instance of this class. The token contained within can be used
* to recognize which write failed when receiving a [[CommandFailed]] message.
*/
def noAck(token: AnyRef): NoAck = NoAck(token)
def stopReading: Command = StopReading
/**
* Default [[NoAck]] instance which is used when no acknowledgment information is
* explicitly provided. Its token is `null`.
*/
def noAck: NoAck = NoAck
/**
* Send this message to a listener actor (which sent a [[Bound]] message) to
* have it stop reading datagrams from the network. If the O/S kernels receive
* buffer runs full then subsequent datagrams will be silently discarded.
* Re-enable reading from the socket using the [[ResumeReading]] command.
*/
def suspendReading: Command = SuspendReading
/**
* This message must be sent to the listener actor to re-enable reading from
* the socket after a [[SuspendReading]] command.
*/
def resumeReading: Command = ResumeReading
implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
import scala.collection.JavaConverters._
coll.asScala.to
coll.asScala.to[immutable.Traversable]
}
}

View file

@ -4,14 +4,13 @@
package akka.io
import akka.actor.Props
import akka.io.IO.SelectorBasedManager
import akka.io.UdpConnected.Connect
/**
* INTERNAL API
*/
private[io] class UdpConnectedManager(udpConn: UdpConnectedExt)
extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) {
extends SelectionHandler.SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) {
def receive = workerForCommandHandler {
case c: Connect

View file

@ -58,11 +58,11 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt,
}
def connected(registration: ChannelRegistration): Receive = {
case StopReading registration.disableInterest(OP_READ)
case SuspendReading registration.disableInterest(OP_READ)
case ResumeReading registration.enableInterest(OP_READ)
case ChannelReadable doRead(registration, handler)
case Close
case Disconnect
log.debug("Closing UDP connection to [{}]", remoteAddress)
channel.close()
sender ! Disconnected

View file

@ -60,7 +60,7 @@ private[io] class UdpListener(val udp: UdpExt,
}
def readHandlers(registration: ChannelRegistration): Receive = {
case StopReading registration.disableInterest(OP_READ)
case SuspendReading registration.disableInterest(OP_READ)
case ResumeReading registration.enableInterest(OP_READ)
case ChannelReadable doReceive(registration, bind.handler)

View file

@ -4,7 +4,6 @@
package akka.io
import akka.actor.Props
import akka.io.IO.SelectorBasedManager
import akka.io.Udp._
/**
@ -38,13 +37,13 @@ import akka.io.Udp._
* == Simple send ==
*
* Udp provides a simple method of sending UDP datagrams if no reply is expected. To acquire the Sender actor
* a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSendReady
* a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSenderReady
* message that the service is available. UDP datagrams can be sent by sending [[akka.io.Udp.Send]] messages to the
* sender of SimpleSendReady. All the datagrams will contain an ephemeral local port as sender and answers will be
* sender of SimpleSenderReady. All the datagrams will contain an ephemeral local port as sender and answers will be
* discarded.
*
*/
private[io] class UdpManager(udp: UdpExt) extends SelectorBasedManager(udp.settings, udp.settings.NrOfSelectors) {
private[io] class UdpManager(udp: UdpExt) extends SelectionHandler.SelectorBasedManager(udp.settings, udp.settings.NrOfSelectors) {
def receive = workerForCommandHandler {
case b: Bind

View file

@ -33,8 +33,8 @@ private[io] class UdpSender(val udp: UdpExt,
def receive: Receive = {
case registration: ChannelRegistration
context.become(sendHandlers(registration), discardOld = true)
commander ! SimpleSendReady
commander ! SimpleSenderReady
context.become(sendHandlers(registration))
}
override def postStop(): Unit = if (channel.isOpen) {

View file

@ -14,12 +14,12 @@ import akka.io.SelectionHandler._
private[io] trait WithUdpSend {
me: Actor with ActorLogging
var pendingSend: Send = null
var pendingCommander: ActorRef = null
private var pendingSend: Send = null
private var pendingCommander: ActorRef = null
// If send fails first, we allow a second go after selected writable, but no more. This flag signals that
// pending send was already tried once.
var retriedSend = false
def hasWritePending = pendingSend ne null
private var retriedSend = false
private def hasWritePending = pendingSend ne null
def channel: DatagramChannel
def udp: UdpExt
@ -44,7 +44,7 @@ private[io] trait WithUdpSend {
case ChannelWritable if (hasWritePending) doSend(registration)
}
final def doSend(registration: ChannelRegistration): Unit = {
private def doSend(registration: ChannelRegistration): Unit = {
val buffer = udp.bufferPool.acquire()
try {
buffer.clear()

View file

@ -6,86 +6,171 @@ package docs.io;
//#imports
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.UntypedActor;
import akka.io.Inet;
import akka.io.Udp;
import akka.io.UdpConnected;
import akka.io.UdpConnectedMessage;
import akka.io.UdpMessage;
import akka.io.UdpSO;
import akka.japi.Procedure;
import akka.util.ByteString;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
//#imports
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
import org.junit.Test;
public class UdpDocTest {
static public class Demo extends UntypedActor {
ActorSystem system = context().system();
public void onReceive(Object message) {
//#manager
final ActorRef udp = Udp.get(system).manager();
//#manager
//#sender
public static class SimpleSender extends UntypedActor {
final InetSocketAddress remote;
//#simplesend
udp.tell(UdpMessage.simpleSender(), getSelf());
public SimpleSender(InetSocketAddress remote) {
this.remote = remote;
// ... or with socket options:
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
options.add(UdpSO.broadcast(true));
udp.tell(UdpMessage.simpleSender(), getSelf());
//#simplesend
// request creation of a SimpleSender
final ActorRef mgr = Udp.get(getContext().system()).getManager();
mgr.tell(UdpMessage.simpleSender(), getSelf());
}
ActorRef simpleSender = null;
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.SimpleSenderReady) {
getContext().become(ready(getSender()));
//#sender
getSender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), getSelf());
//#sender
} else unhandled(msg);
}
//#simplesend-finish
if (message instanceof Udp.SimpleSendReady) {
simpleSender = getSender();
}
//#simplesend-finish
private Procedure<Object> ready(final ActorRef send) {
return new Procedure<Object>() {
@Override
public void apply(Object msg) throws Exception {
if (msg instanceof String) {
final String str = (String) msg;
send.tell(UdpMessage.send(ByteString.fromString(str), remote), getSelf());
//#sender
if (str.equals("world")) {
send.tell(PoisonPill.getInstance(), getSelf());
}
//#sender
final ByteString data = ByteString.empty();
//#simplesend-send
simpleSender.tell(UdpMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf());
//#simplesend-send
final ActorRef handler = getSelf();
//#bind
udp.tell(UdpMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf());
//#bind
ActorRef udpWorker = null;
//#bind-finish
if (message instanceof Udp.Bound) {
udpWorker = getSender();
}
//#bind-finish
//#bind-receive
if (message instanceof Udp.Received) {
final Udp.Received rcvd = (Udp.Received) message;
final ByteString payload = rcvd.data();
final InetSocketAddress sender = rcvd.sender();
}
//#bind-receive
//#bind-send
udpWorker.tell(UdpMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf());
//#bind-send
} else unhandled(msg);
}
};
}
}
//#sender
@Test
public void demonstrateConnect() {
//#listener
public static class Listener extends UntypedActor {
final ActorRef nextActor;
public Listener(ActorRef nextActor) {
this.nextActor = nextActor;
// request creation of a bound listen socket
final ActorRef mgr = Udp.get(getContext().system()).getManager();
mgr.tell(
UdpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0)),
getSelf());
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.Bound) {
final Udp.Bound b = (Udp.Bound) msg;
//#listener
nextActor.tell(b.localAddress(), getSender());
//#listener
getContext().become(ready(getSender()));
} else unhandled(msg);
}
private Procedure<Object> ready(final ActorRef socket) {
return new Procedure<Object>() {
@Override
public void apply(Object msg) throws Exception {
if (msg instanceof Udp.Received) {
final Udp.Received r = (Udp.Received) msg;
// echo server example: send back the data
socket.tell(UdpMessage.send(r.data(), r.sender()), getSelf());
// or do some processing and forward it on
final Object processed = // parse data etc., e.g. using PipelineStage
//#listener
r.data().utf8String();
//#listener
nextActor.tell(processed, getSelf());
} else if (msg.equals(UdpMessage.unbind())) {
socket.tell(msg, getSelf());
} else if (msg instanceof Udp.Unbound) {
getContext().stop(getSelf());
} else unhandled(msg);
}
};
}
}
//#listener
//#connected
public static class Connected extends UntypedActor {
final InetSocketAddress remote;
public Connected(InetSocketAddress remote) {
this.remote = remote;
// create a restricted a.k.a. connected socket
final ActorRef mgr = UdpConnected.get(getContext().system()).getManager();
mgr.tell(UdpConnectedMessage.connect(getSelf(), remote), getSelf());
}
@Override
public void onReceive(Object msg) {
if (msg instanceof UdpConnected.Connected) {
getContext().become(ready(getSender()));
//#connected
getSender()
.tell(UdpConnectedMessage.send(ByteString.fromString("hello")),
getSelf());
//#connected
} else unhandled(msg);
}
private Procedure<Object> ready(final ActorRef connection) {
return new Procedure<Object>() {
@Override
public void apply(Object msg) throws Exception {
if (msg instanceof UdpConnected.Received) {
final UdpConnected.Received r = (UdpConnected.Received) msg;
// process data, send it on, etc.
// #connected
if (r.data().utf8String().equals("hello")) {
connection.tell(
UdpConnectedMessage.send(ByteString.fromString("world")),
getSelf());
}
// #connected
} else if (msg instanceof String) {
final String str = (String) msg;
connection
.tell(UdpConnectedMessage.send(ByteString.fromString(str)),
getSelf());
} else if (msg.equals(UdpConnectedMessage.disconnect())) {
connection.tell(msg, getSelf());
} else if (msg instanceof UdpConnected.Disconnected) {
getContext().stop(getSelf());
} else unhandled(msg);
}
};
}
}
//#connected
}

View file

@ -652,113 +652,85 @@ and forward any replies to some ``listener`` actor.
Using UDP
---------
UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams
to any remote address. Connection-based UDP workers are linked to a single remote address.
UDP is a connectionless datagram protocol which offers two different ways of
communication on the JDK level:
The connectionless UDP manager is accessed through ``Udp``. ``Udp`` refers to the "fire-and-forget" style of sending
UDP datagrams.
* sockets which are free to send datagrams to any destination and receive
datagrams from any origin
.. includecode:: code/docs/io/UdpDocTest.java#manager
* sockets which are restricted to communication with one specific remote
socket address
The connection-based UDP manager is accessed through ``UdpConnected``.
In the low-level API the distinction is made—confusingly—by whether or not
:meth:`connect` has been called on the socket (even when connect has been
called the protocol is still connectionless). These two forms of UDP usage are
offered using distinct IO extensions described below.
.. includecode:: code/docs/io/UdpConnectedDocTest.java#manager
UDP servers can be only implemented by the connectionless API, but clients can use both.
Connectionless UDP
^^^^^^^^^^^^^^^^^^
The following imports are assumed in the following sections:
.. includecode:: code/docs/io/UdpDocTest.java#imports
Unconnected UDP
^^^^^^^^^^^^^^^
Simple Send
............
To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the
``Udp`` manager:
.. includecode:: code/docs/io/UdpDocTest.java#sender
.. includecode:: code/docs/io/UdpDocTest.java#simplesend
The simplest form of UDP usage is to just send datagrams without the need of
getting a reply. To this end a “simple sender” facility is provided as
demonstrated above. The UDP extension is queried using the
:meth:`simpleSender` message, which is answered by a :class:`SimpleSenderReady`
notification. The sender of this message is the newly created sender actor
which from this point onward can be used to send datagrams to arbitrary
destinations; in this example it will just send any UTF-8 encoded
:class:`String` it receives to a predefined remote address.
The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message:
.. includecode:: code/docs/io/UdpDocTest.java#simplesend-finish
After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple
message send:
.. includecode:: code/docs/io/UdpDocTest.java#simplesend-send
.. note::
The simple sender will not shut itself down because it cannot know when you
are done with it. You will need to send it a :class:`PoisonPill` when you
want to close the ephemeral port the sender is bound to.
Bind (and Send)
...............
To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP
manager
.. includecode:: code/docs/io/UdpDocTest.java#listener
.. includecode:: code/docs/io/UdpDocTest.java#bind
If you want to implement a UDP server which listens on a socket for incoming
datagrams then you need to use the :meth:`bind` command as shown above. The
local address specified may have a zero port in which case the operating system
will automatically choose a free port and assign it to the new socket. Which
port was actually bound can be found out by inspecting the :class:`Bound`
message.
After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of
this message is the worker for the UDP channel bound to the local address.
The sender of the :class:`Bound` message is the actor which manages the new
socket. Sending datagrams is achieved by using the :meth:`send` message type
and the socket can be closed by sending a :meth:`unbind` command, in which
case the socket actor will reply with a :class:`Unbound` notification.
.. includecode:: code/docs/io/UdpDocTest.java#bind-finish
Received datagrams are sent to the actor designated in the :meth:`bind`
message, whereas the :class:`Bound` message will be sent to the sender of the
:meth:`bind`.
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
Connected UDP
^^^^^^^^^^^^^
.. includecode:: code/docs/io/UdpDocTest.java#bind-receive
The service provided by the connection based UDP API is similar to the
bind-and-send service we saw earlier, but the main difference is that a
connection is only able to send to the ``remoteAddress`` it was connected to,
and will receive datagrams only from that address.
The ``Received`` message contains the payload of the datagram and the address of the sender.
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker:
.. includecode:: code/docs/io/UdpDocTest.java#bind-send
.. includecode:: code/docs/io/UdpDocTest.java#connected
Consequently the example shown here looks quite similar to the previous one,
the biggest difference is the absence of remote address information in
:meth:`send` and :class:`Received` messages.
.. note::
The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case
the sender field of the UDP datagram will be the bound local address, while in the latter it will be an undetermined
ephemeral port.
Connection based UDP
^^^^^^^^^^^^^^^^^^^^
The service provided by the connection based UDP API is similar to the bind-and-send service we saw earlier, but
the main difference is that a connection is only able to send to the ``remoteAddress`` it was connected to, and will
receive datagrams only from that address.
Connecting is similar to what we have seen in the previous section:
.. includecode:: code/docs/io/UdpConnectedDocTest.java#connect
Or, with more options:
.. includecode:: code/docs/io/UdpConnectedDocTest.java#connect-with-options
After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of
this message is the worker for the UDP connection.
.. includecode:: code/docs/io/UdpConnectedDocTest.java#connected
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
.. includecode:: code/docs/io/UdpConnectedDocTest.java#received
The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address
is provided, as a UDP connection only receives messages from the endpoint it has been connected to.
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker:
.. includecode:: code/docs/io/UdpConnectedDocTest.java#send
Again, like the ``Received`` message, the ``Send`` message does not contain a remote address. This is because the address
will always be the endpoint we originally connected to.
.. note::
There is a small performance benefit in using connection based UDP API over the connectionless one.
If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security
check, while in the case of connection-based UDP the security check is cached after connect, thus writes does
not suffer an additional performance penalty.
There is a small performance benefit in using connection based UDP API over
the connectionless one. If there is a SecurityManager enabled on the system,
every connectionless message send has to go through a security check, while
in the case of connection-based UDP the security check is cached after
connect, thus writes do not suffer an additional performance penalty.
Architecture in-depth
---------------------

View file

@ -0,0 +1,167 @@
package docs.io
import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.io.IO
import akka.io.Udp
import akka.actor.ActorRef
import java.net.InetSocketAddress
import akka.util.ByteString
import akka.testkit.TestProbe
import akka.actor.Props
import scala.concurrent.duration._
import akka.actor.PoisonPill
import akka.io.UdpConnected
object ScalaUdpDocSpec {
//#sender
class SimpleSender(remote: InetSocketAddress) extends Actor {
import context.system
IO(Udp) ! Udp.SimpleSender
def receive = {
case Udp.SimpleSenderReady
context.become(ready(sender))
//#sender
sender ! Udp.Send(ByteString("hello"), remote)
//#sender
}
def ready(send: ActorRef): Receive = {
case msg: String
send ! Udp.Send(ByteString(msg), remote)
//#sender
if (msg == "world") send ! PoisonPill
//#sender
}
}
//#sender
//#listener
class Listener(nextActor: ActorRef) extends Actor {
import context.system
IO(Udp) ! Udp.Bind(self, new InetSocketAddress("localhost", 0))
def receive = {
case Udp.Bound(local)
//#listener
nextActor forward local
//#listener
context.become(ready(sender))
}
def ready(socket: ActorRef): Receive = {
case Udp.Received(data, remote)
val processed = // parse data etc., e.g. using PipelineStage
//#listener
data.utf8String
//#listener
socket ! Udp.Send(data, remote) // example server echoes back
nextActor ! processed
case Udp.Unbind socket ! Udp.Unbind
case Udp.Unbound context.stop(self)
}
}
//#listener
//#connected
class Connected(remote: InetSocketAddress) extends Actor {
import context.system
IO(UdpConnected) ! UdpConnected.Connect(self, remote)
def receive = {
case UdpConnected.Connected
context.become(ready(sender))
//#connected
sender ! UdpConnected.Send(ByteString("hello"))
//#connected
}
def ready(connection: ActorRef): Receive = {
case UdpConnected.Received(data)
// process data, send it on, etc.
//#connected
if (data.utf8String == "hello")
connection ! UdpConnected.Send(ByteString("world"))
//#connected
case msg: String
connection ! UdpConnected.Send(ByteString(msg))
case d @ UdpConnected.Disconnect connection ! d
case UdpConnected.Disconnected context.stop(self)
}
}
//#connected
}
abstract class UdpDocSpec extends AkkaSpec {
def listenerProps(next: ActorRef): Props
def simpleSenderProps(remote: InetSocketAddress): Props
def connectedProps(remote: InetSocketAddress): Props
"demonstrate Udp" in {
val probe = TestProbe()
val listen = watch(system.actorOf(listenerProps(probe.ref)))
val local = probe.expectMsgType[InetSocketAddress]
val listener = probe.lastSender
val send = system.actorOf(simpleSenderProps(local))
probe.expectMsg("hello")
send ! "world"
probe.expectMsg("world")
listen ! Udp.Unbind
expectTerminated(listen)
}
"demonstrate Udp suspend reading" in {
val probe = TestProbe()
val listen = watch(system.actorOf(listenerProps(probe.ref)))
val local = probe.expectMsgType[InetSocketAddress]
val listener = probe.lastSender
listener ! Udp.SuspendReading
Thread.sleep(1000) // no way to find out when the above is finished
val send = system.actorOf(simpleSenderProps(local))
probe.expectNoMsg(500.millis)
listener ! Udp.ResumeReading
probe.expectMsg("hello")
send ! "world"
probe.expectMsg("world")
listen ! Udp.Unbind
expectTerminated(listen)
}
"demonstrate UdpConnected" in {
val probe = TestProbe()
val listen = watch(system.actorOf(listenerProps(probe.ref)))
val local = probe.expectMsgType[InetSocketAddress]
val listener = probe.lastSender
val conn = watch(system.actorOf(connectedProps(local)))
probe.expectMsg("hello")
probe.expectMsg("world")
conn ! "hello"
probe.expectMsg("hello")
probe.expectMsg("world")
listen ! Udp.Unbind
expectTerminated(listen)
conn ! UdpConnected.Disconnect
expectTerminated(conn)
}
}
class ScalaUdpDocSpec extends UdpDocSpec {
import ScalaUdpDocSpec._
override def listenerProps(next: ActorRef) = Props(new Listener(next))
override def simpleSenderProps(remote: InetSocketAddress) = Props(new SimpleSender(remote))
override def connectedProps(remote: InetSocketAddress) = Props(new Connected(remote))
}
class JavaUdpDocSpec extends UdpDocSpec {
import UdpDocTest._
override def listenerProps(next: ActorRef) = Props(new Listener(next))
override def simpleSenderProps(remote: InetSocketAddress) = Props(new SimpleSender(remote))
override def connectedProps(remote: InetSocketAddress) = Props(new Connected(remote))
}

View file

@ -654,145 +654,85 @@ keep things well separated.
Using UDP
---------
UDP support comes in two flavors: connectionless and connection-based. With connectionless UDP, workers can send datagrams
to any remote address. Connection-based UDP workers are linked to a single remote address.
UDP is a connectionless datagram protocol which offers two different ways of
communication on the JDK level:
The connectionless UDP manager is accessed through ``Udp``. ``Udp`` refers to the "fire-and-forget" style of sending
UDP datagrams.
* sockets which are free to send datagrams to any destination and receive
datagrams from any origin
.. code-block:: scala
* sockets which are restricted to communication with one specific remote
socket address
import akka.io.IO
import akka.io.Udp
val connectionLessUdp = IO(Udp)
In the low-level API the distinction is made—confusingly—by whether or not
:meth:`connect` has been called on the socket (even when connect has been
called the protocol is still connectionless). These two forms of UDP usage are
offered using distinct IO extensions described below.
The connection-based UDP manager is accessed through ``UdpConnected``.
.. code-block:: scala
import akka.io.UdpConnected
val connectionBasedUdp = IO(UdpConnected)
UDP servers can be only implemented by the connectionless API, but clients can use both.
Connectionless UDP
^^^^^^^^^^^^^^^^^^
Unconnected UDP
^^^^^^^^^^^^^^^
Simple Send
............
To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the
``Udp`` manager:
.. includecode:: code/docs/io/UdpDocSpec.scala#sender
.. code-block:: scala
The simplest form of UDP usage is to just send datagrams without the need of
getting a reply. To this end a “simple sender” facility is provided as
demonstrated above. The UDP extension is queried using the
:class:`SimpleSender` message, which is answered by a :class:`SimpleSenderReady`
notification. The sender of this message is the newly created sender actor
which from this point onward can be used to send datagrams to arbitrary
destinations; in this example it will just send any UTF-8 encoded
:class:`String` it receives to a predefined remote address.
IO(Udp) ! SimpleSender
// or with socket options:
import akka.io.Udp._
IO(Udp) ! SimpleSender(List(SO.Broadcast(true)))
The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message:
.. code-block:: scala
case SimpleSendReady =>
simpleSender = sender
After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple
message send:
.. code-block:: scala
simpleSender ! Send(data, serverAddress)
.. note::
The simple sender will not shut itself down because it cannot know when you
are done with it. You will need to send it a :class:`PoisonPill` when you
want to close the ephemeral port the sender is bound to.
Bind (and Send)
...............
To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP
manager
.. includecode:: code/docs/io/UdpDocSpec.scala#listener
.. code-block:: scala
If you want to implement a UDP server which listens on a socket for incoming
datagrams then you need to use the :class:`Bind` command as shown above. The
local address specified may have a zero port in which case the operating system
will automatically choose a free port and assign it to the new socket. Which
port was actually bound can be found out by inspecting the :class:`Bound`
message.
IO(Udp) ! Bind(handler, localAddress)
The sender of the :class:`Bound` message is the actor which manages the new
socket. Sending datagrams is achieved by using the :class:`Send` message type
and the socket can be closed by sending a :class:`Unbind` command, in which
case the socket actor will reply with a :class:`Unbound` notification.
After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of
this message is the worker for the UDP channel bound to the local address.
Received datagrams are sent to the actor designated in the :class:`Bind`
message, whereas the :class:`Bound` message will be sent to the sender of the
:class:`Bind`.
.. code-block:: scala
Connected UDP
^^^^^^^^^^^^^
case Bound =>
udpWorker = sender // Save the worker ref for later use
The service provided by the connection based UDP API is similar to the
bind-and-send service we saw earlier, but the main difference is that a
connection is only able to send to the ``remoteAddress`` it was connected to,
and will receive datagrams only from that address.
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
.. includecode:: code/docs/io/UdpDocSpec.scala#connected
.. code-block:: scala
case Received(dataByteString, remoteAddress) => // Do something with the data
The ``Received`` message contains the payload of the datagram and the address of the sender.
It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``:
.. code-block:: scala
udpWorker ! Send(data, serverAddress)
Consequently the example shown here looks quite similar to the previous one,
the biggest difference is the absence of remote address information in
:class:`Send` and :class:`Received` messages.
.. note::
The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case
the sender field of the UDP datagram will be the bound local address, while in the latter it will be an undetermined
ephemeral port.
Connection based UDP
^^^^^^^^^^^^^^^^^^^^
The service provided by the connection based UDP API is similar to the bind-and-send service we saw earlier, but
the main difference is that a connection is only able to send to the ``remoteAddress`` it was connected to, and will
receive datagrams only from that address.
Connecting is similar to what we have seen in the previous section:
.. code-block:: scala
IO(UdpConnected) ! Connect(handler, remoteAddress)
Or, with more options:
.. code-block:: scala
IO(UdpConnected) ! Connect(handler, Some(localAddress), remoteAddress, List(SO.Broadcast(true)))
After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of
this message is the worker for the UDP connection.
.. code-block:: scala
case Connected =>
udpConnectionActor = sender // Save the worker ref for later use
The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address:
.. code-block:: scala
case Received(dataByteString) => // Do something with the data
The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address
is provided, as a UDP connection only receives messages from the endpoint it has been connected to.
UDP datagrams can be sent by sending a ``Send`` message to the worker actor.
.. code-block:: scala
udpConnectionActor ! Send(data)
Again, like the ``Received`` message, the ``Send`` message does not contain a remote address. This is because the address
will always be the endpoint we originally connected to.
.. note::
There is a small performance benefit in using connection based UDP API over the connectionless one.
If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security
check, while in the case of connection-based UDP the security check is cached after connect, thus writes do
not suffer an additional performance penalty.
There is a small performance benefit in using connection based UDP API over
the connectionless one. If there is a SecurityManager enabled on the system,
every connectionless message send has to go through a security check, while
in the case of connection-based UDP the security check is cached after
connect, thus writes do not suffer an additional performance penalty.
Architecture in-depth
---------------------