From 4f013a3d1e0d9d45b0498253847f9568e6492c96 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 9 Nov 2016 17:36:04 +0100 Subject: [PATCH] avoid infinite blocking in TcpConnectionSpec #21375 * use socket timeout * additional cleanup of socket utils --- .../scala/akka/io/TcpConnectionSpec.scala | 23 +++++++++----- .../code/docs/io/JavaUdpMulticastTest.java | 4 ++- .../code/docs/stream/io/StreamTcpDocTest.java | 6 ++-- .../rst/java/code/docs/util/SocketUtils.java | 31 ------------------- .../code/docs/io/ScalaUdpMulticastSpec.scala | 12 ++----- .../docs/stream/io/StreamTcpDocSpec.scala | 6 ++-- .../rst/scala/code/docs/utils/TestUtils.scala | 24 -------------- .../main/scala/akka/testkit/SocketUtil.scala | 15 +++++++++ 8 files changed, 42 insertions(+), 79 deletions(-) delete mode 100644 akka-docs/rst/java/code/docs/util/SocketUtils.java delete mode 100644 akka-docs/rst/scala/code/docs/utils/TestUtils.scala diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 5524a0a678..a59d0ecc12 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -26,6 +26,7 @@ import akka.testkit.{ AkkaSpec, EventFilter, SocketUtil, TestActorRef, TestProbe import akka.util.{ ByteString, Helpers } import akka.testkit.SocketUtil._ import java.util.Random +import java.net.SocketTimeoutException object TcpConnectionSpec { case class Ack(i: Int) extends Event @@ -836,13 +837,21 @@ class TcpConnectionSpec extends AkkaSpec(""" IO(Tcp) ! Connect(bindAddress) - val socket = serverSocket.accept() - connectionProbe.expectMsgType[Tcp.Connected] - val connectionActor = connectionProbe.sender() - connectionActor ! PoisonPill - watch(connectionActor) - expectTerminated(connectionActor) - an[IOException] should be thrownBy { socket.getInputStream.read() } + try { + serverSocket.setSoTimeout(remainingOrDefault.toMillis.toInt) + val socket = serverSocket.accept() + connectionProbe.expectMsgType[Tcp.Connected] + val connectionActor = connectionProbe.sender() + connectionActor ! PoisonPill + watch(connectionActor) + expectTerminated(connectionActor) + an[IOException] should be thrownBy { socket.getInputStream.read() } + } catch { + case e: SocketTimeoutException ⇒ + // thrown by serverSocket.accept, this may happen if network is offline + info(e.getMessage) + pending + } } } diff --git a/akka-docs/rst/java/code/docs/io/JavaUdpMulticastTest.java b/akka-docs/rst/java/code/docs/io/JavaUdpMulticastTest.java index c4176f4ad5..0d0bb0189f 100644 --- a/akka-docs/rst/java/code/docs/io/JavaUdpMulticastTest.java +++ b/akka-docs/rst/java/code/docs/io/JavaUdpMulticastTest.java @@ -9,6 +9,8 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.io.Udp; import akka.testkit.JavaTestKit; +import akka.testkit.SocketUtil; + import docs.AbstractJavaTest; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -62,7 +64,7 @@ public class JavaUdpMulticastTest extends AbstractJavaTest { groupBuilder.append(randomAddress.subSequence(i * 4, i * 4 + 4)); } final String group = groupBuilder.toString(); - final Integer port = TestUtils.temporaryUdpIpv6Port(ipv6Iface); + final Integer port = SocketUtil.temporaryUdpIpv6Port(ipv6Iface); final String msg = "ohi"; final ActorRef sink = getRef(); final String iface = ipv6Iface.getName(); diff --git a/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java b/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java index cdac9204ce..9a3faddb8c 100644 --- a/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java @@ -12,7 +12,6 @@ import docs.AbstractJavaTest; import docs.stream.SilenceSystemOut; import java.net.InetSocketAddress; -import docs.util.SocketUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -24,6 +23,7 @@ import akka.stream.javadsl.*; import akka.stream.javadsl.Tcp.*; import akka.stream.stage.*; import akka.testkit.JavaTestKit; +import akka.testkit.SocketUtil; import akka.testkit.TestProbe; import akka.util.ByteString; @@ -70,7 +70,7 @@ public class StreamTcpDocTest extends AbstractJavaTest { } { - final InetSocketAddress localhost = SocketUtils.temporaryServerAddress(); + final InetSocketAddress localhost = SocketUtil.temporaryServerAddress("127.0.0.1", false); final Source> connections = Tcp.get(system).bind(localhost.getHostName(), localhost.getPort()); // TODO getHostString in Java7 @@ -93,7 +93,7 @@ public class StreamTcpDocTest extends AbstractJavaTest { @Test public void actuallyWorkingClientServerApp() { - final InetSocketAddress localhost = SocketUtils.temporaryServerAddress(); + final InetSocketAddress localhost = SocketUtil.temporaryServerAddress("127.0.0.1", false); final TestProbe serverProbe = new TestProbe(system); diff --git a/akka-docs/rst/java/code/docs/util/SocketUtils.java b/akka-docs/rst/java/code/docs/util/SocketUtils.java deleted file mode 100644 index 16603f9c4f..0000000000 --- a/akka-docs/rst/java/code/docs/util/SocketUtils.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2015-2016 Lightbend Inc. - */ -package docs.util; - -import java.net.InetSocketAddress; - -import java.io.IOException; -import java.net.ServerSocket; -import java.nio.channels.ServerSocketChannel; - -public class SocketUtils { - - public static InetSocketAddress temporaryServerAddress(String hostname) { - try { - ServerSocket socket = ServerSocketChannel.open().socket(); - socket.bind(new InetSocketAddress(hostname, 0)); - InetSocketAddress address = new InetSocketAddress(hostname, socket.getLocalPort()); - socket.close(); - return address; - } - catch (IOException io) { - throw new RuntimeException(io); - } - } - - public static InetSocketAddress temporaryServerAddress() { - return temporaryServerAddress("127.0.0.1"); - } - -} \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/io/ScalaUdpMulticastSpec.scala b/akka-docs/rst/scala/code/docs/io/ScalaUdpMulticastSpec.scala index 404c4941de..0060207509 100644 --- a/akka-docs/rst/scala/code/docs/io/ScalaUdpMulticastSpec.scala +++ b/akka-docs/rst/scala/code/docs/io/ScalaUdpMulticastSpec.scala @@ -13,6 +13,7 @@ import akka.testkit.TestKit import org.scalatest.{ BeforeAndAfter, WordSpecLike } import scala.collection.JavaConversions.enumerationAsScalaIterator import org.scalatest.BeforeAndAfterAll +import akka.testkit.SocketUtil class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")) with WordSpecLike with BeforeAndAfterAll { @@ -37,7 +38,7 @@ class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec") // generate a random 32 bit multicast address with the high order bit set val randomAddress: String = (Random.nextInt().abs.toLong | (1L << 31)).toHexString.toUpperCase val group = randomAddress.grouped(4).mkString("FF02::", ":", "") - val port = TestUtils.temporaryUdpIpv6Port(ipv6iface) + val port = SocketUtil.temporaryUdpIpv6Port(ipv6iface) val msg = "ohi" val sink = testActor val iface = ipv6iface.getName @@ -70,12 +71,3 @@ class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec") } -object TestUtils { - def temporaryUdpIpv6Port(iface: NetworkInterface) = { - val serverSocket = DatagramChannel.open(StandardProtocolFamily.INET6).socket() - serverSocket.bind(new InetSocketAddress(iface.getInetAddresses.nextElement(), 0)) - val port = serverSocket.getLocalPort - serverSocket.close() - port - } -} diff --git a/akka-docs/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala index 366a21c69a..d50ae7a035 100644 --- a/akka-docs/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala @@ -11,9 +11,9 @@ import akka.stream.scaladsl._ import akka.testkit.AkkaSpec import akka.testkit.TestProbe import akka.util.ByteString -import docs.utils.TestUtils import scala.concurrent.Future +import akka.testkit.SocketUtil class StreamTcpDocSpec extends AkkaSpec { @@ -37,7 +37,7 @@ class StreamTcpDocSpec extends AkkaSpec { //#echo-server-simple-bind } { - val (host, port) = TestUtils.temporaryServerHostnameAndPort() + val (host, port) = SocketUtil.temporaryServerHostnameAndPort() //#echo-server-simple-handle import akka.stream.scaladsl.Framing @@ -62,7 +62,7 @@ class StreamTcpDocSpec extends AkkaSpec { } "initial server banner echo server" in { - val localhost = TestUtils.temporaryServerAddress() + val localhost = SocketUtil.temporaryServerAddress() val connections = Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7 val serverProbe = TestProbe() diff --git a/akka-docs/rst/scala/code/docs/utils/TestUtils.scala b/akka-docs/rst/scala/code/docs/utils/TestUtils.scala deleted file mode 100644 index b3a79e7e09..0000000000 --- a/akka-docs/rst/scala/code/docs/utils/TestUtils.scala +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package docs.utils - -import java.net.InetSocketAddress -import java.nio.channels.ServerSocketChannel - -object TestUtils { - def temporaryServerAddress(interface: String = "127.0.0.1"): InetSocketAddress = { - val serverSocket = ServerSocketChannel.open() - try { - serverSocket.socket.bind(new InetSocketAddress(interface, 0)) - val port = serverSocket.socket.getLocalPort - new InetSocketAddress(interface, port) - } finally serverSocket.close() - } - - def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (String, Int) = { - val socketAddress = temporaryServerAddress(interface) - socketAddress.getHostName -> socketAddress.getPort // TODO getHostString in Java7 - } -} diff --git a/akka-testkit/src/main/scala/akka/testkit/SocketUtil.scala b/akka-testkit/src/main/scala/akka/testkit/SocketUtil.scala index 7c28488ab3..fbcb9fa765 100644 --- a/akka-testkit/src/main/scala/akka/testkit/SocketUtil.scala +++ b/akka-testkit/src/main/scala/akka/testkit/SocketUtil.scala @@ -8,6 +8,8 @@ import java.net.InetSocketAddress import java.net.SocketAddress import java.nio.channels.DatagramChannel import java.nio.channels.ServerSocketChannel +import java.net.NetworkInterface +import java.net.StandardProtocolFamily /** * Utilities to get free socket address. @@ -37,4 +39,17 @@ object SocketUtil { } collect { case (socket, address) ⇒ socket.close(); address } } + def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (String, Int) = { + val socketAddress = temporaryServerAddress(interface) + socketAddress.getHostString → socketAddress.getPort + } + + def temporaryUdpIpv6Port(iface: NetworkInterface) = { + val serverSocket = DatagramChannel.open(StandardProtocolFamily.INET6).socket() + serverSocket.bind(new InetSocketAddress(iface.getInetAddresses.nextElement(), 0)) + val port = serverSocket.getLocalPort + serverSocket.close() + port + } + }