!act #13490 Changed the callback to SocketOption to accept a channel instead of a Socket, this allows for using the nio features.

For example in Java 7 you can now join a multicast group:

case class JoinGroup(group: InetAddress, networkInterface: NetworkInterface) extends SocketOption {

  override def afterConnect(c: DatagramChannel): Unit = {
    c.join(group, networkInterface)
  }
}

  IO(Udp) ! Udp.Bind(self, new InetSocketAddress(MulticastListener.port),
    options=List(ReuseAddress(true),
      JoinGroup(MulticastListener.group, MulticastListener.interf)))

Other minor changes:

 - changed all methods in SocketOption to take a Channel instead of a Socket.  The socket can be gotten from the Channel but not the reverse.
 - all methods that are called before the bind are now called beforeBind for consistency.
 - All network connections now call the beforeBind and afterConnect.
This commit is contained in:
Michael R. Maletich 2013-09-08 09:29:45 -05:00
parent cb05725c1e
commit a6d3704ef6
11 changed files with 104 additions and 30 deletions

View file

@ -4,17 +4,19 @@
package akka.io package akka.io
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.channels.DatagramChannel
import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec }
import akka.util.ByteString import akka.util.ByteString
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.io.Udp._ import akka.io.Udp._
import akka.io.Inet._
import akka.TestUtils._ import akka.TestUtils._
class UdpIntegrationSpec extends AkkaSpec(""" class UdpIntegrationSpec extends AkkaSpec("""
akka.loglevel = INFO akka.loglevel = INFO
akka.actor.serialize-creators = on""") with ImplicitSender { akka.actor.serialize-creators = on""") with ImplicitSender {
val addresses = temporaryServerAddresses(3, udp = true) val addresses = temporaryServerAddresses(5, udp = true)
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = { def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe() val commander = TestProbe()
@ -73,6 +75,40 @@ class UdpIntegrationSpec extends AkkaSpec("""
else checkSendingToClient() else checkSendingToClient()
} }
} }
"call SocketOption.beforeBind method before bind." in {
val commander = TestProbe()
val assertOption = AssertBeforeBind()
commander.send(IO(Udp), Bind(testActor, addresses(3), options = List(assertOption)))
commander.expectMsg(Bound(addresses(3)))
assert(assertOption.beforeCalled === 1)
}
"call SocketOption.afterConnect method after binding." in {
val commander = TestProbe()
val assertOption = AssertAfterConnect()
commander.send(IO(Udp), Bind(testActor, addresses(4), options = List(assertOption)))
commander.expectMsg(Bound(addresses(4)))
assert(assertOption.afterCalled === 1)
}
} }
} }
private case class AssertBeforeBind() extends SocketOption {
var beforeCalled = 0
override def beforeBind(c: DatagramChannel) = {
assert(!c.socket.isBound)
beforeCalled += 1
}
}
private case class AssertAfterConnect() extends SocketOption {
var afterCalled = 0
override def afterConnect(c: DatagramChannel) = {
assert(c.socket.isBound)
afterCalled += 1
}
}

View file

@ -3,7 +3,7 @@
*/ */
package akka.io package akka.io
import java.net.{ DatagramSocket, Socket, ServerSocket } import java.nio.channels.{ DatagramChannel, SocketChannel, ServerSocketChannel }
object Inet { object Inet {
@ -13,19 +13,38 @@ object Inet {
*/ */
trait SocketOption { trait SocketOption {
def beforeDatagramBind(ds: DatagramSocket): Unit = () /**
* Action to be taken for this option before bind() is called
def beforeServerSocketBind(ss: ServerSocket): Unit = () */
def beforeBind(ds: DatagramChannel): Unit = ()
/** /**
* Action to be taken for this option before calling connect() * Action to be taken for this option before bind() is called
*/ */
def beforeConnect(s: Socket): Unit = () def beforeBind(ss: ServerSocketChannel): Unit = ()
/**
* Action to be taken for this option before bind() is called
*/
def beforeBind(s: SocketChannel): Unit = ()
/** /**
* Action to be taken for this option after connect returned (i.e. on * Action to be taken for this option after connect returned (i.e. on
* the slave socket for servers). * the slave socket for servers).
*/ */
def afterConnect(s: Socket): Unit = () def afterConnect(c: DatagramChannel): Unit = ()
/**
* Action to be taken for this option after connect returned (i.e. on
* the slave socket for servers).
*/
def afterConnect(c: ServerSocketChannel): Unit = ()
/**
* Action to be taken for this option after connect returned (i.e. on
* the slave socket for servers).
*/
def afterConnect(c: SocketChannel): Unit = ()
} }
object SO { object SO {
@ -37,9 +56,9 @@ object Inet {
*/ */
final case class ReceiveBufferSize(size: Int) extends SocketOption { final case class ReceiveBufferSize(size: Int) extends SocketOption {
require(size > 0, "ReceiveBufferSize must be > 0") require(size > 0, "ReceiveBufferSize must be > 0")
override def beforeServerSocketBind(s: ServerSocket): Unit = s.setReceiveBufferSize(size) override def beforeBind(c: ServerSocketChannel): Unit = c.socket.setReceiveBufferSize(size)
override def beforeDatagramBind(s: DatagramSocket): Unit = s.setReceiveBufferSize(size) override def beforeBind(c: DatagramChannel): Unit = c.socket.setReceiveBufferSize(size)
override def beforeConnect(s: Socket): Unit = s.setReceiveBufferSize(size) override def beforeBind(c: SocketChannel): Unit = c.socket.setReceiveBufferSize(size)
} }
// server socket options // server socket options
@ -50,9 +69,9 @@ object Inet {
* For more information see [[java.net.Socket.setReuseAddress]] * For more information see [[java.net.Socket.setReuseAddress]]
*/ */
final case class ReuseAddress(on: Boolean) extends SocketOption { final case class ReuseAddress(on: Boolean) extends SocketOption {
override def beforeServerSocketBind(s: ServerSocket): Unit = s.setReuseAddress(on) override def beforeBind(c: ServerSocketChannel): Unit = c.socket.setReuseAddress(on)
override def beforeDatagramBind(s: DatagramSocket): Unit = s.setReuseAddress(on) override def beforeBind(c: DatagramChannel): Unit = c.socket.setReuseAddress(on)
override def beforeConnect(s: Socket): Unit = s.setReuseAddress(on) override def beforeBind(c: SocketChannel): Unit = c.socket.setReuseAddress(on)
} }
/** /**
@ -62,7 +81,8 @@ object Inet {
*/ */
final case class SendBufferSize(size: Int) extends SocketOption { final case class SendBufferSize(size: Int) extends SocketOption {
require(size > 0, "SendBufferSize must be > 0") require(size > 0, "SendBufferSize must be > 0")
override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size) override def afterConnect(c: DatagramChannel): Unit = c.socket.setSendBufferSize(size)
override def afterConnect(c: SocketChannel): Unit = c.socket.setSendBufferSize(size)
} }
/** /**
@ -74,7 +94,8 @@ object Inet {
*/ */
final case class TrafficClass(tc: Int) extends SocketOption { final case class TrafficClass(tc: Int) extends SocketOption {
require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]") require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]")
override def afterConnect(s: Socket): Unit = s.setTrafficClass(tc) override def afterConnect(c: DatagramChannel): Unit = c.socket.setTrafficClass(tc)
override def afterConnect(c: SocketChannel): Unit = c.socket.setTrafficClass(tc)
} }
} }

View file

@ -5,7 +5,7 @@
package akka.io package akka.io
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.Socket import java.nio.channels.SocketChannel
import akka.io.Inet._ import akka.io.Inet._
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -56,7 +56,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
* For more information see [[java.net.Socket.setKeepAlive]] * For more information see [[java.net.Socket.setKeepAlive]]
*/ */
final case class KeepAlive(on: Boolean) extends SocketOption { final case class KeepAlive(on: Boolean) extends SocketOption {
override def afterConnect(s: Socket): Unit = s.setKeepAlive(on) override def afterConnect(c: SocketChannel): Unit = c.socket.setKeepAlive(on)
} }
/** /**
@ -67,7 +67,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
* For more information see [[java.net.Socket.setOOBInline]] * For more information see [[java.net.Socket.setOOBInline]]
*/ */
final case class OOBInline(on: Boolean) extends SocketOption { final case class OOBInline(on: Boolean) extends SocketOption {
override def afterConnect(s: Socket): Unit = s.setOOBInline(on) override def afterConnect(c: SocketChannel): Unit = c.socket.setOOBInline(on)
} }
// SO_LINGER is handled by the Close code // SO_LINGER is handled by the Close code
@ -81,7 +81,7 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
* For more information see [[java.net.Socket.setTcpNoDelay]] * For more information see [[java.net.Socket.setTcpNoDelay]]
*/ */
final case class TcpNoDelay(on: Boolean) extends SocketOption { final case class TcpNoDelay(on: Boolean) extends SocketOption {
override def afterConnect(s: Socket): Unit = s.setTcpNoDelay(on) override def afterConnect(c: SocketChannel): Unit = c.socket.setTcpNoDelay(on)
} }
} }

View file

@ -179,7 +179,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
options: immutable.Traversable[SocketOption]): Unit = { options: immutable.Traversable[SocketOption]): Unit = {
// Turn off Nagle's algorithm by default // Turn off Nagle's algorithm by default
channel.socket.setTcpNoDelay(true) channel.socket.setTcpNoDelay(true)
options.foreach(_.afterConnect(channel.socket)) options.foreach(_.afterConnect(channel))
commander ! Connected( commander ! Connected(
channel.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress], channel.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress],

View file

@ -49,7 +49,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
val localAddress = val localAddress =
try { try {
val socket = channel.socket val socket = channel.socket
bind.options.foreach(_.beforeServerSocketBind(socket)) bind.options.foreach(_.beforeBind(channel))
socket.bind(bind.localAddress, bind.backlog) socket.bind(bind.localAddress, bind.backlog)
val ret = socket.getLocalSocketAddress match { val ret = socket.getLocalSocketAddress match {
case isa: InetSocketAddress isa case isa: InetSocketAddress isa
@ -57,6 +57,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
} }
channelRegistry.register(channel, if (bind.pullMode) 0 else SelectionKey.OP_ACCEPT) channelRegistry.register(channel, if (bind.pullMode) 0 else SelectionKey.OP_ACCEPT)
log.debug("Successfully bound to {}", ret) log.debug("Successfully bound to {}", ret)
bind.options.foreach(_.afterConnect(channel))
ret ret
} catch { } catch {
case NonFatal(e) case NonFatal(e)

View file

@ -30,7 +30,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
context.watch(commander) // sign death pact context.watch(commander) // sign death pact
options.foreach(_.beforeConnect(channel.socket)) options.foreach(_.beforeBind(channel))
localAddress.foreach(channel.socket.bind) localAddress.foreach(channel.socket.bind)
channelRegistry.register(channel, 0) channelRegistry.register(channel, 0)
timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied

View file

@ -3,8 +3,8 @@
*/ */
package akka.io package akka.io
import java.net.DatagramSocket
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.channels.DatagramChannel
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.collection.immutable import scala.collection.immutable
import akka.io.Inet.{ SoJavaFactories, SocketOption } import akka.io.Inet.{ SoJavaFactories, SocketOption }
@ -180,7 +180,7 @@ object Udp extends ExtensionId[UdpExt] with ExtensionIdProvider {
* For more information see [[java.net.DatagramSocket#setBroadcast]] * For more information see [[java.net.DatagramSocket#setBroadcast]]
*/ */
final case class Broadcast(on: Boolean) extends SocketOption { final case class Broadcast(on: Boolean) extends SocketOption {
override def beforeDatagramBind(s: DatagramSocket): Unit = s.setBroadcast(on) override def beforeBind(c: DatagramChannel): Unit = c.socket.setBroadcast(on)
} }
} }

View file

@ -35,7 +35,7 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt,
val datagramChannel = DatagramChannel.open val datagramChannel = DatagramChannel.open
datagramChannel.configureBlocking(false) datagramChannel.configureBlocking(false)
val socket = datagramChannel.socket val socket = datagramChannel.socket
options.foreach(_.beforeDatagramBind(socket)) options.foreach(_.beforeBind(datagramChannel))
try { try {
localAddress foreach socket.bind localAddress foreach socket.bind
datagramChannel.connect(remoteAddress) datagramChannel.connect(remoteAddress)
@ -53,6 +53,7 @@ private[io] class UdpConnection(udpConn: UdpConnectedExt,
def receive = { def receive = {
case registration: ChannelRegistration case registration: ChannelRegistration
options.foreach(_.afterConnect(channel))
commander ! Connected commander ! Connected
context.become(connected(registration), discardOld = true) context.become(connected(registration), discardOld = true)
} }

View file

@ -37,7 +37,7 @@ private[io] class UdpListener(val udp: UdpExt,
val localAddress = val localAddress =
try { try {
val socket = channel.socket val socket = channel.socket
bind.options.foreach(_.beforeDatagramBind(socket)) bind.options.foreach(_.beforeBind(channel))
socket.bind(bind.localAddress) socket.bind(bind.localAddress)
val ret = socket.getLocalSocketAddress match { val ret = socket.getLocalSocketAddress match {
case isa: InetSocketAddress isa case isa: InetSocketAddress isa
@ -45,6 +45,7 @@ private[io] class UdpListener(val udp: UdpExt,
} }
channelRegistry.register(channel, OP_READ) channelRegistry.register(channel, OP_READ)
log.debug("Successfully bound to [{}]", ret) log.debug("Successfully bound to [{}]", ret)
bind.options.foreach(_.afterConnect(channel))
ret ret
} catch { } catch {
case NonFatal(e) case NonFatal(e)

View file

@ -23,9 +23,7 @@ private[io] class UdpSender(val udp: UdpExt,
val channel = { val channel = {
val datagramChannel = DatagramChannel.open val datagramChannel = DatagramChannel.open
datagramChannel.configureBlocking(false) datagramChannel.configureBlocking(false)
val socket = datagramChannel.socket options foreach { _.beforeBind(datagramChannel) }
options foreach { _.beforeDatagramBind(socket) }
datagramChannel datagramChannel
} }
@ -33,6 +31,7 @@ private[io] class UdpSender(val udp: UdpExt,
def receive: Receive = { def receive: Receive = {
case registration: ChannelRegistration case registration: ChannelRegistration
options.foreach(_.afterConnect(channel))
commander ! SimpleSenderReady commander ! SimpleSenderReady
context.become(sendHandlers(registration)) context.become(sendHandlers(registration))
} }

View file

@ -51,6 +51,21 @@ Which turns out to be useful in many systems where same-state transitions actual
In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead. In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead.
SocketOption's method signature changed to access channel
=========================================================
Server Socket Methods have been changed to take a channel instead of a socket. The channel's socket can be retrieved by calling ``channel.socket``. This allows for accessing new NIO features in Java 7.
======================================== =====================================
2.3 2.4
======================================== =====================================
``beforeDatagramBind(DatagramSocket)`` ``beforeBind(DatagramChannel)``
``beforeServerSocketBind(ServerSocket)`` ``beforeBind(ServerSocketChannel)``
``beforeConnect(Socket)`` ``beforeBind(SocketChannel)``
\ ``afterConnect(DatagramChannel)``
\ ``afterConnect(ServerSocketChannel)``
``afterConnect(Socket)`` ``afterConnect(SocketChannel)``
======================================== =====================================
Removed Deprecated Features Removed Deprecated Features
=========================== ===========================