diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala index 1c2ab40b5d..f5d20d5b6b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala @@ -4,17 +4,19 @@ package akka.io import java.net.InetSocketAddress +import java.nio.channels.DatagramChannel import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } import akka.util.ByteString import akka.actor.ActorRef import akka.io.Udp._ +import akka.io.Inet._ import akka.TestUtils._ class UdpIntegrationSpec extends AkkaSpec(""" akka.loglevel = INFO akka.actor.serialize-creators = on""") with ImplicitSender { - val addresses = temporaryServerAddresses(3, udp = true) + val addresses = temporaryServerAddresses(5, udp = true) def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() @@ -73,6 +75,40 @@ class UdpIntegrationSpec extends AkkaSpec(""" else checkSendingToClient() } } + + "call SocketOption.beforeBind method before bind." in { + val commander = TestProbe() + val assertOption = AssertBeforeBind() + commander.send(IO(Udp), Bind(testActor, addresses(3), options = List(assertOption))) + commander.expectMsg(Bound(addresses(3))) + assert(assertOption.beforeCalled === 1) + } + + "call SocketOption.afterConnect method after binding." in { + val commander = TestProbe() + val assertOption = AssertAfterConnect() + commander.send(IO(Udp), Bind(testActor, addresses(4), options = List(assertOption))) + commander.expectMsg(Bound(addresses(4))) + assert(assertOption.afterCalled === 1) + } } } + +private case class AssertBeforeBind() extends SocketOption { + var beforeCalled = 0 + + override def beforeBind(c: DatagramChannel) = { + assert(!c.socket.isBound) + beforeCalled += 1 + } +} + +private case class AssertAfterConnect() extends SocketOption { + var afterCalled = 0 + + override def afterConnect(c: DatagramChannel) = { + assert(c.socket.isBound) + afterCalled += 1 + } +} diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala index 32c9533a19..de117e394e 100644 --- a/akka-actor/src/main/scala/akka/io/Inet.scala +++ b/akka-actor/src/main/scala/akka/io/Inet.scala @@ -3,7 +3,7 @@ */ package akka.io -import java.net.{ DatagramSocket, Socket, ServerSocket } +import java.nio.channels.{ DatagramChannel, SocketChannel, ServerSocketChannel } object Inet { @@ -13,19 +13,38 @@ object Inet { */ trait SocketOption { - def beforeDatagramBind(ds: DatagramSocket): Unit = () - - def beforeServerSocketBind(ss: ServerSocket): Unit = () + /** + * Action to be taken for this option before bind() is called + */ + def beforeBind(ds: DatagramChannel): Unit = () /** - * Action to be taken for this option before calling connect() + * Action to be taken for this option before bind() is called */ - def beforeConnect(s: Socket): Unit = () + def beforeBind(ss: ServerSocketChannel): Unit = () + + /** + * Action to be taken for this option before bind() is called + */ + def beforeBind(s: SocketChannel): Unit = () + /** * Action to be taken for this option after connect returned (i.e. on * the slave socket for servers). */ - def afterConnect(s: Socket): Unit = () + def afterConnect(c: DatagramChannel): Unit = () + + /** + * Action to be taken for this option after connect returned (i.e. on + * the slave socket for servers). + */ + def afterConnect(c: ServerSocketChannel): Unit = () + + /** + * Action to be taken for this option after connect returned (i.e. on + * the slave socket for servers). + */ + def afterConnect(c: SocketChannel): Unit = () } object SO { @@ -37,9 +56,9 @@ object Inet { */ final case class ReceiveBufferSize(size: Int) extends SocketOption { require(size > 0, "ReceiveBufferSize must be > 0") - override def beforeServerSocketBind(s: ServerSocket): Unit = s.setReceiveBufferSize(size) - override def beforeDatagramBind(s: DatagramSocket): Unit = s.setReceiveBufferSize(size) - override def beforeConnect(s: Socket): Unit = s.setReceiveBufferSize(size) + override def beforeBind(c: ServerSocketChannel): Unit = c.socket.setReceiveBufferSize(size) + override def beforeBind(c: DatagramChannel): Unit = c.socket.setReceiveBufferSize(size) + override def beforeBind(c: SocketChannel): Unit = c.socket.setReceiveBufferSize(size) } // server socket options @@ -50,9 +69,9 @@ object Inet { * For more information see [[java.net.Socket.setReuseAddress]] */ final case class ReuseAddress(on: Boolean) extends SocketOption { - override def beforeServerSocketBind(s: ServerSocket): Unit = s.setReuseAddress(on) - override def beforeDatagramBind(s: DatagramSocket): Unit = s.setReuseAddress(on) - override def beforeConnect(s: Socket): Unit = s.setReuseAddress(on) + override def beforeBind(c: ServerSocketChannel): Unit = c.socket.setReuseAddress(on) + override def beforeBind(c: DatagramChannel): Unit = c.socket.setReuseAddress(on) + override def beforeBind(c: SocketChannel): Unit = c.socket.setReuseAddress(on) } /** @@ -62,7 +81,8 @@ object Inet { */ final case class SendBufferSize(size: Int) extends SocketOption { require(size > 0, "SendBufferSize must be > 0") - override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size) + override def afterConnect(c: DatagramChannel): Unit = c.socket.setSendBufferSize(size) + override def afterConnect(c: SocketChannel): Unit = c.socket.setSendBufferSize(size) } /** @@ -74,7 +94,8 @@ object Inet { */ final case class TrafficClass(tc: Int) extends SocketOption { require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]") - override def afterConnect(s: Socket): Unit = s.setTrafficClass(tc) + override def afterConnect(c: DatagramChannel): Unit = c.socket.setTrafficClass(tc) + override def afterConnect(c: SocketChannel): Unit = c.socket.setTrafficClass(tc) } } diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 32f11d0e01..8dbe38d405 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -5,7 +5,7 @@ package akka.io import java.net.InetSocketAddress -import java.net.Socket +import java.nio.channels.SocketChannel import akka.io.Inet._ import com.typesafe.config.Config import scala.concurrent.duration._ @@ -56,7 +56,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { * For more information see [[java.net.Socket.setKeepAlive]] */ final case class KeepAlive(on: Boolean) extends SocketOption { - override def afterConnect(s: Socket): Unit = s.setKeepAlive(on) + override def afterConnect(c: SocketChannel): Unit = c.socket.setKeepAlive(on) } /** @@ -67,7 +67,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { * For more information see [[java.net.Socket.setOOBInline]] */ final case class OOBInline(on: Boolean) extends SocketOption { - override def afterConnect(s: Socket): Unit = s.setOOBInline(on) + override def afterConnect(c: SocketChannel): Unit = c.socket.setOOBInline(on) } // SO_LINGER is handled by the Close code @@ -81,7 +81,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider { * For more information see [[java.net.Socket.setTcpNoDelay]] */ final case class TcpNoDelay(on: Boolean) extends SocketOption { - override def afterConnect(s: Socket): Unit = s.setTcpNoDelay(on) + override def afterConnect(c: SocketChannel): Unit = c.socket.setTcpNoDelay(on) } } diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index fba775f98a..084f6d7de9 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -179,7 +179,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha options: immutable.Traversable[SocketOption]): Unit = { // Turn off Nagle's algorithm by default channel.socket.setTcpNoDelay(true) - options.foreach(_.afterConnect(channel.socket)) + options.foreach(_.afterConnect(channel)) commander ! Connected( channel.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress], diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index fc293411e2..ee16cdad23 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -49,7 +49,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, val localAddress = try { val socket = channel.socket - bind.options.foreach(_.beforeServerSocketBind(socket)) + bind.options.foreach(_.beforeBind(channel)) socket.bind(bind.localAddress, bind.backlog) val ret = socket.getLocalSocketAddress match { case isa: InetSocketAddress ⇒ isa @@ -57,6 +57,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, } channelRegistry.register(channel, if (bind.pullMode) 0 else SelectionKey.OP_ACCEPT) log.debug("Successfully bound to {}", ret) + bind.options.foreach(_.afterConnect(channel)) ret } catch { case NonFatal(e) ⇒ diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 696d636e4c..8c87dee287 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -30,7 +30,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, context.watch(commander) // sign death pact - options.foreach(_.beforeConnect(channel.socket)) + options.foreach(_.beforeBind(channel)) localAddress.foreach(channel.socket.bind) channelRegistry.register(channel, 0) timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 280ab2ba17..79858931e4 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -3,8 +3,8 @@ */ package akka.io -import java.net.DatagramSocket import java.net.InetSocketAddress +import java.nio.channels.DatagramChannel import com.typesafe.config.Config import scala.collection.immutable import akka.io.Inet.{ SoJavaFactories, SocketOption } @@ -180,7 +180,7 @@ object Udp extends ExtensionId[UdpExt] with ExtensionIdProvider { * For more information see [[java.net.DatagramSocket#setBroadcast]] */ final case class Broadcast(on: Boolean) extends SocketOption { - override def beforeDatagramBind(s: DatagramSocket): Unit = s.setBroadcast(on) + override def beforeBind(c: DatagramChannel): Unit = c.socket.setBroadcast(on) } } diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 0c56be52a4..dc63992123 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -35,7 +35,7 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt, val datagramChannel = DatagramChannel.open datagramChannel.configureBlocking(false) val socket = datagramChannel.socket - options.foreach(_.beforeDatagramBind(socket)) + options.foreach(_.beforeBind(datagramChannel)) try { localAddress foreach socket.bind datagramChannel.connect(remoteAddress) @@ -53,6 +53,7 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt, def receive = { case registration: ChannelRegistration ⇒ + options.foreach(_.afterConnect(channel)) commander ! Connected context.become(connected(registration), discardOld = true) } diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala index ff67cc19c2..d460368e81 100644 --- a/akka-actor/src/main/scala/akka/io/UdpListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -37,7 +37,7 @@ private[io] class UdpListener(val udp: UdpExt, val localAddress = try { val socket = channel.socket - bind.options.foreach(_.beforeDatagramBind(socket)) + bind.options.foreach(_.beforeBind(channel)) socket.bind(bind.localAddress) val ret = socket.getLocalSocketAddress match { case isa: InetSocketAddress ⇒ isa @@ -45,6 +45,7 @@ private[io] class UdpListener(val udp: UdpExt, } channelRegistry.register(channel, OP_READ) log.debug("Successfully bound to [{}]", ret) + bind.options.foreach(_.afterConnect(channel)) ret } catch { case NonFatal(e) ⇒ diff --git a/akka-actor/src/main/scala/akka/io/UdpSender.scala b/akka-actor/src/main/scala/akka/io/UdpSender.scala index 05339c1529..9f1c639891 100644 --- a/akka-actor/src/main/scala/akka/io/UdpSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpSender.scala @@ -23,9 +23,7 @@ private[io] class UdpSender(val udp: UdpExt, val channel = { val datagramChannel = DatagramChannel.open datagramChannel.configureBlocking(false) - val socket = datagramChannel.socket - - options foreach { _.beforeDatagramBind(socket) } + options foreach { _.beforeBind(datagramChannel) } datagramChannel } @@ -33,6 +31,7 @@ private[io] class UdpSender(val udp: UdpExt, def receive: Receive = { case registration: ChannelRegistration ⇒ + options.foreach(_.afterConnect(channel)) commander ! SimpleSenderReady context.become(sendHandlers(registration)) } diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 764422fe6e..737251fd58 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -51,6 +51,21 @@ Which turns out to be useful in many systems where same-state transitions actual In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead. +SocketOption's method signature changed to access channel +========================================================= +Server Socket Methods have been changed to take a channel instead of a socket. The channel's socket can be retrieved by calling ``channel.socket``. This allows for accessing new NIO features in Java 7. + +======================================== ===================================== + 2.3 2.4 +======================================== ===================================== +``beforeDatagramBind(DatagramSocket)`` ``beforeBind(DatagramChannel)`` +``beforeServerSocketBind(ServerSocket)`` ``beforeBind(ServerSocketChannel)`` +``beforeConnect(Socket)`` ``beforeBind(SocketChannel)`` +\ ``afterConnect(DatagramChannel)`` +\ ``afterConnect(ServerSocketChannel)`` +``afterConnect(Socket)`` ``afterConnect(SocketChannel)`` +======================================== ===================================== + Removed Deprecated Features ===========================