diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala index 91742b8860..dfaffcd00d 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala @@ -21,7 +21,7 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() - commander.send(IO(UdpConn), UdpConn.Connect(handler, localAddress, remoteAddress, Nil)) + commander.send(IO(UdpConn), UdpConn.Connect(handler, remoteAddress, localAddress, Nil)) commander.expectMsg(UdpConn.Connected) commander.sender } diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala index 0b9fb4ca0c..18c96594d6 100644 --- a/akka-actor/src/main/scala/akka/io/Inet.scala +++ b/akka-actor/src/main/scala/akka/io/Inet.scala @@ -86,4 +86,12 @@ object Inet { val TrafficClass = SO.TrafficClass } + trait SoJavaFactories { + import SO._ + def receiveBufferSize(size: Int) = ReceiveBufferSize(size) + def reuseAddress(on: Boolean) = ReuseAddress(on) + def sendBufferSize(size: Int) = SendBufferSize(size) + def trafficClass(tc: Int) = TrafficClass(tc) + } + } diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 0d1c0d439e..b041b6d44e 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2012 Typesafe Inc. + * Copyright (C) 2009-2013 Typesafe Inc. */ package akka.io diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 97076477cf..989c1e05f1 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ import scala.collection.immutable import akka.util.ByteString import akka.actor._ +import java.lang.{ Iterable ⇒ JIterable } object Tcp extends ExtensionKey[TcpExt] { @@ -69,6 +70,7 @@ object Tcp extends ExtensionKey[TcpExt] { endpoint: InetSocketAddress, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil) extends Command + case class Register(handler: ActorRef) extends Command case object Unbind extends Command @@ -77,7 +79,8 @@ object Tcp extends ExtensionKey[TcpExt] { case object ConfirmedClose extends CloseCommand case object Abort extends CloseCommand - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) /** * Write data to the TCP connection. If no ack is needed use the special @@ -86,7 +89,7 @@ object Tcp extends ExtensionKey[TcpExt] { case class Write(data: ByteString, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Write { val Empty: Write = Write(ByteString.empty, NoAck) @@ -103,15 +106,33 @@ object Tcp extends ExtensionKey[TcpExt] { case class Received(data: ByteString) extends Event case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event case class CommandFailed(cmd: Command) extends Event - case object Bound extends Event - case object Unbound extends Event - sealed trait ConnectionClosed extends Event + sealed trait Bound extends Event + case object Bound extends Bound + sealed trait Unbound extends Event + case object Unbound extends Unbound + + sealed trait ConnectionClosed extends Event { + def isAborted: Boolean = false + def isConfirmed: Boolean = false + def isPeerClosed: Boolean = false + def isErrorClosed: Boolean = false + def getErrorCause: String = null + } case object Closed extends ConnectionClosed - case object Aborted extends ConnectionClosed - case object ConfirmedClosed extends ConnectionClosed - case object PeerClosed extends ConnectionClosed - case class ErrorClosed(cause: String) extends ConnectionClosed + case object Aborted extends ConnectionClosed { + override def isAborted = true + } + case object ConfirmedClosed extends ConnectionClosed { + override def isConfirmed = true + } + case object PeerClosed extends ConnectionClosed { + override def isPeerClosed = true + } + case class ErrorClosed(cause: String) extends ConnectionClosed { + override def isErrorClosed = true + override def getErrorCause = cause + } } class TcpExt(system: ExtendedActorSystem) extends IO.Extension { @@ -158,3 +179,51 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val bufferPool: BufferPool = new DirectByteBufferPool(Settings.DirectBufferSize, Settings.MaxDirectBufferPoolSize) } + +object TcpSO extends SoJavaFactories { + import Tcp.SO._ + def keepAlive(on: Boolean) = KeepAlive(on) + def oobInline(on: Boolean) = OOBInline(on) + def tcpNoDelay(on: Boolean) = TcpNoDelay(on) +} + +object TcpMessage { + import language.implicitConversions + import Tcp._ + + def connect(remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + options: JIterable[SocketOption]): Command = Connect(remoteAddress, Some(localAddress), options) + def connect(remoteAddress: InetSocketAddress, + options: JIterable[SocketOption]): Command = Connect(remoteAddress, None, options) + def connect(remoteAddress: InetSocketAddress): Command = Connect(remoteAddress, None, Nil) + + def bind(handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int, + options: JIterable[SocketOption]): Command = Bind(handler, endpoint, backlog, options) + def bind(handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int): Command = Bind(handler, endpoint, backlog, Nil) + + def register(handler: ActorRef): Command = Register(handler) + def unbind: Command = Unbind + + def close: Command = Close + def confirmedClose: Command = ConfirmedClose + def abort: Command = Abort + + def noAck: NoAck = NoAck + def noAck(token: AnyRef): NoAck = NoAck(token) + + def write(data: ByteString): Command = Write(data) + def write(data: ByteString, ack: AnyRef): Command = Write(data, ack) + + def stopReading: Command = StopReading + def resumeReading: Command = ResumeReading + + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { + import scala.collection.JavaConverters._ + coll.asScala.to + } +} diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 12e73bdaa1..0af6497668 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -20,6 +20,8 @@ import akka.io.SelectionHandler._ /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. + * + * INTERNAL API */ private[io] abstract class TcpConnection(val channel: SocketChannel, val tcp: TcpExt) extends Actor with ActorLogging { @@ -287,7 +289,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, } else this def hasData = buffer.remaining() > 0 || remainingData.size > 0 - def wantsAck = ack != NoAck + def wantsAck = !ack.isInstanceOf[NoAck] } def createWrite(write: Write): PendingWrite = { val buffer = bufferPool.acquire() @@ -298,6 +300,9 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, } } +/** + * INTERNAL API + */ private[io] object TcpConnection { sealed trait ReadResult object NoData extends ReadResult diff --git a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala index 2f7cf9c5fa..e1bcc0e399 100644 --- a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala @@ -13,6 +13,8 @@ import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel } /** * An actor handling the connection state machine for an incoming, already connected * SocketChannel. + * + * INTERNAL API */ private[io] class TcpIncomingConnection(_channel: SocketChannel, _tcp: TcpExt, diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index d806fe8490..8631b67c42 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -15,6 +15,9 @@ import akka.io.Inet.SocketOption import akka.io.Tcp._ import akka.io.IO.HasFailureMessage +/** + * INTERNAL API + */ private[io] object TcpListener { case class RegisterIncoming(channel: SocketChannel) extends HasFailureMessage { @@ -25,6 +28,9 @@ private[io] object TcpListener { } +/** + * INTERNAL API + */ private[io] class TcpListener(val selectorRouter: ActorRef, val tcp: TcpExt, val bindCommander: ActorRef, diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index aa80e96c10..4dd1d4466f 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -9,6 +9,8 @@ import akka.actor.{ ActorLogging, Props } import akka.io.IO.SelectorBasedManager /** + * INTERNAL API + * * TcpManager is a facade for accepting commands ([[akka.io.Tcp.Command]]) to open client or server TCP connections. * * TcpManager is obtainable by calling {{{ IO(Tcp) }}} (see [[akka.io.IO]] and [[akka.io.Tcp]]) diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 03d978293e..098ab69b43 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -15,6 +15,8 @@ import scala.collection.immutable /** * An actor handling the connection state machine for an outgoing connection * to be established. + * + * INTERNAL API */ private[io] class TcpOutgoingConnection(_tcp: TcpExt, commander: ActorRef, @@ -53,7 +55,10 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, } -object TcpOutgoingConnection { +/** + * INTERNAL API + */ +private[io] object TcpOutgoingConnection { private def newSocketChannel() = { val channel = SocketChannel.open() channel.configureBlocking(false) diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 840dda666d..bfbcbc587c 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -4,7 +4,7 @@ package akka.io import java.net.DatagramSocket -import akka.io.Inet.SocketOption +import akka.io.Inet.{ SoJavaFactories, SocketOption } import com.typesafe.config.Config import akka.actor.{ Props, ActorSystemImpl } @@ -44,5 +44,9 @@ object Udp { size.toInt } } - +} + +object UdpSO extends SoJavaFactories { + import Udp.SO._ + def broadcast(on: Boolean) = Broadcast(on) } diff --git a/akka-actor/src/main/scala/akka/io/UdpConn.scala b/akka-actor/src/main/scala/akka/io/UdpConn.scala index aee429a716..6fff5a864f 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConn.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConn.scala @@ -9,6 +9,7 @@ import akka.io.Udp.UdpSettings import akka.util.ByteString import java.net.InetSocketAddress import scala.collection.immutable +import java.lang.{ Iterable ⇒ JIterable } object UdpConn extends ExtensionKey[UdpConnExt] { // Java API @@ -18,19 +19,21 @@ object UdpConn extends ExtensionKey[UdpConnExt] { def failureMessage = CommandFailed(this) } - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) + case class Send(payload: ByteString, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Send { def apply(data: ByteString): Send = Send(data, NoAck) } case class Connect(handler: ActorRef, - localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil) extends Command case object StopReading extends Command @@ -40,8 +43,12 @@ object UdpConn extends ExtensionKey[UdpConnExt] { case class Received(data: ByteString) extends Event case class CommandFailed(cmd: Command) extends Event - case object Connected extends Event - case object Disconnected extends Event + + sealed trait Connected extends Event + case object Connected extends Connected + + sealed trait Disconnected extends Event + case object Disconnected extends Disconnected case object Close extends Command @@ -61,4 +68,38 @@ class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension { val bufferPool: BufferPool = new DirectByteBufferPool(settings.DirectBufferSize, settings.MaxDirectBufferPoolSize) -} \ No newline at end of file +} + +/** + * Java API + */ +object UdpConnMessage { + import language.implicitConversions + import UdpConn._ + + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress, + localAddress: InetSocketAddress, + options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, Some(localAddress), options) + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress, + options: JIterable[SocketOption]): Command = Connect(handler, remoteAddress, None, options) + def connect(handler: ActorRef, + remoteAddress: InetSocketAddress): Command = Connect(handler, remoteAddress, None, Nil) + + def send(data: ByteString): Command = Send(data) + def send(data: ByteString, ack: AnyRef): Command = Send(data, ack) + + def close: Command = Close + + def noAck: NoAck = NoAck + def noAck(token: AnyRef): NoAck = NoAck(token) + + def stopReading: Command = StopReading + def resumeReading: Command = ResumeReading + + implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { + import scala.collection.JavaConverters._ + coll.asScala.to + } +} diff --git a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala b/akka-actor/src/main/scala/akka/io/UdpConnManager.scala index 3868289c6b..a362a02f38 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnManager.scala @@ -7,7 +7,10 @@ import akka.actor.Props import akka.io.IO.SelectorBasedManager import akka.io.UdpConn.Connect -class UdpConnManager(udpConn: UdpConnExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { +/** + * INTERNAL API + */ +private[io] class UdpConnManager(udpConn: UdpConnExt) extends SelectorBasedManager(udpConn.settings, udpConn.settings.NrOfSelectors) { def receive = workerForCommandHandler { case c: Connect ⇒ diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 6d52adfb3b..06d4f6d523 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -13,6 +13,9 @@ import java.nio.channels.SelectionKey._ import scala.annotation.tailrec import scala.util.control.NonFatal +/** + * INTERNAL API + */ private[io] class UdpConnection(val udpConn: UdpConnExt, val commander: ActorRef, val connect: Connect) extends Actor with ActorLogging { diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala index 838f53a88d..fc9b955b74 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFF.scala @@ -19,11 +19,13 @@ object UdpFF extends ExtensionKey[UdpFFExt] { def failureMessage = CommandFailed(this) } - case object NoAck + case class NoAck(token: Any) + object NoAck extends NoAck(null) + case class Send(payload: ByteString, target: InetSocketAddress, ack: Any) extends Command { require(ack != null, "ack must be non-null. Use NoAck if you don't want acks.") - def wantsAck: Boolean = ack != NoAck + def wantsAck: Boolean = !ack.isInstanceOf[NoAck] } object Send { def apply(data: ByteString, target: InetSocketAddress): Send = Send(data, target, NoAck) @@ -44,14 +46,43 @@ object UdpFF extends ExtensionKey[UdpFFExt] { case class Received(data: ByteString, sender: InetSocketAddress) extends Event case class CommandFailed(cmd: Command) extends Event - case object Bound extends Event - case object SimpleSendReady extends Event - case object Unbound extends Event + + sealed trait Bound extends Event + case object Bound extends Bound + + sealed trait SimpleSendReady extends Event + case object SimpleSendReady extends SimpleSendReady + + sealed trait Unbound + case object Unbound extends Unbound case class SendFailed(cause: Throwable) extends Event } +object UdpFFMessage { + import UdpFF._ + import java.lang.{ Iterable ⇒ JIterable } + import scala.collection.JavaConverters._ + import language.implicitConversions + + def send(payload: ByteString, target: InetSocketAddress): Send = Send(payload, target) + def send(payload: ByteString, target: InetSocketAddress, ack: Any): Send = Send(payload, target, ack) + + def bind(handler: ActorRef, endpoint: InetSocketAddress, options: JIterable[SocketOption]): Bind = + Bind(handler, endpoint, options.asScala.to) + + def bind(handler: ActorRef, endpoint: InetSocketAddress): Bind = Bind(handler, endpoint, Nil) + + def simpleSender(options: JIterable[SocketOption]): SimpleSender = SimpleSender(options.asScala.to) + def simpleSender: SimpleSender = SimpleSender + + def unbind: Unbind.type = Unbind + + def stopReading: StopReading.type = StopReading + def resumeReading: ResumeReading.type = ResumeReading +} + class UdpFFExt(system: ExtendedActorSystem) extends IO.Extension { val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-fire-and-forget")) diff --git a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala index add5775832..25a5551b4c 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala @@ -14,6 +14,9 @@ import java.nio.channels.SelectionKey._ import scala.annotation.tailrec import scala.util.control.NonFatal +/** + * INTERNAL API + */ private[io] class UdpFFListener(val udpFF: UdpFFExt, val bindCommander: ActorRef, val bind: Bind) diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala index 16d835ae49..679e3253de 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala @@ -8,6 +8,8 @@ import akka.io.IO.SelectorBasedManager import akka.io.UdpFF._ /** + * INTERNAL API + * * UdpFFManager is a facade for simple fire-and-forget style UDP operations * * UdpFFManager is obtainable by calling {{{ IO(UdpFF) }}} (see [[akka.io.IO]] and [[akka.io.UdpFF]]) diff --git a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala index 1120efba33..dc327a1039 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala @@ -13,6 +13,8 @@ import scala.util.control.NonFatal /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. + * + * INTERNAL API */ private[io] class UdpFFSender(val udpFF: UdpFFExt, options: immutable.Traversable[SocketOption], val commander: ActorRef) extends Actor with ActorLogging with WithUdpFFSend { diff --git a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala b/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala index 99ed9393e2..841eb690b2 100644 --- a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala +++ b/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala @@ -8,6 +8,9 @@ import akka.io.UdpFF.{ CommandFailed, Send } import akka.io.SelectionHandler._ import java.nio.channels.DatagramChannel +/** + * INTERNAL API + */ private[io] trait WithUdpFFSend { me: Actor with ActorLogging ⇒ diff --git a/akka-docs/rst/java/code/docs/io/IODocTest.java b/akka-docs/rst/java/code/docs/io/IODocTest.java new file mode 100644 index 0000000000..76e6efb76e --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/IODocTest.java @@ -0,0 +1,91 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +import org.junit.BeforeClass; +import org.junit.Test; + +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +//#imports +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import akka.actor.ActorRef; +import akka.io.Inet; +import akka.io.Tcp; +import akka.io.TcpExt; +import akka.io.TcpMessage; +import akka.io.TcpSO; +import akka.util.ByteString; +//#imports + +public class IODocTest { + + static public class Demo extends UntypedActor { + ActorRef connectionActor = null; + ActorRef listener = getSelf(); + + @Override + public void onReceive(Object msg) { + if ("connect".equals(msg)) { + //#manager + final ActorRef tcp = Tcp.get(system).manager(); + //#manager + //#connect + final InetSocketAddress remoteAddr = new InetSocketAddress("127.0.0.1", + 12345); + tcp.tell(TcpMessage.connect(remoteAddr), getSelf()); + // or with socket options + final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", + 1234); + final List options = new ArrayList(); + options.add(TcpSO.keepAlive(true)); + tcp.tell(TcpMessage.connect(remoteAddr, localAddr, options), getSelf()); + //#connect + } else + //#connected + if (msg instanceof Tcp.Connected) { + final Tcp.Connected conn = (Tcp.Connected) msg; + connectionActor = getSender(); + connectionActor.tell(TcpMessage.register(listener), getSelf()); + } + //#connected + else + //#received + if (msg instanceof Tcp.Received) { + final Tcp.Received recv = (Tcp.Received) msg; + final ByteString data = recv.data(); + // and do something with the received data ... + } else if (msg instanceof Tcp.CommandFailed) { + final Tcp.CommandFailed failed = (Tcp.CommandFailed) msg; + final Tcp.Command command = failed.cmd(); + // react to failed connect, bind, write, etc. + } else if (msg instanceof Tcp.ConnectionClosed) { + final Tcp.ConnectionClosed closed = (Tcp.ConnectionClosed) msg; + if (closed.isAborted()) { + // handle close reasons like this + } + } + //#received + else + if ("bind".equals(msg)) { + final ActorRef handler = getSelf(); + //#bind + final ActorRef tcp = Tcp.get(system).manager(); + final InetSocketAddress localAddr = new InetSocketAddress("127.0.0.1", + 1234); + final List options = new ArrayList(); + options.add(TcpSO.reuseAddress(true)); + tcp.tell(TcpMessage.bind(handler, localAddr, 10, options), getSelf()); + //#bind + } + } + } + + static ActorSystem system; + + // This is currently only a compilation test, nothing is run +} diff --git a/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java b/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java new file mode 100644 index 0000000000..6cbcf6ecd2 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/IOUdpFFDocTest.java @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +//#imports +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +import akka.io.Inet; +import akka.io.UdpFF; +import akka.io.UdpFFMessage; +import akka.io.UdpSO; +import akka.util.ByteString; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +//#imports + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class IOUdpFFDocTest { + static public class Demo extends UntypedActor { + public void onReceive(Object message) { + //#manager + final ActorRef udpFF = UdpFF.get(system).manager(); + //#manager + + //#simplesend + udpFF.tell(UdpFFMessage.simpleSender(), getSelf()); + + // ... or with socket options: + final List options = new ArrayList(); + options.add(UdpSO.broadcast(true)); + udpFF.tell(UdpFFMessage.simpleSender(), getSelf()); + //#simplesend + + ActorRef simpleSender = null; + + //#simplesend-finish + if (message instanceof UdpFF.SimpleSendReady) { + simpleSender = getSender(); + } + //#simplesend-finish + + final ByteString data = ByteString.empty(); + + //#simplesend-send + simpleSender.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); + //#simplesend-send + + final ActorRef handler = getSelf(); + + //#bind + udpFF.tell(UdpFFMessage.bind(handler, new InetSocketAddress("127.0.0.1", 9876)), getSelf()); + //#bind + + ActorRef udpWorker = null; + + //#bind-finish + if (message instanceof UdpFF.Bound) { + udpWorker = getSender(); + } + //#bind-finish + + //#bind-receive + if (message instanceof UdpFF.Received) { + final UdpFF.Received rcvd = (UdpFF.Received) message; + final ByteString payload = rcvd.data(); + final InetSocketAddress sender = rcvd.sender(); + } + //#bind-receive + + //#bind-send + udpWorker.tell(UdpFFMessage.send(data, new InetSocketAddress("127.0.0.1", 7654)), getSelf()); + //#bind-send + } + } + + static ActorSystem system; + + @BeforeClass + static public void setup() { + system = ActorSystem.create("IODocTest"); + } + + @AfterClass + static public void teardown() { + system.shutdown(); + } + + @Test + public void demonstrateConnect() { + } + +} diff --git a/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java b/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java new file mode 100644 index 0000000000..11b0e3601f --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/UdpConnDocTest.java @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import akka.actor.ActorSystem; +import akka.actor.UntypedActor; +//#imports +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import akka.actor.ActorRef; +import akka.io.Inet; +import akka.io.UdpConn; +import akka.io.UdpConnMessage; +import akka.io.UdpSO; +import akka.util.ByteString; +//#imports + +public class UdpConnDocTest { + + static public class Demo extends UntypedActor { + ActorRef connectionActor = null; + ActorRef handler = getSelf(); + + @Override + public void onReceive(Object msg) { + if ("connect".equals(msg)) { + //#manager + final ActorRef udp = UdpConn.get(system).manager(); + //#manager + //#connect + final InetSocketAddress remoteAddr = + new InetSocketAddress("127.0.0.1", 12345); + udp.tell(UdpConnMessage.connect(handler, remoteAddr), getSelf()); + // or with socket options + final InetSocketAddress localAddr = + new InetSocketAddress("127.0.0.1", 1234); + final List options = + new ArrayList(); + options.add(UdpSO.broadcast(true)); + udp.tell(UdpConnMessage.connect(handler, remoteAddr, localAddr, options), getSelf()); + //#connect + } else + //#connected + if (msg instanceof UdpConn.Connected) { + final UdpConn.Connected conn = (UdpConn.Connected) msg; + connectionActor = getSender(); // Save the worker ref for later use + } + //#connected + else + //#received + if (msg instanceof UdpConn.Received) { + final UdpConn.Received recv = (UdpConn.Received) msg; + final ByteString data = recv.data(); + // and do something with the received data ... + } else if (msg instanceof UdpConn.CommandFailed) { + final UdpConn.CommandFailed failed = (UdpConn.CommandFailed) msg; + final UdpConn.Command command = failed.cmd(); + // react to failed connect, etc. + } else if (msg instanceof UdpConn.Disconnected) { + // do something on disconnect + } + //#received + else + if ("send".equals(msg)) { + ByteString data = ByteString.empty(); + //#send + connectionActor.tell(UdpConnMessage.send(data), getSelf()); + //#send + } + } + } + + static ActorSystem system; + + @BeforeClass + static public void setup() { + system = ActorSystem.create("UdpConnDocTest"); + } + + @AfterClass + static public void teardown() { + system.shutdown(); + } + + @Test + public void demonstrateConnect() { + } + +} diff --git a/akka-docs/rst/java/index.rst b/akka-docs/rst/java/index.rst index b0f29c27ef..81404409cb 100644 --- a/akka-docs/rst/java/index.rst +++ b/akka-docs/rst/java/index.rst @@ -20,6 +20,7 @@ Java API stm agents transactors + io fsm extending-akka zeromq diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst new file mode 100644 index 0000000000..dbcd9571cd --- /dev/null +++ b/akka-docs/rst/java/io.rst @@ -0,0 +1,284 @@ +.. _io-java: + +I/O (Java) +========== + +Introduction +------------ + +The ``akka.io`` package has been developed in collaboration between the Akka +and `spray.io`_ teams. Its design incorporates the experiences with the +``spray-io`` module along with improvements that were jointly developed for +more general consumption as an actor-based service. + +This documentation is in progress and some sections may be incomplete. More will be coming. + +Terminology, Concepts +--------------------- +The I/O API is completely actor based, meaning that all operations are implemented as message passing instead of +direct method calls. Every I/O driver (TCP, UDP) has a special actor, called *manager* that serves +as the entry point for the API. The manager is accessible through an extension, for example the following code +looks up the TCP manager and returns its ``ActorRef``: + +.. includecode:: code/docs/io/IODocTest.java#manager + +For various I/O commands the manager instantiates worker actors that will expose themselves to the user of the +API by replying to the command. For example after a ``Connect`` command sent to the TCP manager the manager creates +an actor representing the TCP connection. All operations related to the given TCP connections can be invoked by sending +messages to the connection actor which announces itself by sending a ``Connected`` message. + +DeathWatch and Resource Management +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Worker actors usually need a user-side counterpart actor listening for events (such events could be inbound connections, +incoming bytes or acknowledgements for writes). These worker actors *watch* their listener counterparts, therefore the +resources assigned to them are automatically released when the listener stops. This design makes the API more robust +against resource leaks. + +Thanks to the completely actor based approach of the I/O API the opposite direction works as well: a user actor +responsible for handling a connection might watch the connection actor to be notified if it unexpectedly terminates. + +Write models (Ack, Nack) +^^^^^^^^^^^^^^^^^^^^^^^^ + +Basically all of the I/O devices have a maximum throughput which limits the frequency and size of writes. When an +application tries to push more data then a device can handle, the driver has to buffer all bytes that the device has +not yet been able to write. With this approach it is possible to handle short bursts of intensive writes --- but no buffer is infinite. +Therefore, the driver has to notify the writer (a user-side actor) either that no further writes are possible, or by +explicitly notifying it when the next chunk is possible to be written or buffered. + +Both of these models are available in the TCP and UDP implementations of Akka I/O. Ack based flow control can be enabled +by providing an ack object in the write message (``Write`` in the case of TCP and ``Send`` for UDP) that will be used by +the worker to notify the writer about the success. + +If a write (or any other command) fails, the driver notifies the commander with a special message (``CommandFailed`` in +the case of UDP and TCP). This message also serves as a means to notify the writer of a failed write. Please note, that +in a Nack based flow-control setting the writer has to buffer some of the writes as the failure notification for a +write ``W1`` might arrive after additional write commands ``W2`` ``W3`` has been sent. + +.. warning:: + An acknowledged write does not mean acknowledged delivery or storage. The Ack/Nack + protocol described here is a means of flow control not error handling: receiving an Ack for a write signals that the + I/O driver is ready to accept a new one. + +ByteString +^^^^^^^^^^ + +A primary goal of Akka's IO support is to only communicate between actors with immutable objects. When dealing with network I/O on the jvm ``Array[Byte]`` and ``ByteBuffer`` are commonly used to represent collections of ``Byte``\s, but they are mutable. Scala's collection library also lacks a suitably efficient immutable collection for ``Byte``\s. Being able to safely and efficiently move ``Byte``\s around is very important for this I/O support, so ``ByteString`` was developed. + +``ByteString`` is a `Rope-like `_ data structure that is immutable and efficient. When 2 ``ByteString``\s are concatenated together they are both stored within the resulting ``ByteString`` instead of copying both to a new ``Array``. Operations such as ``drop`` and ``take`` return ``ByteString``\s that still reference the original ``Array``, but just change the offset and length that is visible. Great care has also been taken to make sure that the internal ``Array`` cannot be modified. Whenever a potentially unsafe ``Array`` is used to create a new ``ByteString`` a defensive copy is created. If you require a ``ByteString`` that only blocks a much memory as necessary for it's content, use the ``compact`` method to get a ``CompactByteString`` instance. If the ``ByteString`` represented only a slice of the original array, this will result in copying all bytes in that slice. + +``ByteString`` inherits all methods from ``IndexedSeq``, and it also has some new ones. For more information, look up the ``akka.util.ByteString`` class and it's companion object in the ScalaDoc. + +``ByteString`` also comes with it's own optimized builder and iterator classes ``ByteStringBuilder`` and ``ByteIterator`` which provides special features in addition to the standard builder / iterator methods: + +Compatibility with java.io +.......................... + +A ``ByteStringBuilder`` can be wrapped in a `java.io.OutputStream` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can we wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams. + +Using TCP +--------- + +The following imports are assumed throughout this section: + +.. includecode:: code/docs/io/IODocTest.java#imports + +As with all of the Akka I/O APIs, everything starts with acquiring a reference to the appropriate manager: + +.. includecode:: code/docs/io/IODocTest.java#manager + +This is an actor that handles the underlying low level I/O resources (Selectors, channels) and instantiates workers for +specific tasks, like listening to incoming connections. + +Connecting +^^^^^^^^^^ + +The first step of connecting to a remote address is sending a ``Connect`` message to the TCP manager: + +.. includecode:: code/docs/io/IODocTest.java#connect + +After issuing the Connect command the TCP manager spawns a worker actor that will handle commands related to the +connection. This worker actor will reveal itself by replying with a ``Connected`` message to the actor who sent the +``Connect`` command. + +.. includecode:: code/docs/io/IODocTest.java#connected + +When receiving the :class:`Connected` message there is still no listener +associated with the connection. To finish the connection setup a ``Register`` +has to be sent to the connection actor with the listener ``ActorRef`` as a +parameter, which therefore done in the last line above. + +After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor. +If the listener stops, the connection is closed, and all resources allocated for the connection released. During the +lifetime the listener may receive various event notifications: + +.. includecode:: code/docs/io/IODocTest.java#received + +The last line handles all connection close events in the same way. It is possible to listen for more fine-grained +connection events, see the appropriate section below. + + +Accepting connections +^^^^^^^^^^^^^^^^^^^^^ + +To create a TCP server and listen for inbound connection, a ``Bind`` command has to be sent to the TCP manager: + +.. includecode:: code/docs/io/IODocTest.java#bind + +The actor sending the ``Bind`` message will receive a ``Bound`` message signalling that the server is ready to accept +incoming connections. Accepting connections is very similar to the last two steps of opening outbound connections: when +an incoming connection is established, the actor provided in ``handler`` will receive a ``Connected`` message whose +sender is the connection actor: + +.. includecode:: code/docs/io/IODocTest.java#connected + +When receiving the :class:`Connected` message there is still no listener +associated with the connection. To finish the connection setup a ``Register`` +has to be sent to the connection actor with the listener ``ActorRef`` as a +parameter, which therefore done in the last line above. + +After registration, the listener actor provided in the ``listener`` parameter will be watched by the connection actor. +If the listener stops, the connection is closed, and all resources allocated for the connection released. During the +lifetime the listener will receive various event notifications in the same way as we has seen in the outbound +connection case. + +Closing connections +^^^^^^^^^^^^^^^^^^^ + +A connection can be closed by sending one of the commands ``Close``, ``ConfirmedClose`` or ``Abort`` to the connection +actor. + +``Close`` will close the connection by sending a ``FIN`` message, but without waiting for confirmation from +the remote endpoint. Pending writes will be flushed. If the close is successful, the listener will be notified with +``Closed`` + +``ConfirmedClose`` will close the sending direction of the connection by sending a ``FIN`` message, but receives +will continue until the remote endpoint closes the connection, too. Pending writes will be flushed. If the close is +successful, the listener will be notified with ``ConfirmedClosed`` + +``Abort`` will immediately terminate the connection by sending a ``RST`` message to the remote endpoint. Pending +writes will be not flushed. If the close is successful, the listener will be notified with ``Aborted`` + +``PeerClosed`` will be sent to the listener if the connection has been closed by the remote endpoint. + +``ErrorClosed`` will be sent to the listener whenever an error happened that forced the connection to be closed. + +All close notifications are subclasses of ``ConnectionClosed`` so listeners who do not need fine-grained close events +may handle all close events in the same way. + +Throttling Reads and Writes +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +*This section is not yet ready. More coming soon* + +Using UDP +--------- + +UDP support comes in two flavors: connectionless, and connection based: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#manager + +UDP servers can be only implemented by the connectionless API, but clients can use both. + +Connectionless UDP +^^^^^^^^^^^^^^^^^^ + +The following imports are assumed in the following sections: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#imports + +Simple Send +............ + +To simply send a UDP datagram without listening to an answer one needs to send the ``SimpleSender`` command to the +manager: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend + +The manager will create a worker for sending, and the worker will reply with a ``SimpleSendReady`` message: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-finish + +After saving the sender of the ``SimpleSendReady`` message it is possible to send out UDP datagrams with a simple +message send: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#simplesend-send + + +Bind (and Send) +............... + +To listen for UDP datagrams arriving on a given port, the ``Bind`` command has to be sent to the connectionless UDP +manager + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind + +After the bind succeeds, the sender of the ``Bind`` command will be notified with a ``Bound`` message. The sender of +this message is the worker for the UDP channel bound to the local address. + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-finish + +The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-receive + +The ``Received`` message contains the payload of the datagram and the address of the sender. + +It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: + +.. includecode:: code/docs/io/IOUdpFFDocTest.java#bind-send + + +.. note:: + The difference between using a bound UDP worker to send instead of a simple-send worker is that in the former case + the sender field of the UDP datagram will be the bound local address, while in the latter it will be an undetermined + ephemeral port. + +Connection based UDP +^^^^^^^^^^^^^^^^^^^^ + +The service provided by the connection based UDP API is similar to the bind-and-send service we have seen earlier, but +the main difference is that a connection is only able to send to the remoteAddress it was connected to, and will +receive datagrams only from that address. + +Connecting is similar to what we have seen in the previous section: + +.. includecode:: code/docs/io/UdpConnDocTest.java#connect + +After the connect succeeds, the sender of the ``Connect`` command will be notified with a ``Connected`` message. The sender of +this message is the worker for the UDP connection. + +.. includecode:: code/docs/io/UdpConnDocTest.java#connected + +The actor passed in the ``handler`` parameter will receive inbound UDP datagrams sent to the bound address: + +.. includecode:: code/docs/io/UdpConnDocTest.java#received + +The ``Received`` message contains the payload of the datagram but unlike in the connectionless case, no sender address +will be provided, as an UDP connection only receives messages from the endpoint it has been connected to. + +It is also possible to send UDP datagrams using the ``ActorRef`` of the worker saved in ``udpWorker``: + +.. includecode:: code/docs/io/UdpConnDocTest.java#send + +Again, the send does not contain a remote address, as it is always the endpoint we have been connected to. + +.. note:: + There is a small performance benefit in using connection based UDP API over the connectionless one. + If there is a SecurityManager enabled on the system, every connectionless message send has to go through a security + check, while in the case of connection-based UDP the security check is cached after connect, thus writes does + not suffer an additional performance penalty. + +Throttling Reads and Writes +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +*This section is not yet ready. More coming soon* + + +Architecture in-depth +--------------------- + +For further details on the design and internal architecture see :ref:`io-layer`. + +.. _spray.io: http://spray.io diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index 2ebb6d2b49..31dfa6e4ee 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -13,6 +13,10 @@ more general consumption as an actor-based service. This documentation is in progress and some sections may be incomplete. More will be coming. +.. toctree:: + + io-old + .. note:: The old I/O implementation has been deprecated and its documentation has been moved: :ref:`io-scala-old` @@ -378,4 +382,4 @@ Architecture in-depth For further details on the design and internal architecture see :ref:`io-layer`. -.. _spray.io: http://spray.io \ No newline at end of file +.. _spray.io: http://spray.io