diff --git a/akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java b/akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java index 42c13819e5..ac877b227b 100644 --- a/akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java +++ b/akka-samples/akka-docs-udp-multicast/src/main/java/docs/io/JavaUdpMulticast.java @@ -80,10 +80,13 @@ public class JavaUdpMulticast { public void onReceive(Object msg) { if (msg instanceof Udp.Bound) { final Udp.Bound b = (Udp.Bound) msg; - log.info("Bound to " + b.localAddress()); + log.info("Bound to {}", b.localAddress()); + sink.tell(b, getSelf()); } else if (msg instanceof Udp.Received) { final Udp.Received r = (Udp.Received) msg; - log.info("Received '" + r.data().decodeString("utf-8") + "' from '" + r.sender() + "'"); + final String txt = r.data().decodeString("utf-8"); + log.info("Received '{}' from {}", txt, r.sender()); + sink.tell(txt, getSelf()); } else unhandled(msg); } } @@ -91,11 +94,13 @@ public class JavaUdpMulticast { public static class Sender extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); + String iface; String group; Integer port; String message; - public Sender(String group, Integer port, String msg) { + public Sender(String iface, String group, Integer port, String msg) { + this.iface = iface; this.group = group; this.port = port; this.message = msg; @@ -110,7 +115,7 @@ public class JavaUdpMulticast { @Override public void onReceive(Object msg) { if (msg instanceof Udp.SimpleSenderReady) { - InetSocketAddress remote = new InetSocketAddress(group, port); + InetSocketAddress remote = new InetSocketAddress(group + "%" + iface, port); log.info("Sending message to " + remote); getSender().tell(UdpMessage.send(ByteString.fromString(message), remote), getSelf()); } else unhandled(msg); diff --git a/akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala b/akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala index 7a361f13a9..656d7cfa34 100644 --- a/akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala +++ b/akka-samples/akka-docs-udp-multicast/src/main/scala/ScalaUdpMulticast.scala @@ -37,22 +37,24 @@ class Listener(iface: String, group: String, port: Int, sink: ActorRef) extends //#bind def receive = { - case Udp.Bound(to) => log.info(s"Bound to $to") + case b @ Udp.Bound(to) => + log.info("Bound to {}", to) + sink ! (b) case Udp.Received(data, remote) => val msg = data.decodeString("utf-8") - log.info(s"Received '$msg' from '$remote'") + log.info("Received '{}' from {}", msg, remote) sink ! msg } } -class Sender(group: String, port: Int, msg: String) extends Actor with ActorLogging { +class Sender(iface: String, group: String, port: Int, msg: String) extends Actor with ActorLogging { import context.system IO(Udp) ! Udp.SimpleSender(List(Inet6ProtocolFamily())) def receive = { case Udp.SimpleSenderReady => { - val remote = new InetSocketAddress(group, port) - log.info(s"Sending message to $remote") + val remote = new InetSocketAddress(s"$group%$iface", port) + log.info("Sending message to {}", remote) sender() ! Udp.Send(ByteString(msg), remote) } } diff --git a/akka-samples/akka-docs-udp-multicast/src/test/java/docs/io/JavaUdpMulticastTest.java b/akka-samples/akka-docs-udp-multicast/src/test/java/docs/io/JavaUdpMulticastTest.java index 7f39ac6b77..f9c6ba4c64 100644 --- a/akka-samples/akka-docs-udp-multicast/src/test/java/docs/io/JavaUdpMulticastTest.java +++ b/akka-samples/akka-docs-udp-multicast/src/test/java/docs/io/JavaUdpMulticastTest.java @@ -7,6 +7,7 @@ package docs.io; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.io.Udp; import akka.testkit.JavaTestKit; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -16,6 +17,7 @@ import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.util.Enumeration; +import java.util.Random; public class JavaUdpMulticastTest { @@ -40,14 +42,23 @@ public class JavaUdpMulticastTest { } } - final String group = "FF33::1200"; + // host assigned link local multicast address http://tools.ietf.org/html/rfc3307#section-4.3.2 + // generate a random 32 bit multicast address with the high order bit set + final String randomAddress = Long.toHexString(((long) Math.abs(new Random().nextInt())) | (1L << 31)).toUpperCase(); + final StringBuilder groupBuilder = new StringBuilder("FF02:"); + for (int i = 0; i < 2; i += 1) { + groupBuilder.append(":"); + groupBuilder.append(randomAddress.subSequence(i * 4, i * 4 + 4)); + } + final String group = groupBuilder.toString(); final Integer port = TestUtils.temporaryUdpIpv6Port(ipv6Iface); final String msg = "ohi"; final ActorRef sink = getRef(); + final String iface = ipv6Iface.getName(); - final ActorRef listener = system.actorOf(Props.create(Listener.class, ipv6Iface.getName(), group, port, sink)); - final ActorRef sender = system.actorOf(Props.create(Sender.class, group, port, msg)); - + final ActorRef listener = system.actorOf(Props.create(JavaUdpMulticast.Listener.class, iface, group, port, sink)); + expectMsgClass(Udp.Bound.class); + final ActorRef sender = system.actorOf(Props.create(JavaUdpMulticast.Sender.class, iface, group, port, msg)); expectMsgEquals(msg); // unbind diff --git a/akka-samples/akka-docs-udp-multicast/src/test/scala/ScalaUdpMulticastSpec.scala b/akka-samples/akka-docs-udp-multicast/src/test/scala/ScalaUdpMulticastSpec.scala index bb95e9fc66..8ca25a1694 100644 --- a/akka-samples/akka-docs-udp-multicast/src/test/scala/ScalaUdpMulticastSpec.scala +++ b/akka-samples/akka-docs-udp-multicast/src/test/scala/ScalaUdpMulticastSpec.scala @@ -6,8 +6,10 @@ package docs.io import java.net.{Inet6Address, InetSocketAddress, NetworkInterface, StandardProtocolFamily} import java.nio.channels.DatagramChannel +import scala.util.Random import akka.actor.{ActorSystem, Props} +import akka.io.Udp import akka.testkit.TestKit import org.scalatest.{BeforeAndAfter, WordSpecLike} @@ -21,13 +23,18 @@ class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec") case iface if iface.getInetAddresses.exists(_.isInstanceOf[Inet6Address]) => iface } + // host assigned link local multicast address http://tools.ietf.org/html/rfc3307#section-4.3.2 + // generate a random 32 bit multicast address with the high order bit set + val randomAddress: String = (Random.nextInt().abs.toLong | (1L << 31)).toHexString.toUpperCase + val group = randomAddress.grouped(4).mkString("FF02::", ":", "") val port = TestUtils.temporaryUdpIpv6Port(ipv6Iface) - - val (iface, group, msg, sink) = (ipv6Iface.getName, "FF33::1200", "ohi", testActor) + val msg = "ohi" + val sink = testActor + val iface = ipv6Iface.getName val listener = system.actorOf(Props(classOf[Listener], iface, group, port, sink)) - val sender = system.actorOf(Props(classOf[Sender], group, port, msg)) - + expectMsgType[Udp.Bound] + val sender = system.actorOf(Props(classOf[Sender], iface, group, port, msg)) expectMsg(msg) // unbind