!act #15626 expose DatagramChannel creation in DatagramChannelCreator

* move channel creation logic to a separate trait
* new Java API: AbstractSocketOption
This commit is contained in:
Martynas Mickevicius 2014-08-19 14:02:23 +03:00
parent eb766d49f3
commit 325e05ee27
12 changed files with 466 additions and 6 deletions

View file

@ -16,7 +16,7 @@ class UdpIntegrationSpec extends AkkaSpec("""
akka.loglevel = INFO
akka.actor.serialize-creators = on""") with ImplicitSender {
val addresses = temporaryServerAddresses(5, udp = true)
val addresses = temporaryServerAddresses(6, udp = true)
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
@ -91,6 +91,14 @@ class UdpIntegrationSpec extends AkkaSpec("""
commander.expectMsg(Bound(addresses(4)))
assert(assertOption.afterCalled === 1)
}
"call DatagramChannelCreator.create method when opening channel" in {
val commander = TestProbe()
val assertOption = AssertOpenDatagramChannel()
commander.send(IO(Udp), Bind(testActor, addresses(5), options = List(assertOption)))
commander.expectMsg(Bound(addresses(5)))
assert(assertOption.openCalled === 1)
}
}
}
@ -112,3 +120,12 @@ private case class AssertAfterConnect() extends SocketOption {
afterCalled += 1
}
}
private case class AssertOpenDatagramChannel() extends DatagramChannelCreator {
var openCalled = 0
override def create() = {
openCalled += 1
super.create()
}
}

View file

@ -9,7 +9,7 @@ object Inet {
/**
* SocketOption is a package of data (from the user) and associated
* behavior (how to apply that to a socket).
* behavior (how to apply that to a channel).
*/
trait SocketOption {
@ -47,6 +47,26 @@ object Inet {
def afterConnect(c: SocketChannel): Unit = ()
}
/**
* DatagramChannel creation behavior.
*/
class DatagramChannelCreator extends SocketOption {
/**
* Open and return new DatagramChannel.
*
* [[scala.throws]] is needed because [[DatagramChannel.open]] method
* can throw an exception.
*/
@throws(classOf[Exception])
def create(): DatagramChannel = DatagramChannel.open()
}
object DatagramChannelCreator {
val default = new DatagramChannelCreator()
def apply() = default
}
object SO {
/**
@ -165,4 +185,43 @@ object Inet {
def trafficClass(tc: Int) = TrafficClass(tc)
}
/**
* Java API: AbstractSocketOption is a package of data (from the user) and associated
* behavior (how to apply that to a channel).
*/
abstract class AbstractSocketOption extends SocketOption {
/**
* Action to be taken for this option before bind() is called
*/
override def beforeBind(ds: DatagramChannel): Unit = ()
/**
* Action to be taken for this option before bind() is called
*/
override def beforeBind(ss: ServerSocketChannel): Unit = ()
/**
* Action to be taken for this option before bind() is called
*/
override def beforeBind(s: SocketChannel): Unit = ()
/**
* Action to be taken for this option after connect returned (i.e. on
* the slave socket for servers).
*/
override def afterConnect(c: DatagramChannel): Unit = ()
/**
* Action to be taken for this option after connect returned (i.e. on
* the slave socket for servers).
*/
override def afterConnect(c: ServerSocketChannel): Unit = ()
/**
* Action to be taken for this option after connect returned (i.e. on
* the slave socket for servers).
*/
override def afterConnect(c: SocketChannel): Unit = ()
}
}

View file

@ -12,6 +12,7 @@ import scala.util.control.NonFatal
import akka.actor.{ ActorLogging, Actor, ActorRef }
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.ByteString
import akka.io.Inet.DatagramChannelCreator
import akka.io.SelectionHandler._
import akka.io.Udp._
@ -31,7 +32,9 @@ private[io] class UdpListener(val udp: UdpExt,
context.watch(bind.handler) // sign death pact
val channel = DatagramChannel.open
val channel = bind.options.collectFirst {
case creator: DatagramChannelCreator creator
}.getOrElse(DatagramChannelCreator()).create()
channel.configureBlocking(false)
val localAddress =

View file

@ -83,3 +83,24 @@ the biggest difference is the absence of remote address information in
in the case of connection-based UDP the security check is cached after
connect, thus writes do not suffer an additional performance penalty.
UDP Multicast
------------------------------------------
If you want to use UDP multicast you will need to use Java 7. Akka provides
a way to control various options of ``DatagramChannel`` through the
``akka.io.Inet.SocketOption`` interface. The example below shows
how to setup a receiver of multicast messages using IPv6 protocol.
To select a Protocol Family you must extend ``akka.io.Inet.DatagramChannelCreator``
class which implements ``akka.io.Inet.SocketOption``. Provide custom logic
for opening a datagram channel by overriding :meth:`create` method.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#inet6-protocol-family
Another socket option will be needed to join a multicast group.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#multicast-group
Socket options must be provided to :meth:`UdpMessage.bind` command.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#bind

View file

@ -51,9 +51,10 @@ Which turns out to be useful in many systems where same-state transitions actual
In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead.
SocketOption's method signature changed to access channel
=========================================================
Server Socket Methods have been changed to take a channel instead of a socket. The channel's socket can be retrieved by calling ``channel.socket``. This allows for accessing new NIO features in Java 7.
More control over Channel properties in Akka-IO
===============================================
Method signatures for ``SocketOption`` have been changed to take a channel instead of a socket. The channel's socket
can be retrieved by calling ``channel.socket``. This allows for accessing new NIO features in Java 7.
======================================== =====================================
2.3 2.4
@ -66,6 +67,9 @@ Server Socket Methods have been changed to take a channel instead of a socket.
``afterConnect(Socket)`` ``afterConnect(SocketChannel)``
======================================== =====================================
A new class ``DatagramChannelCreator`` which extends ``SocketOption`` has been added. ``DatagramChannelCreator`` can be used for
custom ``DatagramChannel`` creation logic. This allows for opening IPv6 multicast datagram channels.
Removed Deprecated Features
===========================

View file

@ -83,4 +83,24 @@ the biggest difference is the absence of remote address information in
in the case of connection-based UDP the security check is cached after
connect, thus writes do not suffer an additional performance penalty.
UDP Multicast
------------------------------------------
If you want to use UDP multicast you will need to use Java 7. Akka provides
a way to control various options of ``DatagramChannel`` through the
``akka.io.Inet.SocketOption`` interface. The example below shows
how to setup a receiver of multicast messages using IPv6 protocol.
To select a Protocol Family you must extend ``akka.io.Inet.DatagramChannelCreator``
class which extends ``akka.io.Inet.SocketOption``. Provide custom logic
for opening a datagram channel by overriding :meth:`create` method.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#inet6-protocol-family
Another socket option will be needed to join a multicast group.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#multicast-group
Socket options must be provided to :class:`UdpMessage.Bind` message.
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#bind

View file

@ -0,0 +1,19 @@
name := "akka-docs-udp-multicast"
version := "2.4-SNAPSHOT"
scalaVersion := "2.10.4"
compileOrder := CompileOrder.ScalaThenJava
javacOptions ++= Seq("-source", "1.7", "-target", "1.7", "-Xlint")
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4-SNAPSHOT",
"com.typesafe.akka" %% "akka-testkit" % "2.4-SNAPSHOT",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"junit" % "junit" % "4.11" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test"
)

View file

@ -0,0 +1,119 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io;
//#imports
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.io.Inet;
import akka.io.Udp;
import akka.io.UdpMessage;
import akka.util.ByteString;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.StandardProtocolFamily;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.List;
//#imports
public class JavaUdpMulticast {
//#inet6-protocol-family
public static class Inet6ProtocolFamily extends Inet.DatagramChannelCreator {
@Override
public DatagramChannel create() throws Exception {
return DatagramChannel.open(StandardProtocolFamily.INET6);
}
}
//#inet6-protocol-family
//#multicast-group
public static class MulticastGroup extends Inet.AbstractSocketOption {
private String address;
private String interf;
public MulticastGroup(String address, String interf) {
this.address = address;
this.interf = interf;
}
@Override
public void afterConnect(DatagramChannel c) {
try {
InetAddress group = InetAddress.getByName(address);
NetworkInterface networkInterface = NetworkInterface.getByName(interf);
c.join(group, networkInterface);
} catch (Exception ex) {
System.out.println("Unable to join multicast group.");
}
}
}
//#multicast-group
public static class Listener extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
ActorRef sink;
public Listener(String iface, String group, Integer port, ActorRef sink) {
this.sink = sink;
//#bind
List<Inet.SocketOption> options = new ArrayList<>();
options.add(new Inet6ProtocolFamily());
options.add(new MulticastGroup(group, iface));
final ActorRef mgr = Udp.get(getContext().system()).getManager();
// listen for datagrams on this address
InetSocketAddress endpoint = new InetSocketAddress(port);
mgr.tell(UdpMessage.bind(getSelf(), endpoint, options), getSelf());
//#bind
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.Bound) {
final Udp.Bound b = (Udp.Bound) msg;
log.info("Bound to " + b.localAddress());
} else if (msg instanceof Udp.Received) {
final Udp.Received r = (Udp.Received) msg;
log.info("Received '" + r.data().decodeString("utf-8") + "' from '" + r.sender() + "'");
} else unhandled(msg);
}
}
public static class Sender extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
String group;
Integer port;
String message;
public Sender(String group, Integer port, String msg) {
this.group = group;
this.port = port;
this.message = msg;
List<Inet.SocketOption> options = new ArrayList<>();
options.add(new Inet6ProtocolFamily());
final ActorRef mgr = Udp.get(getContext().system()).getManager();
mgr.tell(UdpMessage.simpleSender(options), getSelf());
}
@Override
public void onReceive(Object msg) {
if (msg instanceof Udp.SimpleSenderReady) {
InetSocketAddress remote = new InetSocketAddress(group, port);
log.info("Sending message to " + remote);
getSender().tell(UdpMessage.send(ByteString.fromString(message), remote), getSelf());
} else unhandled(msg);
}
}
}

View file

@ -0,0 +1,59 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io
import java.net.{InetAddress, InetSocketAddress, NetworkInterface, StandardProtocolFamily}
import java.nio.channels.DatagramChannel
import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.io.Inet.{DatagramChannelCreator, SocketOption}
import akka.io.{IO, Udp}
import akka.util.ByteString
//#inet6-protocol-family
final case class Inet6ProtocolFamily() extends DatagramChannelCreator {
override def create() =
DatagramChannel.open(StandardProtocolFamily.INET6)
}
//#inet6-protocol-family
//#multicast-group
final case class MulticastGroup(address: String, interface: String) extends SocketOption {
override def afterConnect(c: DatagramChannel) {
val group = InetAddress.getByName(address)
val networkInterface = NetworkInterface.getByName(interface)
c.join(group, networkInterface)
}
}
//#multicast-group
class Listener(iface: String, group: String, port: Int, sink: ActorRef) extends Actor with ActorLogging {
//#bind
import context.system
val opts = List(Inet6ProtocolFamily(), MulticastGroup(group, iface))
IO(Udp) ! Udp.Bind(self, new InetSocketAddress(port), opts)
//#bind
def receive = {
case Udp.Bound(to) => log.info(s"Bound to $to")
case Udp.Received(data, remote) =>
val msg = data.decodeString("utf-8")
log.info(s"Received '$msg' from '$remote'")
sink ! msg
}
}
class Sender(group: String, port: Int, msg: String) extends Actor with ActorLogging {
import context.system
IO(Udp) ! Udp.SimpleSender(List(Inet6ProtocolFamily()))
def receive = {
case Udp.SimpleSenderReady => {
val remote = new InetSocketAddress(group, port)
log.info(s"Sending message to $remote")
sender() ! Udp.Send(ByteString(msg), remote)
}
}
}

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;
public class JavaUdpMulticastTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("JavaUdpMulticastTest");
}
@Test
public void testUdpMulticast() throws Exception {
new JavaTestKit(system) {{
NetworkInterface ipv6Iface = null;
for (Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); interfaces.hasMoreElements() && ipv6Iface == null;) {
NetworkInterface interf = interfaces.nextElement();
for (Enumeration<InetAddress> addresses = interf.getInetAddresses(); addresses.hasMoreElements() && ipv6Iface == null;) {
InetAddress address = addresses.nextElement();
if (address instanceof Inet6Address) {
ipv6Iface = interf;
}
}
}
final String group = "FF33::1200";
final Integer port = TestUtils.temporaryUdpIpv6Port(ipv6Iface);
final String msg = "ohi";
final ActorRef sink = getRef();
final ActorRef listener = system.actorOf(Props.create(Listener.class, ipv6Iface.getName(), group, port, sink));
final ActorRef sender = system.actorOf(Props.create(Sender.class, group, port, msg));
expectMsgEquals(msg);
// unbind
system.stop(listener);
}};
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
}

View file

@ -0,0 +1,52 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io
import java.net.{Inet6Address, InetSocketAddress, NetworkInterface, StandardProtocolFamily}
import java.nio.channels.DatagramChannel
import akka.actor.{ActorSystem, Props}
import akka.testkit.TestKit
import org.scalatest.{BeforeAndAfter, WordSpecLike}
import scala.collection.JavaConversions.enumerationAsScalaIterator
class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")) with WordSpecLike with BeforeAndAfter {
"listener" should {
"send message back to sink" in {
val Some(ipv6Iface) = NetworkInterface.getNetworkInterfaces.collectFirst {
case iface if iface.getInetAddresses.exists(_.isInstanceOf[Inet6Address]) => iface
}
val port = TestUtils.temporaryUdpIpv6Port(ipv6Iface)
val (iface, group, msg, sink) = (ipv6Iface.getName, "FF33::1200", "ohi", testActor)
val listener = system.actorOf(Props(classOf[Listener], iface, group, port, sink))
val sender = system.actorOf(Props(classOf[Sender], group, port, msg))
expectMsg(msg)
// unbind
system.stop(listener)
}
}
def afterAll = {
TestKit.shutdownActorSystem(system)
}
}
object TestUtils {
def temporaryUdpIpv6Port(iface: NetworkInterface) = {
val serverSocket = DatagramChannel.open(StandardProtocolFamily.INET6).socket()
serverSocket.bind(new InetSocketAddress(iface.getInetAddresses.nextElement(), 0))
val port = serverSocket.getLocalPort
serverSocket.close()
port
}
}

View file

@ -3,6 +3,8 @@
# defaults
declare -r default_java_home="/usr/local/share/java/jdk6"
declare -r default_java8_home="/usr/local/share/java/jdk8"
declare -r default_sbt_jar="/usr/share/sbt-launcher-packaging/bin/sbt-launch.jar"
declare -r default_ivy_home="~/.ivy2"
# get the source location for this script; handles symlinks
function get_script_path {
@ -64,6 +66,16 @@ function mvncleantest {
try mvn clean test "mvn execution in $2 failed"
}
# run sbt clean test using the specified java home in the specified directory
function sbtcleantest {
tmp="$script_dir/../../$2"
try cd "$tmp" "can't step into project directory: $tmp"
orig_path="$PATH"
export PATH="$1/bin:$PATH"
try java -jar $sbt_jar -Dsbt.ivy.home=$ivy_home clean test "sbt execution in $2 failed"
export PATH="$orig_path"
}
# initialize variables with defaults and override from environment
declare java_home="$default_java_home"
if [ $AKKA_BUILD_JAVA_HOME ]; then
@ -75,6 +87,16 @@ if [ $AKKA_BUILD_JAVA8_HOME ]; then
java8_home="$AKKA_BUILD_JAVA8_HOME"
fi
declare sbt_jar="$default_sbt_jar"
if [ $AKKA_BUILD_SBT_JAR ]; then
sbt_jar="$AKKA_BUILD_SBT_JAR"
fi
declare ivy_home="$default_ivy_home"
if [ $AKKA_BUILD_IVY_HOME ]; then
ivy_home="$AKKA_BUILD_IVY_HOME"
fi
# process options and set flags
while true; do
case "$1" in
@ -110,3 +132,5 @@ try cd "$tmp" "can't step into project directory: $tmp"
export JAVA_HOME="$java8_home"
try mvn clean compile exec:java -Dexec.mainClass="akka.Main" -Dexec.args="sample.hello.HelloWorld" "mvn execution in $sample_dir failed"
try mvn exec:java -Dexec.mainClass="sample.hello.Main2" "mvn execution in $sample_dir failed"
sbtcleantest "$java8_home" "akka-samples/akka-docs-udp-multicast"