diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala index f5d20d5b6b..2c9c0b5194 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala @@ -16,7 +16,7 @@ class UdpIntegrationSpec extends AkkaSpec(""" akka.loglevel = INFO akka.actor.serialize-creators = on""") with ImplicitSender { - val addresses = temporaryServerAddresses(5, udp = true) + val addresses = temporaryServerAddresses(6, udp = true) def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() @@ -91,6 +91,14 @@ class UdpIntegrationSpec extends AkkaSpec(""" commander.expectMsg(Bound(addresses(4))) assert(assertOption.afterCalled === 1) } + + "call DatagramChannelCreator.create method when opening channel" in { + val commander = TestProbe() + val assertOption = AssertOpenDatagramChannel() + commander.send(IO(Udp), Bind(testActor, addresses(5), options = List(assertOption))) + commander.expectMsg(Bound(addresses(5))) + assert(assertOption.openCalled === 1) + } } } @@ -112,3 +120,12 @@ private case class AssertAfterConnect() extends SocketOption { afterCalled += 1 } } + +private case class AssertOpenDatagramChannel() extends DatagramChannelCreator { + var openCalled = 0 + + override def create() = { + openCalled += 1 + super.create() + } +} diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala index de117e394e..a3cb7c65ae 100644 --- a/akka-actor/src/main/scala/akka/io/Inet.scala +++ b/akka-actor/src/main/scala/akka/io/Inet.scala @@ -9,7 +9,7 @@ object Inet { /** * SocketOption is a package of data (from the user) and associated - * behavior (how to apply that to a socket). + * behavior (how to apply that to a channel). */ trait SocketOption { @@ -47,6 +47,26 @@ object Inet { def afterConnect(c: SocketChannel): Unit = () } + /** + * DatagramChannel creation behavior. + */ + class DatagramChannelCreator extends SocketOption { + + /** + * Open and return new DatagramChannel. + * + * [[scala.throws]] is needed because [[DatagramChannel.open]] method + * can throw an exception. + */ + @throws(classOf[Exception]) + def create(): DatagramChannel = DatagramChannel.open() + } + + object DatagramChannelCreator { + val default = new DatagramChannelCreator() + def apply() = default + } + object SO { /** @@ -165,4 +185,43 @@ object Inet { def trafficClass(tc: Int) = TrafficClass(tc) } + /** + * Java API: AbstractSocketOption is a package of data (from the user) and associated + * behavior (how to apply that to a channel). + */ + abstract class AbstractSocketOption extends SocketOption { + + /** + * Action to be taken for this option before bind() is called + */ + override def beforeBind(ds: DatagramChannel): Unit = () + + /** + * Action to be taken for this option before bind() is called + */ + override def beforeBind(ss: ServerSocketChannel): Unit = () + + /** + * Action to be taken for this option before bind() is called + */ + override def beforeBind(s: SocketChannel): Unit = () + + /** + * Action to be taken for this option after connect returned (i.e. on + * the slave socket for servers). + */ + override def afterConnect(c: DatagramChannel): Unit = () + + /** + * Action to be taken for this option after connect returned (i.e. on + * the slave socket for servers). + */ + override def afterConnect(c: ServerSocketChannel): Unit = () + + /** + * Action to be taken for this option after connect returned (i.e. on + * the slave socket for servers). + */ + override def afterConnect(c: SocketChannel): Unit = () + } } diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala index d460368e81..2434c8b78b 100644 --- a/akka-actor/src/main/scala/akka/io/UdpListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -12,6 +12,7 @@ import scala.util.control.NonFatal import akka.actor.{ ActorLogging, Actor, ActorRef } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.ByteString +import akka.io.Inet.DatagramChannelCreator import akka.io.SelectionHandler._ import akka.io.Udp._ @@ -31,7 +32,9 @@ private[io] class UdpListener(val udp: UdpExt, context.watch(bind.handler) // sign death pact - val channel = DatagramChannel.open + val channel = bind.options.collectFirst { + case creator: DatagramChannelCreator ⇒ creator + }.getOrElse(DatagramChannelCreator()).create() channel.configureBlocking(false) val localAddress = diff --git a/akka-docs/rst/java/io-udp.rst b/akka-docs/rst/java/io-udp.rst index 8550e551e4..a91024b461 100644 --- a/akka-docs/rst/java/io-udp.rst +++ b/akka-docs/rst/java/io-udp.rst @@ -83,3 +83,24 @@ the biggest difference is the absence of remote address information in in the case of connection-based UDP the security check is cached after connect, thus writes do not suffer an additional performance penalty. +UDP Multicast +------------------------------------------ + +If you want to use UDP multicast you will need to use Java 7. Akka provides +a way to control various options of ``DatagramChannel`` through the +``akka.io.Inet.SocketOption`` interface. The example below shows +how to setup a receiver of multicast messages using IPv6 protocol. + +To select a Protocol Family you must extend ``akka.io.Inet.DatagramChannelCreator`` +class which implements ``akka.io.Inet.SocketOption``. Provide custom logic +for opening a datagram channel by overriding :meth:`create` method. + +.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#inet6-protocol-family + +Another socket option will be needed to join a multicast group. + +.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#multicast-group + +Socket options must be provided to :meth:`UdpMessage.bind` command. + +.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#bind diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 737251fd58..3d8fc81aa8 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -51,9 +51,10 @@ Which turns out to be useful in many systems where same-state transitions actual In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead. -SocketOption's method signature changed to access channel -========================================================= -Server Socket Methods have been changed to take a channel instead of a socket. The channel's socket can be retrieved by calling ``channel.socket``. This allows for accessing new NIO features in Java 7. +More control over Channel properties in Akka-IO +=============================================== +Method signatures for ``SocketOption`` have been changed to take a channel instead of a socket. The channel's socket +can be retrieved by calling ``channel.socket``. This allows for accessing new NIO features in Java 7. ======================================== ===================================== 2.3 2.4 @@ -66,6 +67,9 @@ Server Socket Methods have been changed to take a channel instead of a socket. ``afterConnect(Socket)`` ``afterConnect(SocketChannel)`` ======================================== ===================================== +A new class ``DatagramChannelCreator`` which extends ``SocketOption`` has been added. ``DatagramChannelCreator`` can be used for +custom ``DatagramChannel`` creation logic. This allows for opening IPv6 multicast datagram channels. + Removed Deprecated Features =========================== diff --git a/akka-docs/rst/scala/io-udp.rst b/akka-docs/rst/scala/io-udp.rst index 8dc1b14786..7390baaf60 100644 --- a/akka-docs/rst/scala/io-udp.rst +++ b/akka-docs/rst/scala/io-udp.rst @@ -83,4 +83,24 @@ the biggest difference is the absence of remote address information in in the case of connection-based UDP the security check is cached after connect, thus writes do not suffer an additional performance penalty. +UDP Multicast +------------------------------------------ +If you want to use UDP multicast you will need to use Java 7. Akka provides +a way to control various options of ``DatagramChannel`` through the +``akka.io.Inet.SocketOption`` interface. The example below shows +how to setup a receiver of multicast messages using IPv6 protocol. + +To select a Protocol Family you must extend ``akka.io.Inet.DatagramChannelCreator`` +class which extends ``akka.io.Inet.SocketOption``. Provide custom logic +for opening a datagram channel by overriding :meth:`create` method. + +.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#inet6-protocol-family + +Another socket option will be needed to join a multicast group. + +.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#multicast-group + +Socket options must be provided to :class:`UdpMessage.Bind` message. + +.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#bind diff --git a/akka-samples/akka-docs-udp-multicast/build.sbt b/akka-samples/akka-docs-udp-multicast/build.sbt new file mode 100644 index 0000000000..359cb8ecd6 --- /dev/null +++ b/akka-samples/akka-docs-udp-multicast/build.sbt @@ -0,0 +1,19 @@ +name := "akka-docs-udp-multicast" + +version := "2.4-SNAPSHOT" + +scalaVersion := "2.10.4" + +compileOrder := CompileOrder.ScalaThenJava + +javacOptions ++= Seq("-source", "1.7", "-target", "1.7", "-Xlint") + +testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a") + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor" % "2.4-SNAPSHOT", + "com.typesafe.akka" %% "akka-testkit" % "2.4-SNAPSHOT", + "org.scalatest" %% "scalatest" % "2.2.1" % "test", + "junit" % "junit" % "4.11" % "test", + "com.novocode" % "junit-interface" % "0.10" % "test" +) diff --git a/akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java b/akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java new file mode 100644 index 0000000000..42c13819e5 --- /dev/null +++ b/akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package docs.io; + +//#imports +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.io.Inet; +import akka.io.Udp; +import akka.io.UdpMessage; +import akka.util.ByteString; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.StandardProtocolFamily; +import java.nio.channels.DatagramChannel; +import java.util.ArrayList; +import java.util.List; +//#imports + +public class JavaUdpMulticast { + //#inet6-protocol-family + public static class Inet6ProtocolFamily extends Inet.DatagramChannelCreator { + @Override + public DatagramChannel create() throws Exception { + return DatagramChannel.open(StandardProtocolFamily.INET6); + } + } + //#inet6-protocol-family + + //#multicast-group + public static class MulticastGroup extends Inet.AbstractSocketOption { + private String address; + private String interf; + + public MulticastGroup(String address, String interf) { + this.address = address; + this.interf = interf; + } + + @Override + public void afterConnect(DatagramChannel c) { + try { + InetAddress group = InetAddress.getByName(address); + NetworkInterface networkInterface = NetworkInterface.getByName(interf); + c.join(group, networkInterface); + } catch (Exception ex) { + System.out.println("Unable to join multicast group."); + } + } + } + //#multicast-group + + public static class Listener extends UntypedActor { + LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + ActorRef sink; + + public Listener(String iface, String group, Integer port, ActorRef sink) { + this.sink = sink; + + //#bind + List options = new ArrayList<>(); + options.add(new Inet6ProtocolFamily()); + options.add(new MulticastGroup(group, iface)); + + final ActorRef mgr = Udp.get(getContext().system()).getManager(); + // listen for datagrams on this address + InetSocketAddress endpoint = new InetSocketAddress(port); + mgr.tell(UdpMessage.bind(getSelf(), endpoint, options), getSelf()); + //#bind + } + + @Override + public void onReceive(Object msg) { + if (msg instanceof Udp.Bound) { + final Udp.Bound b = (Udp.Bound) msg; + log.info("Bound to " + b.localAddress()); + } else if (msg instanceof Udp.Received) { + final Udp.Received r = (Udp.Received) msg; + log.info("Received '" + r.data().decodeString("utf-8") + "' from '" + r.sender() + "'"); + } else unhandled(msg); + } + } + + public static class Sender extends UntypedActor { + LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + String group; + Integer port; + String message; + + public Sender(String group, Integer port, String msg) { + this.group = group; + this.port = port; + this.message = msg; + + List options = new ArrayList<>(); + options.add(new Inet6ProtocolFamily()); + + final ActorRef mgr = Udp.get(getContext().system()).getManager(); + mgr.tell(UdpMessage.simpleSender(options), getSelf()); + } + + @Override + public void onReceive(Object msg) { + if (msg instanceof Udp.SimpleSenderReady) { + InetSocketAddress remote = new InetSocketAddress(group, port); + log.info("Sending message to " + remote); + getSender().tell(UdpMessage.send(ByteString.fromString(message), remote), getSelf()); + } else unhandled(msg); + } + } +} diff --git a/akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala b/akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala new file mode 100644 index 0000000000..7a361f13a9 --- /dev/null +++ b/akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package docs.io + +import java.net.{InetAddress, InetSocketAddress, NetworkInterface, StandardProtocolFamily} +import java.nio.channels.DatagramChannel + +import akka.actor.{Actor, ActorLogging, ActorRef} +import akka.io.Inet.{DatagramChannelCreator, SocketOption} +import akka.io.{IO, Udp} +import akka.util.ByteString + +//#inet6-protocol-family +final case class Inet6ProtocolFamily() extends DatagramChannelCreator { + override def create() = + DatagramChannel.open(StandardProtocolFamily.INET6) +} +//#inet6-protocol-family + +//#multicast-group +final case class MulticastGroup(address: String, interface: String) extends SocketOption { + override def afterConnect(c: DatagramChannel) { + val group = InetAddress.getByName(address) + val networkInterface = NetworkInterface.getByName(interface) + c.join(group, networkInterface) + } +} +//#multicast-group + +class Listener(iface: String, group: String, port: Int, sink: ActorRef) extends Actor with ActorLogging { + //#bind + import context.system + val opts = List(Inet6ProtocolFamily(), MulticastGroup(group, iface)) + IO(Udp) ! Udp.Bind(self, new InetSocketAddress(port), opts) + //#bind + + def receive = { + case Udp.Bound(to) => log.info(s"Bound to $to") + case Udp.Received(data, remote) => + val msg = data.decodeString("utf-8") + log.info(s"Received '$msg' from '$remote'") + sink ! msg + } +} + +class Sender(group: String, port: Int, msg: String) extends Actor with ActorLogging { + import context.system + IO(Udp) ! Udp.SimpleSender(List(Inet6ProtocolFamily())) + + def receive = { + case Udp.SimpleSenderReady => { + val remote = new InetSocketAddress(group, port) + log.info(s"Sending message to $remote") + sender() ! Udp.Send(ByteString(msg), remote) + } + } +} diff --git a/akka-samples/akka-docs-udp-multicast/src/test/java/docs/io/JavaUdpMulticastTest.java b/akka-samples/akka-docs-udp-multicast/src/test/java/docs/io/JavaUdpMulticastTest.java new file mode 100644 index 0000000000..7f39ac6b77 --- /dev/null +++ b/akka-samples/akka-docs-udp-multicast/src/test/java/docs/io/JavaUdpMulticastTest.java @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package docs.io; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.util.Enumeration; + +public class JavaUdpMulticastTest { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("JavaUdpMulticastTest"); + } + + @Test + public void testUdpMulticast() throws Exception { + new JavaTestKit(system) {{ + NetworkInterface ipv6Iface = null; + for (Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); interfaces.hasMoreElements() && ipv6Iface == null;) { + NetworkInterface interf = interfaces.nextElement(); + for (Enumeration addresses = interf.getInetAddresses(); addresses.hasMoreElements() && ipv6Iface == null;) { + InetAddress address = addresses.nextElement(); + if (address instanceof Inet6Address) { + ipv6Iface = interf; + } + } + } + + final String group = "FF33::1200"; + final Integer port = TestUtils.temporaryUdpIpv6Port(ipv6Iface); + final String msg = "ohi"; + final ActorRef sink = getRef(); + + final ActorRef listener = system.actorOf(Props.create(Listener.class, ipv6Iface.getName(), group, port, sink)); + final ActorRef sender = system.actorOf(Props.create(Sender.class, group, port, msg)); + + expectMsgEquals(msg); + + // unbind + system.stop(listener); + }}; + } + + @AfterClass + public static void tearDown() { + JavaTestKit.shutdownActorSystem(system); + system = null; + } +} diff --git a/akka-samples/akka-docs-udp-multicast/src/test/scala/ScalaUdpMulticastSpec.scala b/akka-samples/akka-docs-udp-multicast/src/test/scala/ScalaUdpMulticastSpec.scala new file mode 100644 index 0000000000..bb95e9fc66 --- /dev/null +++ b/akka-samples/akka-docs-udp-multicast/src/test/scala/ScalaUdpMulticastSpec.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package docs.io + +import java.net.{Inet6Address, InetSocketAddress, NetworkInterface, StandardProtocolFamily} +import java.nio.channels.DatagramChannel + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestKit +import org.scalatest.{BeforeAndAfter, WordSpecLike} + +import scala.collection.JavaConversions.enumerationAsScalaIterator + +class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")) with WordSpecLike with BeforeAndAfter { + + "listener" should { + "send message back to sink" in { + val Some(ipv6Iface) = NetworkInterface.getNetworkInterfaces.collectFirst { + case iface if iface.getInetAddresses.exists(_.isInstanceOf[Inet6Address]) => iface + } + + val port = TestUtils.temporaryUdpIpv6Port(ipv6Iface) + + val (iface, group, msg, sink) = (ipv6Iface.getName, "FF33::1200", "ohi", testActor) + + val listener = system.actorOf(Props(classOf[Listener], iface, group, port, sink)) + val sender = system.actorOf(Props(classOf[Sender], group, port, msg)) + + expectMsg(msg) + + // unbind + system.stop(listener) + } + } + + def afterAll = { + TestKit.shutdownActorSystem(system) + } + +} + +object TestUtils { + def temporaryUdpIpv6Port(iface: NetworkInterface) = { + val serverSocket = DatagramChannel.open(StandardProtocolFamily.INET6).socket() + serverSocket.bind(new InetSocketAddress(iface.getInetAddresses.nextElement(), 0)) + val port = serverSocket.getLocalPort + serverSocket.close() + port + } +} diff --git a/scripts/build/extra-build-steps.sh b/scripts/build/extra-build-steps.sh index 51cbc13afb..94ea02f99d 100755 --- a/scripts/build/extra-build-steps.sh +++ b/scripts/build/extra-build-steps.sh @@ -3,6 +3,8 @@ # defaults declare -r default_java_home="/usr/local/share/java/jdk6" declare -r default_java8_home="/usr/local/share/java/jdk8" +declare -r default_sbt_jar="/usr/share/sbt-launcher-packaging/bin/sbt-launch.jar" +declare -r default_ivy_home="~/.ivy2" # get the source location for this script; handles symlinks function get_script_path { @@ -64,6 +66,16 @@ function mvncleantest { try mvn clean test "mvn execution in $2 failed" } +# run sbt clean test using the specified java home in the specified directory +function sbtcleantest { + tmp="$script_dir/../../$2" + try cd "$tmp" "can't step into project directory: $tmp" + orig_path="$PATH" + export PATH="$1/bin:$PATH" + try java -jar $sbt_jar -Dsbt.ivy.home=$ivy_home clean test "sbt execution in $2 failed" + export PATH="$orig_path" +} + # initialize variables with defaults and override from environment declare java_home="$default_java_home" if [ $AKKA_BUILD_JAVA_HOME ]; then @@ -75,6 +87,16 @@ if [ $AKKA_BUILD_JAVA8_HOME ]; then java8_home="$AKKA_BUILD_JAVA8_HOME" fi +declare sbt_jar="$default_sbt_jar" +if [ $AKKA_BUILD_SBT_JAR ]; then + sbt_jar="$AKKA_BUILD_SBT_JAR" +fi + +declare ivy_home="$default_ivy_home" +if [ $AKKA_BUILD_IVY_HOME ]; then + ivy_home="$AKKA_BUILD_IVY_HOME" +fi + # process options and set flags while true; do case "$1" in @@ -110,3 +132,5 @@ try cd "$tmp" "can't step into project directory: $tmp" export JAVA_HOME="$java8_home" try mvn clean compile exec:java -Dexec.mainClass="akka.Main" -Dexec.args="sample.hello.HelloWorld" "mvn execution in $sample_dir failed" try mvn exec:java -Dexec.mainClass="sample.hello.Main2" "mvn execution in $sample_dir failed" + +sbtcleantest "$java8_home" "akka-samples/akka-docs-udp-multicast"