diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala
index 31e103a82f..4b278b4eeb 100644
--- a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala
@@ -24,7 +24,7 @@ class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitS
val simpleSender: ActorRef = {
val commander = TestProbe()
commander.send(IO(Udp), SimpleSender)
- commander.expectMsg(SimpleSendReady)
+ commander.expectMsg(SimpleSenderReady)
commander.sender
}
diff --git a/akka-actor/src/main/scala/akka/io/IO.scala b/akka-actor/src/main/scala/akka/io/IO.scala
index f876b8ba9d..b87c93b9b3 100644
--- a/akka-actor/src/main/scala/akka/io/IO.scala
+++ b/akka-actor/src/main/scala/akka/io/IO.scala
@@ -10,44 +10,29 @@ import akka.routing.RandomRouter
import akka.io.SelectionHandler.WorkerForCommand
import akka.event.Logging
+/**
+ * Entry point to Akka’s IO layer.
+ *
+ * All contents of the `akka.io` package is marked “experimental”.
+ *
+ * This marker signifies that APIs may still change in response to user feedback
+ * through-out the 2.2 release cycle. The implementation itself is considered
+ * stable and ready for production use.
+ *
+ * @see the Akka online documentation
+ */
object IO {
trait Extension extends akka.actor.Extension {
def manager: ActorRef
}
+ /**
+ * Scala API: obtain a reference to the manager actor for the given IO extension,
+ * for example [[Tcp]] or [[Udp]].
+ *
+ * For the Java API please refer to the individual extensions directly.
+ */
def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager
- // What is this? It's public API so I think it deserves a mention
- trait HasFailureMessage {
- def failureMessage: Any
- }
-
- abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor {
-
- override def supervisorStrategy = connectionSupervisorStrategy
-
- val selectorPool = context.actorOf(
- props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)),
- name = "selectors")
-
- final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry ⇒ Props]): Receive = {
- case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! WorkerForCommand(cmd, sender, pf(cmd))
- }
- }
-
- /**
- * Special supervisor strategy for parents of TCP connection and listener actors.
- * Stops the child on all errors and logs DeathPactExceptions only at debug level.
- */
- private[io] final val connectionSupervisorStrategy: SupervisorStrategy =
- new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) {
- override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable,
- decision: SupervisorStrategy.Directive): Unit =
- if (cause.isInstanceOf[DeathPactException]) {
- try context.system.eventStream.publish {
- Logging.Debug(child.path.toString, getClass, "Closed after handler termination")
- } catch { case NonFatal(_) ⇒ }
- } else super.logFailure(context, child, cause, decision)
- }
}
\ No newline at end of file
diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala
index 18c96594d6..e33b7ee6d1 100644
--- a/akka-actor/src/main/scala/akka/io/Inet.scala
+++ b/akka-actor/src/main/scala/akka/io/Inet.scala
@@ -80,17 +80,67 @@ object Inet {
}
trait SoForwarders {
+ /**
+ * [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option
+ *
+ * For more information see [[java.net.Socket.setReceiveBufferSize]]
+ */
val ReceiveBufferSize = SO.ReceiveBufferSize
+
+ /**
+ * [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR
+ *
+ * For more information see [[java.net.Socket.setReuseAddress]]
+ */
val ReuseAddress = SO.ReuseAddress
+
+ /**
+ * [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option.
+ *
+ * For more information see [[java.net.Socket.setSendBufferSize]]
+ */
val SendBufferSize = SO.SendBufferSize
+
+ /**
+ * [[akka.io.Tcp.SocketOption]] to set the traffic class or
+ * type-of-service octet in the IP header for packets sent from this
+ * socket.
+ *
+ * For more information see [[java.net.Socket.setTrafficClass]]
+ */
val TrafficClass = SO.TrafficClass
}
trait SoJavaFactories {
import SO._
+ /**
+ * [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option
+ *
+ * For more information see [[java.net.Socket.setReceiveBufferSize]]
+ */
def receiveBufferSize(size: Int) = ReceiveBufferSize(size)
+
+ /**
+ * [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR
+ *
+ * For more information see [[java.net.Socket.setReuseAddress]]
+ */
def reuseAddress(on: Boolean) = ReuseAddress(on)
+
+ /**
+ * [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option.
+ *
+ * For more information see [[java.net.Socket.setSendBufferSize]]
+ */
def sendBufferSize(size: Int) = SendBufferSize(size)
+
+ /**
+ * [[akka.io.Tcp.SocketOption]] to set the traffic class or
+ * type-of-service octet in the IP header for packets sent from this
+ * socket.
+ *
+ * For more information see [[java.net.Socket.setTrafficClass]]
+ */
def trafficClass(tc: Int) = TrafficClass(tc)
}
diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala
index dc522b657d..10852f4372 100644
--- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala
+++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala
@@ -15,10 +15,11 @@ import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext
import akka.event.LoggingAdapter
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
-import akka.io.IO.HasFailureMessage
import akka.util.Helpers.Requiring
import akka.util.SerializedSuspendableExecutionContext
import akka.actor._
+import akka.routing.RandomRouter
+import akka.event.Logging
abstract class SelectionHandlerSettings(config: Config) {
import config._
@@ -60,6 +61,10 @@ private[io] trait ChannelRegistration {
private[io] object SelectionHandler {
+ trait HasFailureMessage {
+ def failureMessage: Any
+ }
+
case class WorkerForCommand(apiCommand: HasFailureMessage, commander: ActorRef, childProps: ChannelRegistry ⇒ Props)
case class Retry(command: WorkerForCommand, retriesLeft: Int) { require(retriesLeft >= 0) }
@@ -69,6 +74,34 @@ private[io] object SelectionHandler {
case object ChannelReadable
case object ChannelWritable
+ private[io] abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor {
+
+ override def supervisorStrategy = connectionSupervisorStrategy
+
+ val selectorPool = context.actorOf(
+ props = Props(classOf[SelectionHandler], selectorSettings).withRouter(RandomRouter(nrOfSelectors)),
+ name = "selectors")
+
+ final def workerForCommandHandler(pf: PartialFunction[HasFailureMessage, ChannelRegistry ⇒ Props]): Receive = {
+ case cmd: HasFailureMessage if pf.isDefinedAt(cmd) ⇒ selectorPool ! WorkerForCommand(cmd, sender, pf(cmd))
+ }
+ }
+
+ /**
+ * Special supervisor strategy for parents of TCP connection and listener actors.
+ * Stops the child on all errors and logs DeathPactExceptions only at debug level.
+ */
+ private[io] final val connectionSupervisorStrategy: SupervisorStrategy =
+ new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) {
+ override protected def logFailure(context: ActorContext, child: ActorRef, cause: Throwable,
+ decision: SupervisorStrategy.Directive): Unit =
+ if (cause.isInstanceOf[DeathPactException]) {
+ try context.system.eventStream.publish {
+ Logging.Debug(child.path.toString, getClass, "Closed after handler termination")
+ } catch { case NonFatal(_) ⇒ }
+ } else super.logFailure(context, child, cause, decision)
+ }
+
private class ChannelRegistryImpl(executionContext: ExecutionContext, log: LoggingAdapter) extends ChannelRegistry {
private[this] val selector = SelectorProvider.provider.openSelector
private[this] val wakeUp = new AtomicBoolean(false)
diff --git a/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala b/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala
index 1114c64cec..af7ac2c7cd 100644
--- a/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala
+++ b/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala
@@ -47,7 +47,10 @@ object SslTlsSupport {
* This pipeline stage implements SSL / TLS support, using an externally
* configured [[SSLEngine]]. It operates on the level of [[Tcp.Event]] and
* [[Tcp.Command]] messages, which means that it will typically be one of
- * the lowest stages in a protocol stack.
+ * the lowest stages in a protocol stack. Since SSLEngine relies on contiguous
+ * transmission of a data stream you will need to handle backpressure from
+ * the TCP connection actor, for example by using a [[BackpressureBuffer]]
+ * underneath the SSL stage.
*
* Each instance of this stage has a scratch [[ByteBuffer]] of approx. 18kiB
* allocated which is used by the SSLEngine.
diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala
index a333037fb9..1f462995f9 100644
--- a/akka-actor/src/main/scala/akka/io/Tcp.scala
+++ b/akka-actor/src/main/scala/akka/io/Tcp.scala
@@ -15,6 +15,20 @@ import akka.util.Helpers.Requiring
import akka.actor._
import java.lang.{ Iterable ⇒ JIterable }
+/**
+ * TCP Extension for Akka’s IO layer.
+ *
+ * For a full description of the design and philosophy behind this IO
+ * implementation please refer to {@see the Akka online documentation}.
+ *
+ * In order to open an outbound connection send a [[Tcp.Connect]] message
+ * to the [[TcpExt#manager]].
+ *
+ * In order to start listening for inbound connetions send a [[Tcp.Bind]]
+ * message to the [[TcpExt#manager]].
+ *
+ * The Java API for generating TCP commands is available at [[TcpMessage]].
+ */
object Tcp extends ExtensionKey[TcpExt] {
/**
@@ -22,7 +36,11 @@ object Tcp extends ExtensionKey[TcpExt] {
*/
override def get(system: ActorSystem): TcpExt = super.get(system)
- // shared socket options
+ /**
+ * Scala API: this object contains all applicable socket options for TCP.
+ *
+ * For the Java API see [[TcpSO]].
+ */
object SO extends Inet.SoForwarders {
// general socket options
@@ -63,60 +81,180 @@ object Tcp extends ExtensionKey[TcpExt] {
}
- trait Message
+ /**
+ * The common interface for [[Command]] and [[Event]].
+ */
+ sealed trait Message
/// COMMANDS
/**
* This is the common trait for all commands understood by TCP actors.
*/
- trait Command extends Message with IO.HasFailureMessage {
+ trait Command extends Message with SelectionHandler.HasFailureMessage {
def failureMessage = CommandFailed(this)
}
/**
- * The Connect message is sent to the [[TcpManager]], which is obtained via
- * [[TcpExt#getManager]]. Either the manager replies with a [[CommandFailed]]
+ * The Connect message is sent to the TCP manager actor, which is obtained via
+ * [[TcpExt#manager]]. Either the manager replies with a [[CommandFailed]]
* or the actor handling the new connection replies with a [[Connected]]
* message.
+ *
+ * @param remoteAddress is the address to connect to
+ * @param localAddress optionally specifies a specific address to bind to
+ * @param options Please refer to the [[SO]] object for a list of all supported options.
*/
case class Connect(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil) extends Command
+
+ /**
+ * The Bind message is send to the TCP manager actor, which is obtained via
+ * [[TcpExt#manager]] in order to bind to a listening socket. The manager
+ * replies either with a [[CommandFailed]] or the actor handling the listen
+ * socket replies with a [[Bound]] message. If the local port is set to 0 in
+ * the Bind message, then the [[Bound]] message should be inspected to find
+ * the actual port which was bound to.
+ *
+ * @param handler The actor which will receive all incoming connection requests
+ * in the form of [[Connected]] messages.
+ *
+ * @param localAddress The socket address to bind to; use port zero for
+ * automatic assignment (i.e. an ephemeral port, see [[Bound]])
+ *
+ * @param backlog This specifies the number of unaccepted connections the O/S
+ * kernel will hold for this port before refusing connections.
+ *
+ * @param options Please refer to the [[SO]] object for a list of all supported options.
+ */
case class Bind(handler: ActorRef,
localAddress: InetSocketAddress,
backlog: Int = 100,
options: immutable.Traversable[SocketOption] = Nil) extends Command
+ /**
+ * This message must be sent to a TCP connection actor after receiving the
+ * [[Connected]] message. The connection will not read any data from the
+ * socket until this message is received, because this message defines the
+ * actor which will receive all inbound data.
+ *
+ * @param handler The actor which will receive all incoming data and which
+ * will be informed when the connection is closed.
+ *
+ * @param keepOpenOnPeerClosed If this is set to true then the connection
+ * is not automatically closed when the peer closes its half,
+ * requiring an explicit [[Closed]] from our side when finished.
+ *
+ * @param useResumeWriting If this is set to true then the connection actor
+ * will refuse all further writes after issuing a [[CommandFailed]]
+ * notification until [[ResumeWriting]] is received. This can
+ * be used to implement NACK-based write backpressure.
+ */
case class Register(handler: ActorRef, keepOpenOnPeerClosed: Boolean = false, useResumeWriting: Boolean = true) extends Command
+
+ /**
+ * In order to close down a listening socket, send this message to that socket’s
+ * actor (that is the actor which previously had sent the [[Bound]] message). The
+ * listener socket actor will reply with a [[Unbound]] message.
+ */
case object Unbind extends Command
+ /**
+ * Common interface for all commands which aim to close down an open connection.
+ */
sealed trait CloseCommand extends Command {
+ /**
+ * The corresponding event which is sent as an acknowledgment once the
+ * close operation is finished.
+ */
def event: ConnectionClosed
}
+
+ /**
+ * A normal close operation will first flush pending writes and then close the
+ * socket. The sender of this command and the registered handler for incoming
+ * data will both be notified once the socket is closed using a [[Closed]]
+ * message.
+ */
case object Close extends CloseCommand {
+ /**
+ * The corresponding event which is sent as an acknowledgment once the
+ * close operation is finished.
+ */
override def event = Closed
}
+
+ /**
+ * A confirmed close operation will flush pending writes and half-close the
+ * connection, waiting for the peer to close the other half. The sender of this
+ * command and the registered handler for incoming data will both be notified
+ * once the socket is closed using a [[ConfirmedClosed]] message.
+ */
case object ConfirmedClose extends CloseCommand {
+ /**
+ * The corresponding event which is sent as an acknowledgment once the
+ * close operation is finished.
+ */
override def event = ConfirmedClosed
}
+
+ /**
+ * An abort operation will not flush pending writes and will issue a TCP ABORT
+ * command to the O/S kernel which should result in a TCP_RST packet being sent
+ * to the peer. The sender of this command and the registered handler for
+ * incoming data will both be notified once the socket is closed using a
+ * [[Aborted]] message.
+ */
case object Abort extends CloseCommand {
+ /**
+ * The corresponding event which is sent as an acknowledgment once the
+ * close operation is finished.
+ */
override def event = Aborted
}
+ /**
+ * Each [[WriteCommand]] can optionally request a positive acknowledgment to be sent
+ * to the commanding actor. If such notification is not desired the [[WriteCommand#ack]]
+ * must be set to an instance of this class. The token contained within can be used
+ * to recognize which write failed when receiving a [[CommandFailed]] message.
+ */
case class NoAck(token: Any) extends Event
+
+ /**
+ * Default [[NoAck]] instance which is used when no acknowledgment information is
+ * explicitly provided. Its “token” is `null`.
+ */
object NoAck extends NoAck(null)
+ /**
+ * Common interface for all write commands, currently [[Write]] and [[WriteFile]].
+ */
sealed trait WriteCommand extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
- def ack: Any
+ /**
+ * The acknowledgment token associated with this write command.
+ */
+ def ack: Event
+
+ /**
+ * An acknowledgment is only sent if this write command “wants an ack”, which is
+ * equivalent to the [[#ack]] token not being a of type [[NoAck]].
+ */
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
/**
* Write data to the TCP connection. If no ack is needed use the special
- * `NoAck` object.
+ * `NoAck` object. The connection actor will reply with a [[CommandFailed]]
+ * message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
+ * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
+ * token once the write has been successfully enqueued to the O/S kernel.
+ * Note that this does not in any way guarantee that the data will be
+ * or have been sent! Unfortunately there is no way to determine whether
+ * a particular write has been sent by the O/S.
*/
case class Write(data: ByteString, ack: Event) extends WriteCommand
object Write {
@@ -137,50 +275,147 @@ object Tcp extends ExtensionKey[TcpExt] {
/**
* Write `count` bytes starting at `position` from file at `filePath` to the connection.
- * When write is finished acknowledge with `ack`. If no ack is needed use `NoAck`. The
- * count must be > 0.
+ * The count must be > 0. The connection actor will reply with a [[CommandFailed]]
+ * message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
+ * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
+ * token once the write has been successfully enqueued to the O/S kernel.
+ * Note that this does not in any way guarantee that the data will be
+ * or have been sent! Unfortunately there is no way to determine whether
+ * a particular write has been sent by the O/S.
*/
- case class WriteFile(filePath: String, position: Long, count: Long, ack: Any) extends WriteCommand {
+ case class WriteFile(filePath: String, position: Long, count: Long, ack: Event) extends WriteCommand {
require(position >= 0, "WriteFile.position must be >= 0")
require(count > 0, "WriteFile.count must be > 0")
}
+ /**
+ * When `useResumeWriting` is in effect as was indicated in the [[Register]] message
+ * then this command needs to be sent to the connection actor in order to re-enable
+ * writing after a [[CommandFailed]] event. All [[WriteCommand]] processed by the
+ * connection actor between the first [[CommandFailed]] and subsequent reception of
+ * this message will also be rejected with [[CommandFailed]].
+ */
case object ResumeWriting extends Command
+ /**
+ * Sending this command to the connection actor will disable reading from the TCP
+ * socket. TCP flow-control will then propagate backpressure to the sender side
+ * as buffers fill up on either end. To re-enable reading send [[ResumeReading]].
+ */
case object SuspendReading extends Command
+
+ /**
+ * This command needs to be sent to the connection actor after a [[SuspendReading]]
+ * command in order to resume reading from the socket.
+ */
case object ResumeReading extends Command
/// EVENTS
+ /**
+ * Common interface for all events generated by the TCP layer actors.
+ */
trait Event extends Message
+ /**
+ * Whenever data are read from a socket they will be transferred within this
+ * class to the handler actor which was designated in the [[Register]] message.
+ */
case class Received(data: ByteString) extends Event
+
+ /**
+ * The connection actor sends this message either to the sender of a [[Connect]]
+ * command (for outbound) or to the handler for incoming connections designated
+ * in the [[Bind]] message. The connection is characterized by the `remoteAddress`
+ * and `localAddress` TCP endpoints.
+ */
case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event
+
+ /**
+ * Whenever a command cannot be completed, the queried actor will reply with
+ * this message, wrapping the original command which failed.
+ */
case class CommandFailed(cmd: Command) extends Event
+ /**
+ * When `useResumeWriting` is in effect as indicated in the [[Register]] message,
+ * the [[ResumeWriting]] command will be acknowledged by this message type, upon
+ * which it is safe to send at least one write. This means that all writes preceding
+ * the first [[CommandFailed]] message have been enqueued to the O/S kernel at this
+ * point.
+ */
sealed trait WritingResumed extends Event
case object WritingResumed extends WritingResumed
+ /**
+ * The sender of a [[Bind]] command will—in case of success—receive confirmation
+ * in this form. If the bind address indicated a 0 port number, then the contained
+ * `localAddress` can be used to find out which port was automatically assigned.
+ */
case class Bound(localAddress: InetSocketAddress) extends Event
+
+ /**
+ * The sender of an [[Unbind]] command will receive confirmation through this
+ * message once the listening socket has been closed.
+ */
sealed trait Unbound extends Event
case object Unbound extends Unbound
+ /**
+ * This is the common interface for all events which indicate that a connection
+ * has been closed or half-closed.
+ */
sealed trait ConnectionClosed extends Event {
+ /**
+ * `true` iff the connection has been closed in response to an [[Abort]] command.
+ */
def isAborted: Boolean = false
+ /**
+ * `true` iff the connection has been fully closed in response to a
+ * [[ConfirmedClose]] command.
+ */
def isConfirmed: Boolean = false
+ /**
+ * `true` iff the connection has been closed by the peer; in case
+ * `keepOpenOnPeerClosed` is in effect as per the [[Register]] command,
+ * this connection’s reading half is now closed.
+ */
def isPeerClosed: Boolean = false
+ /**
+ * `true` iff the connection has been closed due to an IO error.
+ */
def isErrorClosed: Boolean = false
+ /**
+ * If `isErrorClosed` returns true, then the error condition can be
+ * retrieved by this method.
+ */
def getErrorCause: String = null
}
+ /**
+ * The connection has been closed normally in response to a [[Close]] command.
+ */
case object Closed extends ConnectionClosed
+ /**
+ * The connection has been aborted in response to an [[Abort]] command.
+ */
case object Aborted extends ConnectionClosed {
override def isAborted = true
}
+ /**
+ * The connection has been half-closed by us and then half-close by the peer
+ * in response to a [[ConfirmedClose]] command.
+ */
case object ConfirmedClosed extends ConnectionClosed {
override def isConfirmed = true
}
+ /**
+ * The peer has closed its writing half of the connection.
+ */
case object PeerClosed extends ConnectionClosed {
override def isPeerClosed = true
}
+ /**
+ * The connection has been closed due to an IO error.
+ */
case class ErrorClosed(cause: String) extends ConnectionClosed {
override def isErrorClosed = true
override def getErrorCause = cause
@@ -218,10 +453,14 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
private[this] def getIntBytes(path: String): Int = {
val size = getBytes(path)
require(size < Int.MaxValue, s"$path must be < 2 GiB")
+ require(size >= 0, s"$path must be non-negative")
size.toInt
}
}
+ /**
+ *
+ */
val manager: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(
props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher),
@@ -237,10 +476,36 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
val fileIoDispatcher = system.dispatchers.lookup(Settings.FileIODispatcher)
}
+/**
+ * Java API for accessing socket options.
+ */
object TcpSO extends SoJavaFactories {
import Tcp.SO._
+
+ /**
+ * [[akka.io.Inet.SocketOption]] to enable or disable SO_KEEPALIVE
+ *
+ * For more information see [[java.net.Socket.setKeepAlive]]
+ */
def keepAlive(on: Boolean) = KeepAlive(on)
+
+ /**
+ * [[akka.io.Inet.SocketOption]] to enable or disable OOBINLINE (receipt
+ * of TCP urgent data) By default, this option is disabled and TCP urgent
+ * data is silently discarded.
+ *
+ * For more information see [[java.net.Socket.setOOBInline]]
+ */
def oobInline(on: Boolean) = OOBInline(on)
+
+ /**
+ * [[akka.io.Inet.SocketOption]] to enable or disable TCP_NODELAY
+ * (disable or enable Nagle's algorithm)
+ *
+ * Please note, that TCP_NODELAY is enabled by default.
+ *
+ * For more information see [[java.net.Socket.setTcpNoDelay]]
+ */
def tcpNoDelay(on: Boolean) = TcpNoDelay(on)
}
@@ -248,43 +513,183 @@ object TcpMessage {
import language.implicitConversions
import Tcp._
+ /**
+ * The Connect message is sent to the TCP manager actor, which is obtained via
+ * [[TcpExt#getManager]]. Either the manager replies with a [[CommandFailed]]
+ * or the actor handling the new connection replies with a [[Connected]]
+ * message.
+ *
+ * @param remoteAddress is the address to connect to
+ * @param localAddress optionally specifies a specific address to bind to
+ * @param options Please refer to [[TcpSO]] for a list of all supported options.
+ */
def connect(remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(remoteAddress, Some(localAddress), options)
+ /**
+ * Connect to the given `remoteAddress` without binding to a local address.
+ */
def connect(remoteAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(remoteAddress, None, options)
+ /**
+ * Connect to the given `remoteAddress` without binding to a local address and without
+ * specifying options.
+ */
def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil)
+ /**
+ * The Bind message is send to the TCP manager actor, which is obtained via
+ * [[TcpExt#getManager]] in order to bind to a listening socket. The manager
+ * replies either with a [[CommandFailed]] or the actor handling the listen
+ * socket replies with a [[Bound]] message. If the local port is set to 0 in
+ * the Bind message, then the [[Bound]] message should be inspected to find
+ * the actual port which was bound to.
+ *
+ * @param handler The actor which will receive all incoming connection requests
+ * in the form of [[Connected]] messages.
+ *
+ * @param localAddress The socket address to bind to; use port zero for
+ * automatic assignment (i.e. an ephemeral port, see [[Bound]])
+ *
+ * @param backlog This specifies the number of unaccepted connections the O/S
+ * kernel will hold for this port before refusing connections.
+ *
+ * @param options Please refer to [[TcpSO]] for a list of all supported options.
+ */
def bind(handler: ActorRef,
endpoint: InetSocketAddress,
backlog: Int,
options: JIterable[SocketOption]): Command = Bind(handler, endpoint, backlog, options)
+ /**
+ * Open a listening socket without specifying options.
+ */
def bind(handler: ActorRef,
endpoint: InetSocketAddress,
backlog: Int): Command = Bind(handler, endpoint, backlog, Nil)
- def register(handler: ActorRef): Command = Register(handler)
+ /**
+ * This message must be sent to a TCP connection actor after receiving the
+ * [[Connected]] message. The connection will not read any data from the
+ * socket until this message is received, because this message defines the
+ * actor which will receive all inbound data.
+ *
+ * @param handler The actor which will receive all incoming data and which
+ * will be informed when the connection is closed.
+ *
+ * @param keepOpenOnPeerClosed If this is set to true then the connection
+ * is not automatically closed when the peer closes its half,
+ * requiring an explicit [[Closed]] from our side when finished.
+ *
+ * @param useResumeWriting If this is set to true then the connection actor
+ * will refuse all further writes after issuing a [[CommandFailed]]
+ * notification until [[ResumeWriting]] is received. This can
+ * be used to implement NACK-based write backpressure.
+ */
def register(handler: ActorRef, keepOpenOnPeerClosed: Boolean, useResumeWriting: Boolean): Command =
Register(handler, keepOpenOnPeerClosed, useResumeWriting)
+ /**
+ * The same as `register(handler, false, false)`.
+ */
+ def register(handler: ActorRef): Command = Register(handler)
+
+ /**
+ * In order to close down a listening socket, send this message to that socket’s
+ * actor (that is the actor which previously had sent the [[Bound]] message). The
+ * listener socket actor will reply with a [[Unbound]] message.
+ */
def unbind: Command = Unbind
+ /**
+ * A normal close operation will first flush pending writes and then close the
+ * socket. The sender of this command and the registered handler for incoming
+ * data will both be notified once the socket is closed using a [[Closed]]
+ * message.
+ */
def close: Command = Close
+
+ /**
+ * A confirmed close operation will flush pending writes and half-close the
+ * connection, waiting for the peer to close the other half. The sender of this
+ * command and the registered handler for incoming data will both be notified
+ * once the socket is closed using a [[ConfirmedClosed]] message.
+ */
def confirmedClose: Command = ConfirmedClose
+
+ /**
+ * An abort operation will not flush pending writes and will issue a TCP ABORT
+ * command to the O/S kernel which should result in a TCP_RST packet being sent
+ * to the peer. The sender of this command and the registered handler for
+ * incoming data will both be notified once the socket is closed using a
+ * [[Aborted]] message.
+ */
def abort: Command = Abort
- def noAck: NoAck = NoAck
+ /**
+ * Each [[WriteCommand]] can optionally request a positive acknowledgment to be sent
+ * to the commanding actor. If such notification is not desired the [[WriteCommand#ack]]
+ * must be set to an instance of this class. The token contained within can be used
+ * to recognize which write failed when receiving a [[CommandFailed]] message.
+ */
def noAck(token: AnyRef): NoAck = NoAck(token)
+ /**
+ * Default [[NoAck]] instance which is used when no acknowledgment information is
+ * explicitly provided. Its “token” is `null`.
+ */
+ def noAck: NoAck = NoAck
- def write(data: ByteString): Command = Write(data)
+ /**
+ * Write data to the TCP connection. If no ack is needed use the special
+ * `NoAck` object. The connection actor will reply with a [[CommandFailed]]
+ * message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
+ * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
+ * token once the write has been successfully enqueued to the O/S kernel.
+ * Note that this does not in any way guarantee that the data will be
+ * or have been sent! Unfortunately there is no way to determine whether
+ * a particular write has been sent by the O/S.
+ */
def write(data: ByteString, ack: Event): Command = Write(data, ack)
+ /**
+ * The same as `write(data, noAck())`.
+ */
+ def write(data: ByteString): Command = Write(data)
- def suspendReading: Command = SuspendReading
- def resumeReading: Command = ResumeReading
+ /**
+ * Write `count` bytes starting at `position` from file at `filePath` to the connection.
+ * The count must be > 0. The connection actor will reply with a [[CommandFailed]]
+ * message if the write could not be enqueued. If [[WriteCommand#wantsAck]]
+ * returns true, the connection actor will reply with the supplied [[WriteCommand#ack]]
+ * token once the write has been successfully enqueued to the O/S kernel.
+ * Note that this does not in any way guarantee that the data will be
+ * or have been sent! Unfortunately there is no way to determine whether
+ * a particular write has been sent by the O/S.
+ */
+ def writeFile(filePath: String, position: Long, count: Long, ack: Event): Command =
+ WriteFile(filePath, position, count, ack)
+ /**
+ * When `useResumeWriting` is in effect as was indicated in the [[Register]] message
+ * then this command needs to be sent to the connection actor in order to re-enable
+ * writing after a [[CommandFailed]] event. All [[WriteCommand]] processed by the
+ * connection actor between the first [[CommandFailed]] and subsequent reception of
+ * this message will also be rejected with [[CommandFailed]].
+ */
def resumeWriting: Command = ResumeWriting
+ /**
+ * Sending this command to the connection actor will disable reading from the TCP
+ * socket. TCP flow-control will then propagate backpressure to the sender side
+ * as buffers fill up on either end. To re-enable reading send [[ResumeReading]].
+ */
+ def suspendReading: Command = SuspendReading
+
+ /**
+ * This command needs to be sent to the connection actor after a [[SuspendReading]]
+ * command in order to resume reading from the socket.
+ */
+ def resumeReading: Command = ResumeReading
+
implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
import scala.collection.JavaConverters._
- coll.asScala.to
+ coll.asScala.to[immutable.Traversable]
}
}
diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala
index 946c63c07e..6bd93baa60 100644
--- a/akka-actor/src/main/scala/akka/io/TcpListener.scala
+++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala
@@ -11,7 +11,6 @@ import scala.util.control.NonFatal
import akka.actor.{ Props, ActorLogging, ActorRef, Actor }
import akka.io.SelectionHandler._
import akka.io.Tcp._
-import akka.io.IO.HasFailureMessage
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
/**
@@ -64,7 +63,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
context.stop(self)
}
- override def supervisorStrategy = IO.connectionSupervisorStrategy
+ override def supervisorStrategy = SelectionHandler.connectionSupervisorStrategy
def receive: Receive = {
case registration: ChannelRegistration ⇒
diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala
index d4dc1d25a3..237011c8f3 100644
--- a/akka-actor/src/main/scala/akka/io/TcpManager.scala
+++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala
@@ -6,7 +6,6 @@ package akka.io
import Tcp._
import akka.actor.{ ActorLogging, Props }
-import akka.io.IO.SelectorBasedManager
/**
* INTERNAL API
@@ -45,7 +44,8 @@ import akka.io.IO.SelectorBasedManager
* with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference.
*
*/
-private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging {
+private[io] class TcpManager(tcp: TcpExt)
+ extends SelectionHandler.SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging {
def receive = workerForCommandHandler {
case c: Connect ⇒
diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala
index 04779c47b7..4126f603ec 100644
--- a/akka-actor/src/main/scala/akka/io/Udp.scala
+++ b/akka-actor/src/main/scala/akka/io/Udp.scala
@@ -12,6 +12,18 @@ import akka.util.Helpers.Requiring
import akka.util.ByteString
import akka.actor._
+/**
+ * UDP Extension for Akka’s IO layer.
+ *
+ * This extension implements the connectionless UDP protocol without
+ * calling `connect` on the underlying sockets, i.e. without restricting
+ * from whom data can be received. For “connected” UDP mode see [[UdpConnected]].
+ *
+ * For a full description of the design and philosophy behind this IO
+ * implementation please refer to {@see the Akka online documentation}.
+ *
+ * The Java API for generating UDP commands is available at [[UdpMessage]].
+ */
object Udp extends ExtensionKey[UdpExt] {
/**
@@ -19,14 +31,49 @@ object Udp extends ExtensionKey[UdpExt] {
*/
override def get(system: ActorSystem): UdpExt = super.get(system)
- trait Command extends IO.HasFailureMessage {
+ /**
+ * The common interface for [[Command]] and [[Event]].
+ */
+ sealed trait Message
+
+ /**
+ * The common type of all commands supported by the UDP implementation.
+ */
+ trait Command extends SelectionHandler.HasFailureMessage with Message {
def failureMessage = CommandFailed(this)
}
- case class NoAck(token: Any)
+ /**
+ * Each [[Send]] can optionally request a positive acknowledgment to be sent
+ * to the commanding actor. If such notification is not desired the [[Send#ack]]
+ * must be set to an instance of this class. The token contained within can be used
+ * to recognize which write failed when receiving a [[CommandFailed]] message.
+ */
+ case class NoAck(token: Any) extends Event
+
+ /**
+ * Default [[NoAck]] instance which is used when no acknowledgment information is
+ * explicitly provided. Its “token” is `null`.
+ */
object NoAck extends NoAck(null)
- case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command {
+ /**
+ * This message is understood by the “simple sender” which can be obtained by
+ * sending the [[SimpleSender]] query to the [[UdpExt#manager]] as well as by
+ * the listener actors which are created in response to [[Bind]]. It will send
+ * the given payload data as one UDP datagram to the given target address. The
+ * UDP actor will respond with [[CommandFailed]] if the send could not be
+ * enqueued to the O/S kernel because the send buffer was full. If the given
+ * `ack` is not of type [[NoAck]] the UDP actor will reply with the given
+ * object as soon as the datagram has been successfully enqueued to the O/S
+ * kernel.
+ *
+ * The sending UDP socket’s address belongs to the “simple sender” which does
+ * not handle inbound datagrams and sends from an ephemeral port; therefore
+ * sending using this mechanism is not suitable if replies are expected, use
+ * [[Bind]] in that case.
+ */
+ case class Send(payload: ByteString, target: InetSocketAddress, ack: Event) extends Command {
require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
@@ -35,32 +82,92 @@ object Udp extends ExtensionKey[UdpExt] {
def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck)
}
+ /**
+ * Send this message to the [[UdpExt#manager]] in order to bind to the given
+ * local port (or an automatically assigned one if the port number is zero).
+ * The listener actor for the newly bound port will reply with a [[Bound]]
+ * message, or the manager will reply with a [[CommandFailed]] message.
+ */
case class Bind(handler: ActorRef,
localAddress: InetSocketAddress,
options: immutable.Traversable[SocketOption] = Nil) extends Command
+
+ /**
+ * Send this message to the listener actor that previously sent a [[Bound]]
+ * message in order to close the listening socket. The recipient will reply
+ * with an [[Unbound]] message.
+ */
case object Unbind extends Command
+ /**
+ * Retrieve a reference to a “simple sender” actor of the UDP extension.
+ * The newly created “simple sender” will reply with the [[SimpleSenderReady]] notification.
+ *
+ * The “simple sender” is a convenient service for being able to send datagrams
+ * when the originating address is meaningless, i.e. when no reply is expected.
+ *
+ * The “simple sender” will not stop itself, you will have to send it a [[akka.actor.PoisonPill]]
+ * when you want to close the socket.
+ */
case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command
object SimpleSender extends SimpleSender(Nil)
- case object StopReading extends Command
+ /**
+ * Send this message to a listener actor (which sent a [[Bound]] message) to
+ * have it stop reading datagrams from the network. If the O/S kernel’s receive
+ * buffer runs full then subsequent datagrams will be silently discarded.
+ * Re-enable reading from the socket using the [[ResumeReading]] command.
+ */
+ case object SuspendReading extends Command
+
+ /**
+ * This message must be sent to the listener actor to re-enable reading from
+ * the socket after a [[SuspendReading]] command.
+ */
case object ResumeReading extends Command
- trait Event
+ /**
+ * The common type of all events emitted by the UDP implementation.
+ */
+ trait Event extends Message
+ /**
+ * When a listener actor receives a datagram from its socket it will send
+ * it to the handler designated in the [[Bind]] message using this message type.
+ */
case class Received(data: ByteString, sender: InetSocketAddress) extends Event
+
+ /**
+ * When a command fails it will be replied to with this message type,
+ * wrapping the failing command object.
+ */
case class CommandFailed(cmd: Command) extends Event
+ /**
+ * This message is sent by the listener actor in response to a [[Bind]] command.
+ * If the address to bind to specified a port number of zero, then this message
+ * can be inspected to find out which port was automatically assigned.
+ */
case class Bound(localAddress: InetSocketAddress) extends Event
- sealed trait SimpleSendReady extends Event
- case object SimpleSendReady extends SimpleSendReady
+ /**
+ * The “simple sender” sends this message type in response to a [[SimpleSender]] query.
+ */
+ sealed trait SimpleSenderReady extends Event
+ case object SimpleSenderReady extends SimpleSenderReady
+ /**
+ * This message is sent by the listener actor in response to an [[Unbind]] command
+ * after the socket has been closed.
+ */
sealed trait Unbound
case object Unbound extends Unbound
- case class SendFailed(cause: Throwable) extends Event
-
+ /**
+ * Scala API: This object provides access to all socket options applicable to UDP sockets.
+ *
+ * For the Java API see [[UdpSO]].
+ */
object SO extends Inet.SoForwarders {
/**
@@ -106,7 +213,15 @@ class UdpExt(system: ExtendedActorSystem) extends IO.Extension {
name = "IO-UDP-FF")
}
- val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
+ /**
+ * Java API: retrieve the UDP manager actor’s reference.
+ */
+ def getManager: ActorRef = manager
+
+ /**
+ * INTERNAL API
+ */
+ private[io] val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
}
/**
@@ -118,24 +233,99 @@ object UdpMessage {
import scala.collection.JavaConverters._
import language.implicitConversions
- def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target)
- def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack)
+ /**
+ * Each [[Send]] can optionally request a positive acknowledgment to be sent
+ * to the commanding actor. If such notification is not desired the [[Send#ack]]
+ * must be set to an instance of this class. The token contained within can be used
+ * to recognize which write failed when receiving a [[CommandFailed]] message.
+ */
+ def noAck(token: AnyRef): NoAck = NoAck(token)
+ /**
+ * Default [[NoAck]] instance which is used when no acknowledgment information is
+ * explicitly provided. Its “token” is `null`.
+ */
+ def noAck: NoAck = NoAck
- def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind =
+ /**
+ * This message is understood by the “simple sender” which can be obtained by
+ * sending the [[SimpleSender]] query to the [[UdpExt#manager]] as well as by
+ * the listener actors which are created in response to [[Bind]]. It will send
+ * the given payload data as one UDP datagram to the given target address. The
+ * UDP actor will respond with [[CommandFailed]] if the send could not be
+ * enqueued to the O/S kernel because the send buffer was full. If the given
+ * `ack` is not of type [[NoAck]] the UDP actor will reply with the given
+ * object as soon as the datagram has been successfully enqueued to the O/S
+ * kernel.
+ *
+ * The sending UDP socket’s address belongs to the “simple sender” which does
+ * not handle inbound datagrams and sends from an ephemeral port; therefore
+ * sending using this mechanism is not suitable if replies are expected, use
+ * [[Bind]] in that case.
+ */
+ def send(payload: ByteString, target: InetSocketAddress, ack: Event): Command = Send(payload, target, ack)
+ /**
+ * The same as `send(payload, target, noAck())`.
+ */
+ def send(payload: ByteString, target: InetSocketAddress): Command = Send(payload, target)
+
+ /**
+ * Send this message to the [[UdpExt#manager]] in order to bind to the given
+ * local port (or an automatically assigned one if the port number is zero).
+ * The listener actor for the newly bound port will reply with a [[Bound]]
+ * message, or the manager will reply with a [[CommandFailed]] message.
+ */
+ def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Command =
Bind(handler, endpoint, options.asScala.to)
+ /**
+ * Bind without specifying options.
+ */
+ def bind(handler: ActorRef, endpoint: InetSocketAddress): Command = Bind(handler, endpoint, Nil)
- def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil)
+ /**
+ * Send this message to the listener actor that previously sent a [[Bound]]
+ * message in order to close the listening socket. The recipient will reply
+ * with an [[Unbound]] message.
+ */
+ def unbind: Command = Unbind
- def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to)
- def simpleSender: SimpleSender = SimpleSender
+ /**
+ * Retrieve a reference to a “simple sender” actor of the UDP extension.
+ * The newly created “simple sender” will reply with the [[SimpleSenderReady]] notification.
+ *
+ * The “simple sender” is a convenient service for being able to send datagrams
+ * when the originating address is meaningless, i.e. when no reply is expected.
+ *
+ * The “simple sender” will not stop itself, you will have to send it a [[akka.actor.PoisonPill]]
+ * when you want to close the socket.
+ */
+ def simpleSender(options: JIterable[SocketOption]): Command = SimpleSender(options.asScala.to)
+ /**
+ * Retrieve a simple sender without specifying options.
+ */
+ def simpleSender: Command = SimpleSender
- def unbind: Unbind.type = Unbind
+ /**
+ * Send this message to a listener actor (which sent a [[Bound]] message) to
+ * have it stop reading datagrams from the network. If the O/S kernel’s receive
+ * buffer runs full then subsequent datagrams will be silently discarded.
+ * Re-enable reading from the socket using the [[ResumeReading]] command.
+ */
+ def suspendReading: Command = SuspendReading
- def stopReading: StopReading.type = StopReading
- def resumeReading: ResumeReading.type = ResumeReading
+ /**
+ * This message must be sent to the listener actor to re-enable reading from
+ * the socket after a [[SuspendReading]] command.
+ */
+ def resumeReading: Command = ResumeReading
}
object UdpSO extends SoJavaFactories {
import Udp.SO._
+
+ /**
+ * [[akka.io.Inet.SocketOption]] to set the SO_BROADCAST option
+ *
+ * For more information see [[java.net.DatagramSocket#setBroadcast]]
+ */
def broadcast(on: Boolean) = Broadcast(on)
}
diff --git a/akka-actor/src/main/scala/akka/io/UdpConnected.scala b/akka-actor/src/main/scala/akka/io/UdpConnected.scala
index 600317a8b9..3498bce045 100644
--- a/akka-actor/src/main/scala/akka/io/UdpConnected.scala
+++ b/akka-actor/src/main/scala/akka/io/UdpConnected.scala
@@ -11,21 +11,61 @@ import akka.io.Udp.UdpSettings
import akka.util.ByteString
import akka.actor._
+/**
+ * UDP Extension for Akka’s IO layer.
+ *
+ * This extension implements the connectionless UDP protocol with
+ * calling `connect` on the underlying sockets, i.e. with restricting
+ * from whom data can be received. For “unconnected” UDP mode see [[Udp]].
+ *
+ * For a full description of the design and philosophy behind this IO
+ * implementation please refer to {@see the Akka online documentation}.
+ *
+ * The Java API for generating UDP commands is available at [[UdpConnectedMessage]].
+ */
object UdpConnected extends ExtensionKey[UdpConnectedExt] {
/**
* Java API: retrieve the UdpConnected extension for the given system.
*/
override def get(system: ActorSystem): UdpConnectedExt = super.get(system)
- trait Command extends IO.HasFailureMessage {
+ /**
+ * The common interface for [[Command]] and [[Event]].
+ */
+ sealed trait Message
+
+ /**
+ * The common type of all commands supported by the UDP implementation.
+ */
+ trait Command extends SelectionHandler.HasFailureMessage with Message {
def failureMessage = CommandFailed(this)
}
- case class NoAck(token: Any)
+ /**
+ * Each [[Send]] can optionally request a positive acknowledgment to be sent
+ * to the commanding actor. If such notification is not desired the [[Send#ack]]
+ * must be set to an instance of this class. The token contained within can be used
+ * to recognize which write failed when receiving a [[CommandFailed]] message.
+ */
+ case class NoAck(token: Any) extends Event
+
+ /**
+ * Default [[NoAck]] instance which is used when no acknowledgment information is
+ * explicitly provided. Its “token” is `null`.
+ */
object NoAck extends NoAck(null)
+ /**
+ * This message is understood by the connection actors to send data to their
+ * designated destination. The connection actor will respond with
+ * [[CommandFailed]] if the send could not be enqueued to the O/S kernel
+ * because the send buffer was full. If the given `ack` is not of type [[NoAck]]
+ * the connection actor will reply with the given object as soon as the datagram
+ * has been successfully enqueued to the O/S kernel.
+ */
case class Send(payload: ByteString, ack: Any) extends Command {
- require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.")
+ require(ack
+ != null, "ack must be non-null. Use NoAck if you don't want acks.")
def wantsAck: Boolean = !ack.isInstanceOf[NoAck]
}
@@ -33,29 +73,70 @@ object UdpConnected extends ExtensionKey[UdpConnectedExt] {
def apply(data: ByteString): Send = Send(data, NoAck)
}
+ /**
+ * Send this message to the [[UdpExt#manager]] in order to bind to a local
+ * port (optionally with the chosen `localAddress`) and create a UDP socket
+ * which is restricted to sending to and receiving from the given `remoteAddress`.
+ * All received datagrams will be sent to the designated `handler` actor.
+ */
case class Connect(handler: ActorRef,
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil) extends Command
- case object StopReading extends Command
+ /**
+ * Send this message to a connection actor (which had previously sent the
+ * [[Connected]] message) in order to close the socket. The connection actor
+ * will reply with a [[Disconnected]] message.
+ */
+ case object Disconnect extends Command
+
+ /**
+ * Send this message to a listener actor (which sent a [[Bound]] message) to
+ * have it stop reading datagrams from the network. If the O/S kernel’s receive
+ * buffer runs full then subsequent datagrams will be silently discarded.
+ * Re-enable reading from the socket using the [[ResumeReading]] command.
+ */
+ case object SuspendReading extends Command
+
+ /**
+ * This message must be sent to the listener actor to re-enable reading from
+ * the socket after a [[SuspendReading]] command.
+ */
case object ResumeReading extends Command
- trait Event
+ /**
+ * The common type of all events emitted by the UDP implementation.
+ */
+ trait Event extends Message
+ /**
+ * When a connection actor receives a datagram from its socket it will send
+ * it to the handler designated in the [[Bind]] message using this message type.
+ */
case class Received(data: ByteString) extends Event
+
+ /**
+ * When a command fails it will be replied to with this message type,
+ * wrapping the failing command object.
+ */
case class CommandFailed(cmd: Command) extends Event
+ /**
+ * This message is sent by the connection actor to the actor which sent the
+ * [[Connect]] message when the UDP socket has been bound to the local and
+ * remote addresses given.
+ */
sealed trait Connected extends Event
case object Connected extends Connected
+ /**
+ * This message is sent by the connection actor to the actor which sent the
+ * [[Disconnect]] message when the UDP socket has been closed.
+ */
sealed trait Disconnected extends Event
case object Disconnected extends Disconnected
- case object Close extends Command
-
- case class SendFailed(cause: Throwable) extends Event
-
}
class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension {
@@ -68,6 +149,11 @@ class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension {
name = "IO-UDP-CONN")
}
+ /**
+ * Java API: retrieve the UDP manager actor’s reference.
+ */
+ def getManager: ActorRef = manager
+
val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize)
}
@@ -79,29 +165,79 @@ object UdpConnectedMessage {
import language.implicitConversions
import UdpConnected._
+ /**
+ * Send this message to the [[UdpExt#manager]] in order to bind to a local
+ * port (optionally with the chosen `localAddress`) and create a UDP socket
+ * which is restricted to sending to and receiving from the given `remoteAddress`.
+ * All received datagrams will be sent to the designated `handler` actor.
+ */
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, Some(localAddress), options)
+ /**
+ * Connect without specifying the `localAddress`.
+ */
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress,
options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, None, options)
+ /**
+ * Connect without specifying the `localAddress` or `options`.
+ */
def connect(handler: ActorRef,
remoteAddress: InetSocketAddress): Command = Connect(handler, remoteAddress, None, Nil)
- def send(data: ByteString): Command = Send(data)
+ /**
+ * This message is understood by the connection actors to send data to their
+ * designated destination. The connection actor will respond with
+ * [[CommandFailed]] if the send could not be enqueued to the O/S kernel
+ * because the send buffer was full. If the given `ack` is not of type [[NoAck]]
+ * the connection actor will reply with the given object as soon as the datagram
+ * has been successfully enqueued to the O/S kernel.
+ */
def send(data: ByteString, ack: AnyRef): Command = Send(data, ack)
+ /**
+ * Send without requesting acknowledgment.
+ */
+ def send(data: ByteString): Command = Send(data)
- def close: Command = Close
+ /**
+ * Send this message to a connection actor (which had previously sent the
+ * [[Connected]] message) in order to close the socket. The connection actor
+ * will reply with a [[Disconnected]] message.
+ */
+ def disconnect: Command = Disconnect
- def noAck: NoAck = NoAck
+ /**
+ * Each [[Send]] can optionally request a positive acknowledgment to be sent
+ * to the commanding actor. If such notification is not desired the [[Send#ack]]
+ * must be set to an instance of this class. The token contained within can be used
+ * to recognize which write failed when receiving a [[CommandFailed]] message.
+ */
def noAck(token: AnyRef): NoAck = NoAck(token)
- def stopReading: Command = StopReading
+ /**
+ * Default [[NoAck]] instance which is used when no acknowledgment information is
+ * explicitly provided. Its “token” is `null`.
+ */
+ def noAck: NoAck = NoAck
+
+ /**
+ * Send this message to a listener actor (which sent a [[Bound]] message) to
+ * have it stop reading datagrams from the network. If the O/S kernel’s receive
+ * buffer runs full then subsequent datagrams will be silently discarded.
+ * Re-enable reading from the socket using the [[ResumeReading]] command.
+ */
+ def suspendReading: Command = SuspendReading
+
+ /**
+ * This message must be sent to the listener actor to re-enable reading from
+ * the socket after a [[SuspendReading]] command.
+ */
def resumeReading: Command = ResumeReading
implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
import scala.collection.JavaConverters._
- coll.asScala.to
+ coll.asScala.to[immutable.Traversable]
}
}
diff --git a/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala b/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala
index f2328fdca6..4bacc4db44 100644
--- a/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala
+++ b/akka-actor/src/main/scala/akka/io/UdpConnectedManager.scala
@@ -4,14 +4,13 @@
package akka.io
import akka.actor.Props
-import akka.io.IO.SelectorBasedManager
import akka.io.UdpConnected.Connect
/**
* INTERNAL API
*/
private[io] class UdpConnectedManager(udpConn: UdpConnectedExt)
- extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) {
+ extends SelectionHandler.SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) {
def receive = workerForCommandHandler {
case c: Connect ⇒
diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala
index 59bacc495d..3eb6778b25 100644
--- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala
+++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala
@@ -58,11 +58,11 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt,
}
def connected(registration: ChannelRegistration): Receive = {
- case StopReading ⇒ registration.disableInterest(OP_READ)
+ case SuspendReading ⇒ registration.disableInterest(OP_READ)
case ResumeReading ⇒ registration.enableInterest(OP_READ)
case ChannelReadable ⇒ doRead(registration, handler)
- case Close ⇒
+ case Disconnect ⇒
log.debug("Closing UDP connection to [{}]", remoteAddress)
channel.close()
sender ! Disconnected
diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala
index 667958e5b7..b4ab0e2caf 100644
--- a/akka-actor/src/main/scala/akka/io/UdpListener.scala
+++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala
@@ -60,7 +60,7 @@ private[io] class UdpListener(val udp: UdpExt,
}
def readHandlers(registration: ChannelRegistration): Receive = {
- case StopReading ⇒ registration.disableInterest(OP_READ)
+ case SuspendReading ⇒ registration.disableInterest(OP_READ)
case ResumeReading ⇒ registration.enableInterest(OP_READ)
case ChannelReadable ⇒ doReceive(registration, bind.handler)
diff --git a/akka-actor/src/main/scala/akka/io/UdpManager.scala b/akka-actor/src/main/scala/akka/io/UdpManager.scala
index 092222a35e..b7e91d6ab6 100644
--- a/akka-actor/src/main/scala/akka/io/UdpManager.scala
+++ b/akka-actor/src/main/scala/akka/io/UdpManager.scala
@@ -4,7 +4,6 @@
package akka.io
import akka.actor.Props
-import akka.io.IO.SelectorBasedManager
import akka.io.Udp._
/**
@@ -38,13 +37,13 @@ import akka.io.Udp._
* == Simple send ==
*
* Udp provides a simple method of sending UDP datagrams if no reply is expected. To acquire the Sender actor
- * a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSendReady
+ * a SimpleSend message has to be sent to the manager. The sender of the command will be notified by a SimpleSenderReady
* message that the service is available. UDP datagrams can be sent by sending [[akka.io.Udp.Send]] messages to the
- * sender of SimpleSendReady. All the datagrams will contain an ephemeral local port as sender and answers will be
+ * sender of SimpleSenderReady. All the datagrams will contain an ephemeral local port as sender and answers will be
* discarded.
*
*/
-private[io] class UdpManager(udp: UdpExt) extends SelectorBasedManager(udp.settings, udp.settings.NrOfSelectors) {
+private[io] class UdpManager(udp: UdpExt) extends SelectionHandler.SelectorBasedManager(udp.settings, udp.settings.NrOfSelectors) {
def receive = workerForCommandHandler {
case b: Bind ⇒
diff --git a/akka-actor/src/main/scala/akka/io/UdpSender.scala b/akka-actor/src/main/scala/akka/io/UdpSender.scala
index 921414e6e9..925c6dbf92 100644
--- a/akka-actor/src/main/scala/akka/io/UdpSender.scala
+++ b/akka-actor/src/main/scala/akka/io/UdpSender.scala
@@ -33,8 +33,8 @@ private[io] class UdpSender(val udp: UdpExt,
def receive: Receive = {
case registration: ChannelRegistration ⇒
- context.become(sendHandlers(registration), discardOld = true)
- commander ! SimpleSendReady
+ commander ! SimpleSenderReady
+ context.become(sendHandlers(registration))
}
override def postStop(): Unit = if (channel.isOpen) {
diff --git a/akka-actor/src/main/scala/akka/io/WithUdpSend.scala b/akka-actor/src/main/scala/akka/io/WithUdpSend.scala
index aadfedd527..37be5c7a68 100644
--- a/akka-actor/src/main/scala/akka/io/WithUdpSend.scala
+++ b/akka-actor/src/main/scala/akka/io/WithUdpSend.scala
@@ -14,12 +14,12 @@ import akka.io.SelectionHandler._
private[io] trait WithUdpSend {
me: Actor with ActorLogging ⇒
- var pendingSend: Send = null
- var pendingCommander: ActorRef = null
+ private var pendingSend: Send = null
+ private var pendingCommander: ActorRef = null
// If send fails first, we allow a second go after selected writable, but no more. This flag signals that
// pending send was already tried once.
- var retriedSend = false
- def hasWritePending = pendingSend ne null
+ private var retriedSend = false
+ private def hasWritePending = pendingSend ne null
def channel: DatagramChannel
def udp: UdpExt
@@ -44,7 +44,7 @@ private[io] trait WithUdpSend {
case ChannelWritable ⇒ if (hasWritePending) doSend(registration)
}
- final def doSend(registration: ChannelRegistration): Unit = {
+ private def doSend(registration: ChannelRegistration): Unit = {
val buffer = udp.bufferPool.acquire()
try {
buffer.clear()
diff --git a/akka-docs/rst/java/code/docs/io/UdpDocTest.java b/akka-docs/rst/java/code/docs/io/UdpDocTest.java
index 5b61d3b21f..fd03021357 100644
--- a/akka-docs/rst/java/code/docs/io/UdpDocTest.java
+++ b/akka-docs/rst/java/code/docs/io/UdpDocTest.java
@@ -6,86 +6,171 @@ package docs.io;
//#imports
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
import akka.actor.UntypedActor;
-import akka.io.Inet;
import akka.io.Udp;
+import akka.io.UdpConnected;
+import akka.io.UdpConnectedMessage;
import akka.io.UdpMessage;
-import akka.io.UdpSO;
+import akka.japi.Procedure;
import akka.util.ByteString;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
//#imports
-import akka.testkit.AkkaJUnitActorSystemResource;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-
public class UdpDocTest {
- static public class Demo extends UntypedActor {
- ActorSystem system = context().system();
- public void onReceive(Object message) {
- //#manager
- final ActorRef udp = Udp.get(system).manager();
- //#manager
+ //#sender
+ public static class SimpleSender extends UntypedActor {
+ final InetSocketAddress remote;
- //#simplesend
- udp.tell(UdpMessage.simpleSender(), getSelf());
+ public SimpleSender(InetSocketAddress remote) {
+ this.remote = remote;
+
+ // request creation of a SimpleSender
+ final ActorRef mgr = Udp.get(getContext().system()).getManager();
+ mgr.tell(UdpMessage.simpleSender(), getSelf());
+ }
+
+ @Override
+ public void onReceive(Object msg) {
+ if (msg instanceof Udp.SimpleSenderReady) {
+ getContext().become(ready(getSender()));
+ //#sender
+ getSender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), getSelf());
+ //#sender
+ } else unhandled(msg);
+ }
+
+ private Procedure