Merge pull request #15627 from 2m/udp-proto-family
!act #15626 expose DatagramChannel creation in SocketOption
This commit is contained in:
commit
200d6eb0d3
12 changed files with 466 additions and 6 deletions
|
|
@ -16,7 +16,7 @@ class UdpIntegrationSpec extends AkkaSpec("""
|
||||||
akka.loglevel = INFO
|
akka.loglevel = INFO
|
||||||
akka.actor.serialize-creators = on""") with ImplicitSender {
|
akka.actor.serialize-creators = on""") with ImplicitSender {
|
||||||
|
|
||||||
val addresses = temporaryServerAddresses(5, udp = true)
|
val addresses = temporaryServerAddresses(6, udp = true)
|
||||||
|
|
||||||
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
|
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
|
||||||
val commander = TestProbe()
|
val commander = TestProbe()
|
||||||
|
|
@ -91,6 +91,14 @@ class UdpIntegrationSpec extends AkkaSpec("""
|
||||||
commander.expectMsg(Bound(addresses(4)))
|
commander.expectMsg(Bound(addresses(4)))
|
||||||
assert(assertOption.afterCalled === 1)
|
assert(assertOption.afterCalled === 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"call DatagramChannelCreator.create method when opening channel" in {
|
||||||
|
val commander = TestProbe()
|
||||||
|
val assertOption = AssertOpenDatagramChannel()
|
||||||
|
commander.send(IO(Udp), Bind(testActor, addresses(5), options = List(assertOption)))
|
||||||
|
commander.expectMsg(Bound(addresses(5)))
|
||||||
|
assert(assertOption.openCalled === 1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -112,3 +120,12 @@ private case class AssertAfterConnect() extends SocketOption {
|
||||||
afterCalled += 1
|
afterCalled += 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private case class AssertOpenDatagramChannel() extends DatagramChannelCreator {
|
||||||
|
var openCalled = 0
|
||||||
|
|
||||||
|
override def create() = {
|
||||||
|
openCalled += 1
|
||||||
|
super.create()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ object Inet {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* SocketOption is a package of data (from the user) and associated
|
* SocketOption is a package of data (from the user) and associated
|
||||||
* behavior (how to apply that to a socket).
|
* behavior (how to apply that to a channel).
|
||||||
*/
|
*/
|
||||||
trait SocketOption {
|
trait SocketOption {
|
||||||
|
|
||||||
|
|
@ -47,6 +47,26 @@ object Inet {
|
||||||
def afterConnect(c: SocketChannel): Unit = ()
|
def afterConnect(c: SocketChannel): Unit = ()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DatagramChannel creation behavior.
|
||||||
|
*/
|
||||||
|
class DatagramChannelCreator extends SocketOption {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open and return new DatagramChannel.
|
||||||
|
*
|
||||||
|
* [[scala.throws]] is needed because [[DatagramChannel.open]] method
|
||||||
|
* can throw an exception.
|
||||||
|
*/
|
||||||
|
@throws(classOf[Exception])
|
||||||
|
def create(): DatagramChannel = DatagramChannel.open()
|
||||||
|
}
|
||||||
|
|
||||||
|
object DatagramChannelCreator {
|
||||||
|
val default = new DatagramChannelCreator()
|
||||||
|
def apply() = default
|
||||||
|
}
|
||||||
|
|
||||||
object SO {
|
object SO {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -165,4 +185,43 @@ object Inet {
|
||||||
def trafficClass(tc: Int) = TrafficClass(tc)
|
def trafficClass(tc: Int) = TrafficClass(tc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: AbstractSocketOption is a package of data (from the user) and associated
|
||||||
|
* behavior (how to apply that to a channel).
|
||||||
|
*/
|
||||||
|
abstract class AbstractSocketOption extends SocketOption {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action to be taken for this option before bind() is called
|
||||||
|
*/
|
||||||
|
override def beforeBind(ds: DatagramChannel): Unit = ()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action to be taken for this option before bind() is called
|
||||||
|
*/
|
||||||
|
override def beforeBind(ss: ServerSocketChannel): Unit = ()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action to be taken for this option before bind() is called
|
||||||
|
*/
|
||||||
|
override def beforeBind(s: SocketChannel): Unit = ()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action to be taken for this option after connect returned (i.e. on
|
||||||
|
* the slave socket for servers).
|
||||||
|
*/
|
||||||
|
override def afterConnect(c: DatagramChannel): Unit = ()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action to be taken for this option after connect returned (i.e. on
|
||||||
|
* the slave socket for servers).
|
||||||
|
*/
|
||||||
|
override def afterConnect(c: ServerSocketChannel): Unit = ()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action to be taken for this option after connect returned (i.e. on
|
||||||
|
* the slave socket for servers).
|
||||||
|
*/
|
||||||
|
override def afterConnect(c: SocketChannel): Unit = ()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import scala.util.control.NonFatal
|
||||||
import akka.actor.{ ActorLogging, Actor, ActorRef }
|
import akka.actor.{ ActorLogging, Actor, ActorRef }
|
||||||
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
|
import akka.io.Inet.DatagramChannelCreator
|
||||||
import akka.io.SelectionHandler._
|
import akka.io.SelectionHandler._
|
||||||
import akka.io.Udp._
|
import akka.io.Udp._
|
||||||
|
|
||||||
|
|
@ -31,7 +32,9 @@ private[io] class UdpListener(val udp: UdpExt,
|
||||||
|
|
||||||
context.watch(bind.handler) // sign death pact
|
context.watch(bind.handler) // sign death pact
|
||||||
|
|
||||||
val channel = DatagramChannel.open
|
val channel = bind.options.collectFirst {
|
||||||
|
case creator: DatagramChannelCreator ⇒ creator
|
||||||
|
}.getOrElse(DatagramChannelCreator()).create()
|
||||||
channel.configureBlocking(false)
|
channel.configureBlocking(false)
|
||||||
|
|
||||||
val localAddress =
|
val localAddress =
|
||||||
|
|
|
||||||
|
|
@ -83,3 +83,24 @@ the biggest difference is the absence of remote address information in
|
||||||
in the case of connection-based UDP the security check is cached after
|
in the case of connection-based UDP the security check is cached after
|
||||||
connect, thus writes do not suffer an additional performance penalty.
|
connect, thus writes do not suffer an additional performance penalty.
|
||||||
|
|
||||||
|
UDP Multicast
|
||||||
|
------------------------------------------
|
||||||
|
|
||||||
|
If you want to use UDP multicast you will need to use Java 7. Akka provides
|
||||||
|
a way to control various options of ``DatagramChannel`` through the
|
||||||
|
``akka.io.Inet.SocketOption`` interface. The example below shows
|
||||||
|
how to setup a receiver of multicast messages using IPv6 protocol.
|
||||||
|
|
||||||
|
To select a Protocol Family you must extend ``akka.io.Inet.DatagramChannelCreator``
|
||||||
|
class which implements ``akka.io.Inet.SocketOption``. Provide custom logic
|
||||||
|
for opening a datagram channel by overriding :meth:`create` method.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#inet6-protocol-family
|
||||||
|
|
||||||
|
Another socket option will be needed to join a multicast group.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#multicast-group
|
||||||
|
|
||||||
|
Socket options must be provided to :meth:`UdpMessage.bind` command.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java#bind
|
||||||
|
|
|
||||||
|
|
@ -51,9 +51,10 @@ Which turns out to be useful in many systems where same-state transitions actual
|
||||||
|
|
||||||
In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead.
|
In case you do *not* want to trigger a state transition event when effectively performing an ``X->X`` transition, use ``stay()`` instead.
|
||||||
|
|
||||||
SocketOption's method signature changed to access channel
|
More control over Channel properties in Akka-IO
|
||||||
=========================================================
|
===============================================
|
||||||
Server Socket Methods have been changed to take a channel instead of a socket. The channel's socket can be retrieved by calling ``channel.socket``. This allows for accessing new NIO features in Java 7.
|
Method signatures for ``SocketOption`` have been changed to take a channel instead of a socket. The channel's socket
|
||||||
|
can be retrieved by calling ``channel.socket``. This allows for accessing new NIO features in Java 7.
|
||||||
|
|
||||||
======================================== =====================================
|
======================================== =====================================
|
||||||
2.3 2.4
|
2.3 2.4
|
||||||
|
|
@ -66,6 +67,9 @@ Server Socket Methods have been changed to take a channel instead of a socket.
|
||||||
``afterConnect(Socket)`` ``afterConnect(SocketChannel)``
|
``afterConnect(Socket)`` ``afterConnect(SocketChannel)``
|
||||||
======================================== =====================================
|
======================================== =====================================
|
||||||
|
|
||||||
|
A new class ``DatagramChannelCreator`` which extends ``SocketOption`` has been added. ``DatagramChannelCreator`` can be used for
|
||||||
|
custom ``DatagramChannel`` creation logic. This allows for opening IPv6 multicast datagram channels.
|
||||||
|
|
||||||
Removed Deprecated Features
|
Removed Deprecated Features
|
||||||
===========================
|
===========================
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -83,4 +83,24 @@ the biggest difference is the absence of remote address information in
|
||||||
in the case of connection-based UDP the security check is cached after
|
in the case of connection-based UDP the security check is cached after
|
||||||
connect, thus writes do not suffer an additional performance penalty.
|
connect, thus writes do not suffer an additional performance penalty.
|
||||||
|
|
||||||
|
UDP Multicast
|
||||||
|
------------------------------------------
|
||||||
|
|
||||||
|
If you want to use UDP multicast you will need to use Java 7. Akka provides
|
||||||
|
a way to control various options of ``DatagramChannel`` through the
|
||||||
|
``akka.io.Inet.SocketOption`` interface. The example below shows
|
||||||
|
how to setup a receiver of multicast messages using IPv6 protocol.
|
||||||
|
|
||||||
|
To select a Protocol Family you must extend ``akka.io.Inet.DatagramChannelCreator``
|
||||||
|
class which extends ``akka.io.Inet.SocketOption``. Provide custom logic
|
||||||
|
for opening a datagram channel by overriding :meth:`create` method.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#inet6-protocol-family
|
||||||
|
|
||||||
|
Another socket option will be needed to join a multicast group.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#multicast-group
|
||||||
|
|
||||||
|
Socket options must be provided to :class:`UdpMessage.Bind` message.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala#bind
|
||||||
|
|
|
||||||
19
akka-samples/akka-docs-udp-multicast/build.sbt
Normal file
19
akka-samples/akka-docs-udp-multicast/build.sbt
Normal file
|
|
@ -0,0 +1,19 @@
|
||||||
|
name := "akka-docs-udp-multicast"
|
||||||
|
|
||||||
|
version := "2.4-SNAPSHOT"
|
||||||
|
|
||||||
|
scalaVersion := "2.10.4"
|
||||||
|
|
||||||
|
compileOrder := CompileOrder.ScalaThenJava
|
||||||
|
|
||||||
|
javacOptions ++= Seq("-source", "1.7", "-target", "1.7", "-Xlint")
|
||||||
|
|
||||||
|
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
|
||||||
|
|
||||||
|
libraryDependencies ++= Seq(
|
||||||
|
"com.typesafe.akka" %% "akka-actor" % "2.4-SNAPSHOT",
|
||||||
|
"com.typesafe.akka" %% "akka-testkit" % "2.4-SNAPSHOT",
|
||||||
|
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
|
||||||
|
"junit" % "junit" % "4.11" % "test",
|
||||||
|
"com.novocode" % "junit-interface" % "0.10" % "test"
|
||||||
|
)
|
||||||
|
|
@ -0,0 +1,119 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.io;
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import akka.actor.UntypedActor;
|
||||||
|
import akka.event.Logging;
|
||||||
|
import akka.event.LoggingAdapter;
|
||||||
|
import akka.io.Inet;
|
||||||
|
import akka.io.Udp;
|
||||||
|
import akka.io.UdpMessage;
|
||||||
|
import akka.util.ByteString;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.NetworkInterface;
|
||||||
|
import java.net.StandardProtocolFamily;
|
||||||
|
import java.nio.channels.DatagramChannel;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
public class JavaUdpMulticast {
|
||||||
|
//#inet6-protocol-family
|
||||||
|
public static class Inet6ProtocolFamily extends Inet.DatagramChannelCreator {
|
||||||
|
@Override
|
||||||
|
public DatagramChannel create() throws Exception {
|
||||||
|
return DatagramChannel.open(StandardProtocolFamily.INET6);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#inet6-protocol-family
|
||||||
|
|
||||||
|
//#multicast-group
|
||||||
|
public static class MulticastGroup extends Inet.AbstractSocketOption {
|
||||||
|
private String address;
|
||||||
|
private String interf;
|
||||||
|
|
||||||
|
public MulticastGroup(String address, String interf) {
|
||||||
|
this.address = address;
|
||||||
|
this.interf = interf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterConnect(DatagramChannel c) {
|
||||||
|
try {
|
||||||
|
InetAddress group = InetAddress.getByName(address);
|
||||||
|
NetworkInterface networkInterface = NetworkInterface.getByName(interf);
|
||||||
|
c.join(group, networkInterface);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
System.out.println("Unable to join multicast group.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#multicast-group
|
||||||
|
|
||||||
|
public static class Listener extends UntypedActor {
|
||||||
|
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||||
|
|
||||||
|
ActorRef sink;
|
||||||
|
|
||||||
|
public Listener(String iface, String group, Integer port, ActorRef sink) {
|
||||||
|
this.sink = sink;
|
||||||
|
|
||||||
|
//#bind
|
||||||
|
List<Inet.SocketOption> options = new ArrayList<>();
|
||||||
|
options.add(new Inet6ProtocolFamily());
|
||||||
|
options.add(new MulticastGroup(group, iface));
|
||||||
|
|
||||||
|
final ActorRef mgr = Udp.get(getContext().system()).getManager();
|
||||||
|
// listen for datagrams on this address
|
||||||
|
InetSocketAddress endpoint = new InetSocketAddress(port);
|
||||||
|
mgr.tell(UdpMessage.bind(getSelf(), endpoint, options), getSelf());
|
||||||
|
//#bind
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
if (msg instanceof Udp.Bound) {
|
||||||
|
final Udp.Bound b = (Udp.Bound) msg;
|
||||||
|
log.info("Bound to " + b.localAddress());
|
||||||
|
} else if (msg instanceof Udp.Received) {
|
||||||
|
final Udp.Received r = (Udp.Received) msg;
|
||||||
|
log.info("Received '" + r.data().decodeString("utf-8") + "' from '" + r.sender() + "'");
|
||||||
|
} else unhandled(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Sender extends UntypedActor {
|
||||||
|
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
|
||||||
|
|
||||||
|
String group;
|
||||||
|
Integer port;
|
||||||
|
String message;
|
||||||
|
|
||||||
|
public Sender(String group, Integer port, String msg) {
|
||||||
|
this.group = group;
|
||||||
|
this.port = port;
|
||||||
|
this.message = msg;
|
||||||
|
|
||||||
|
List<Inet.SocketOption> options = new ArrayList<>();
|
||||||
|
options.add(new Inet6ProtocolFamily());
|
||||||
|
|
||||||
|
final ActorRef mgr = Udp.get(getContext().system()).getManager();
|
||||||
|
mgr.tell(UdpMessage.simpleSender(options), getSelf());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object msg) {
|
||||||
|
if (msg instanceof Udp.SimpleSenderReady) {
|
||||||
|
InetSocketAddress remote = new InetSocketAddress(group, port);
|
||||||
|
log.info("Sending message to " + remote);
|
||||||
|
getSender().tell(UdpMessage.send(ByteString.fromString(message), remote), getSelf());
|
||||||
|
} else unhandled(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.io
|
||||||
|
|
||||||
|
import java.net.{InetAddress, InetSocketAddress, NetworkInterface, StandardProtocolFamily}
|
||||||
|
import java.nio.channels.DatagramChannel
|
||||||
|
|
||||||
|
import akka.actor.{Actor, ActorLogging, ActorRef}
|
||||||
|
import akka.io.Inet.{DatagramChannelCreator, SocketOption}
|
||||||
|
import akka.io.{IO, Udp}
|
||||||
|
import akka.util.ByteString
|
||||||
|
|
||||||
|
//#inet6-protocol-family
|
||||||
|
final case class Inet6ProtocolFamily() extends DatagramChannelCreator {
|
||||||
|
override def create() =
|
||||||
|
DatagramChannel.open(StandardProtocolFamily.INET6)
|
||||||
|
}
|
||||||
|
//#inet6-protocol-family
|
||||||
|
|
||||||
|
//#multicast-group
|
||||||
|
final case class MulticastGroup(address: String, interface: String) extends SocketOption {
|
||||||
|
override def afterConnect(c: DatagramChannel) {
|
||||||
|
val group = InetAddress.getByName(address)
|
||||||
|
val networkInterface = NetworkInterface.getByName(interface)
|
||||||
|
c.join(group, networkInterface)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#multicast-group
|
||||||
|
|
||||||
|
class Listener(iface: String, group: String, port: Int, sink: ActorRef) extends Actor with ActorLogging {
|
||||||
|
//#bind
|
||||||
|
import context.system
|
||||||
|
val opts = List(Inet6ProtocolFamily(), MulticastGroup(group, iface))
|
||||||
|
IO(Udp) ! Udp.Bind(self, new InetSocketAddress(port), opts)
|
||||||
|
//#bind
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case Udp.Bound(to) => log.info(s"Bound to $to")
|
||||||
|
case Udp.Received(data, remote) =>
|
||||||
|
val msg = data.decodeString("utf-8")
|
||||||
|
log.info(s"Received '$msg' from '$remote'")
|
||||||
|
sink ! msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Sender(group: String, port: Int, msg: String) extends Actor with ActorLogging {
|
||||||
|
import context.system
|
||||||
|
IO(Udp) ! Udp.SimpleSender(List(Inet6ProtocolFamily()))
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case Udp.SimpleSenderReady => {
|
||||||
|
val remote = new InetSocketAddress(group, port)
|
||||||
|
log.info(s"Sending message to $remote")
|
||||||
|
sender() ! Udp.Send(ByteString(msg), remote)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,63 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.io;
|
||||||
|
|
||||||
|
import akka.actor.ActorRef;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.actor.Props;
|
||||||
|
import akka.testkit.JavaTestKit;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.net.Inet6Address;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.NetworkInterface;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
|
||||||
|
public class JavaUdpMulticastTest {
|
||||||
|
|
||||||
|
static ActorSystem system;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() {
|
||||||
|
system = ActorSystem.create("JavaUdpMulticastTest");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUdpMulticast() throws Exception {
|
||||||
|
new JavaTestKit(system) {{
|
||||||
|
NetworkInterface ipv6Iface = null;
|
||||||
|
for (Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces(); interfaces.hasMoreElements() && ipv6Iface == null;) {
|
||||||
|
NetworkInterface interf = interfaces.nextElement();
|
||||||
|
for (Enumeration<InetAddress> addresses = interf.getInetAddresses(); addresses.hasMoreElements() && ipv6Iface == null;) {
|
||||||
|
InetAddress address = addresses.nextElement();
|
||||||
|
if (address instanceof Inet6Address) {
|
||||||
|
ipv6Iface = interf;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final String group = "FF33::1200";
|
||||||
|
final Integer port = TestUtils.temporaryUdpIpv6Port(ipv6Iface);
|
||||||
|
final String msg = "ohi";
|
||||||
|
final ActorRef sink = getRef();
|
||||||
|
|
||||||
|
final ActorRef listener = system.actorOf(Props.create(Listener.class, ipv6Iface.getName(), group, port, sink));
|
||||||
|
final ActorRef sender = system.actorOf(Props.create(Sender.class, group, port, msg));
|
||||||
|
|
||||||
|
expectMsgEquals(msg);
|
||||||
|
|
||||||
|
// unbind
|
||||||
|
system.stop(listener);
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
|
JavaTestKit.shutdownActorSystem(system);
|
||||||
|
system = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,52 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.io
|
||||||
|
|
||||||
|
import java.net.{Inet6Address, InetSocketAddress, NetworkInterface, StandardProtocolFamily}
|
||||||
|
import java.nio.channels.DatagramChannel
|
||||||
|
|
||||||
|
import akka.actor.{ActorSystem, Props}
|
||||||
|
import akka.testkit.TestKit
|
||||||
|
import org.scalatest.{BeforeAndAfter, WordSpecLike}
|
||||||
|
|
||||||
|
import scala.collection.JavaConversions.enumerationAsScalaIterator
|
||||||
|
|
||||||
|
class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")) with WordSpecLike with BeforeAndAfter {
|
||||||
|
|
||||||
|
"listener" should {
|
||||||
|
"send message back to sink" in {
|
||||||
|
val Some(ipv6Iface) = NetworkInterface.getNetworkInterfaces.collectFirst {
|
||||||
|
case iface if iface.getInetAddresses.exists(_.isInstanceOf[Inet6Address]) => iface
|
||||||
|
}
|
||||||
|
|
||||||
|
val port = TestUtils.temporaryUdpIpv6Port(ipv6Iface)
|
||||||
|
|
||||||
|
val (iface, group, msg, sink) = (ipv6Iface.getName, "FF33::1200", "ohi", testActor)
|
||||||
|
|
||||||
|
val listener = system.actorOf(Props(classOf[Listener], iface, group, port, sink))
|
||||||
|
val sender = system.actorOf(Props(classOf[Sender], group, port, msg))
|
||||||
|
|
||||||
|
expectMsg(msg)
|
||||||
|
|
||||||
|
// unbind
|
||||||
|
system.stop(listener)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def afterAll = {
|
||||||
|
TestKit.shutdownActorSystem(system)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
object TestUtils {
|
||||||
|
def temporaryUdpIpv6Port(iface: NetworkInterface) = {
|
||||||
|
val serverSocket = DatagramChannel.open(StandardProtocolFamily.INET6).socket()
|
||||||
|
serverSocket.bind(new InetSocketAddress(iface.getInetAddresses.nextElement(), 0))
|
||||||
|
val port = serverSocket.getLocalPort
|
||||||
|
serverSocket.close()
|
||||||
|
port
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,6 +3,8 @@
|
||||||
# defaults
|
# defaults
|
||||||
declare -r default_java_home="/usr/local/share/java/jdk6"
|
declare -r default_java_home="/usr/local/share/java/jdk6"
|
||||||
declare -r default_java8_home="/usr/local/share/java/jdk8"
|
declare -r default_java8_home="/usr/local/share/java/jdk8"
|
||||||
|
declare -r default_sbt_jar="/usr/share/sbt-launcher-packaging/bin/sbt-launch.jar"
|
||||||
|
declare -r default_ivy_home="~/.ivy2"
|
||||||
|
|
||||||
# get the source location for this script; handles symlinks
|
# get the source location for this script; handles symlinks
|
||||||
function get_script_path {
|
function get_script_path {
|
||||||
|
|
@ -64,6 +66,16 @@ function mvncleantest {
|
||||||
try mvn clean test "mvn execution in $2 failed"
|
try mvn clean test "mvn execution in $2 failed"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# run sbt clean test using the specified java home in the specified directory
|
||||||
|
function sbtcleantest {
|
||||||
|
tmp="$script_dir/../../$2"
|
||||||
|
try cd "$tmp" "can't step into project directory: $tmp"
|
||||||
|
orig_path="$PATH"
|
||||||
|
export PATH="$1/bin:$PATH"
|
||||||
|
try java -jar $sbt_jar -Dsbt.ivy.home=$ivy_home clean test "sbt execution in $2 failed"
|
||||||
|
export PATH="$orig_path"
|
||||||
|
}
|
||||||
|
|
||||||
# initialize variables with defaults and override from environment
|
# initialize variables with defaults and override from environment
|
||||||
declare java_home="$default_java_home"
|
declare java_home="$default_java_home"
|
||||||
if [ $AKKA_BUILD_JAVA_HOME ]; then
|
if [ $AKKA_BUILD_JAVA_HOME ]; then
|
||||||
|
|
@ -75,6 +87,16 @@ if [ $AKKA_BUILD_JAVA8_HOME ]; then
|
||||||
java8_home="$AKKA_BUILD_JAVA8_HOME"
|
java8_home="$AKKA_BUILD_JAVA8_HOME"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
declare sbt_jar="$default_sbt_jar"
|
||||||
|
if [ $AKKA_BUILD_SBT_JAR ]; then
|
||||||
|
sbt_jar="$AKKA_BUILD_SBT_JAR"
|
||||||
|
fi
|
||||||
|
|
||||||
|
declare ivy_home="$default_ivy_home"
|
||||||
|
if [ $AKKA_BUILD_IVY_HOME ]; then
|
||||||
|
ivy_home="$AKKA_BUILD_IVY_HOME"
|
||||||
|
fi
|
||||||
|
|
||||||
# process options and set flags
|
# process options and set flags
|
||||||
while true; do
|
while true; do
|
||||||
case "$1" in
|
case "$1" in
|
||||||
|
|
@ -110,3 +132,5 @@ try cd "$tmp" "can't step into project directory: $tmp"
|
||||||
export JAVA_HOME="$java8_home"
|
export JAVA_HOME="$java8_home"
|
||||||
try mvn clean compile exec:java -Dexec.mainClass="akka.Main" -Dexec.args="sample.hello.HelloWorld" "mvn execution in $sample_dir failed"
|
try mvn clean compile exec:java -Dexec.mainClass="akka.Main" -Dexec.args="sample.hello.HelloWorld" "mvn execution in $sample_dir failed"
|
||||||
try mvn exec:java -Dexec.mainClass="sample.hello.Main2" "mvn execution in $sample_dir failed"
|
try mvn exec:java -Dexec.mainClass="sample.hello.Main2" "mvn execution in $sample_dir failed"
|
||||||
|
|
||||||
|
sbtcleantest "$java8_home" "akka-samples/akka-docs-udp-multicast"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue