avoid infinite blocking in TcpConnectionSpec #21375
* use socket timeout * additional cleanup of socket utils
This commit is contained in:
parent
72925ba392
commit
4f013a3d1e
8 changed files with 42 additions and 79 deletions
|
|
@ -26,6 +26,7 @@ import akka.testkit.{ AkkaSpec, EventFilter, SocketUtil, TestActorRef, TestProbe
|
||||||
import akka.util.{ ByteString, Helpers }
|
import akka.util.{ ByteString, Helpers }
|
||||||
import akka.testkit.SocketUtil._
|
import akka.testkit.SocketUtil._
|
||||||
import java.util.Random
|
import java.util.Random
|
||||||
|
import java.net.SocketTimeoutException
|
||||||
|
|
||||||
object TcpConnectionSpec {
|
object TcpConnectionSpec {
|
||||||
case class Ack(i: Int) extends Event
|
case class Ack(i: Int) extends Event
|
||||||
|
|
@ -836,13 +837,21 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
|
|
||||||
IO(Tcp) ! Connect(bindAddress)
|
IO(Tcp) ! Connect(bindAddress)
|
||||||
|
|
||||||
val socket = serverSocket.accept()
|
try {
|
||||||
connectionProbe.expectMsgType[Tcp.Connected]
|
serverSocket.setSoTimeout(remainingOrDefault.toMillis.toInt)
|
||||||
val connectionActor = connectionProbe.sender()
|
val socket = serverSocket.accept()
|
||||||
connectionActor ! PoisonPill
|
connectionProbe.expectMsgType[Tcp.Connected]
|
||||||
watch(connectionActor)
|
val connectionActor = connectionProbe.sender()
|
||||||
expectTerminated(connectionActor)
|
connectionActor ! PoisonPill
|
||||||
an[IOException] should be thrownBy { socket.getInputStream.read() }
|
watch(connectionActor)
|
||||||
|
expectTerminated(connectionActor)
|
||||||
|
an[IOException] should be thrownBy { socket.getInputStream.read() }
|
||||||
|
} catch {
|
||||||
|
case e: SocketTimeoutException ⇒
|
||||||
|
// thrown by serverSocket.accept, this may happen if network is offline
|
||||||
|
info(e.getMessage)
|
||||||
|
pending
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,8 @@ import akka.actor.ActorSystem;
|
||||||
import akka.actor.Props;
|
import akka.actor.Props;
|
||||||
import akka.io.Udp;
|
import akka.io.Udp;
|
||||||
import akka.testkit.JavaTestKit;
|
import akka.testkit.JavaTestKit;
|
||||||
|
import akka.testkit.SocketUtil;
|
||||||
|
|
||||||
import docs.AbstractJavaTest;
|
import docs.AbstractJavaTest;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
|
@ -62,7 +64,7 @@ public class JavaUdpMulticastTest extends AbstractJavaTest {
|
||||||
groupBuilder.append(randomAddress.subSequence(i * 4, i * 4 + 4));
|
groupBuilder.append(randomAddress.subSequence(i * 4, i * 4 + 4));
|
||||||
}
|
}
|
||||||
final String group = groupBuilder.toString();
|
final String group = groupBuilder.toString();
|
||||||
final Integer port = TestUtils.temporaryUdpIpv6Port(ipv6Iface);
|
final Integer port = SocketUtil.temporaryUdpIpv6Port(ipv6Iface);
|
||||||
final String msg = "ohi";
|
final String msg = "ohi";
|
||||||
final ActorRef sink = getRef();
|
final ActorRef sink = getRef();
|
||||||
final String iface = ipv6Iface.getName();
|
final String iface = ipv6Iface.getName();
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ import docs.AbstractJavaTest;
|
||||||
import docs.stream.SilenceSystemOut;
|
import docs.stream.SilenceSystemOut;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
import docs.util.SocketUtils;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -24,6 +23,7 @@ import akka.stream.javadsl.*;
|
||||||
import akka.stream.javadsl.Tcp.*;
|
import akka.stream.javadsl.Tcp.*;
|
||||||
import akka.stream.stage.*;
|
import akka.stream.stage.*;
|
||||||
import akka.testkit.JavaTestKit;
|
import akka.testkit.JavaTestKit;
|
||||||
|
import akka.testkit.SocketUtil;
|
||||||
import akka.testkit.TestProbe;
|
import akka.testkit.TestProbe;
|
||||||
import akka.util.ByteString;
|
import akka.util.ByteString;
|
||||||
|
|
||||||
|
|
@ -70,7 +70,7 @@ public class StreamTcpDocTest extends AbstractJavaTest {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
|
|
||||||
final InetSocketAddress localhost = SocketUtils.temporaryServerAddress();
|
final InetSocketAddress localhost = SocketUtil.temporaryServerAddress("127.0.0.1", false);
|
||||||
final Source<IncomingConnection, CompletionStage<ServerBinding>> connections =
|
final Source<IncomingConnection, CompletionStage<ServerBinding>> connections =
|
||||||
Tcp.get(system).bind(localhost.getHostName(), localhost.getPort()); // TODO getHostString in Java7
|
Tcp.get(system).bind(localhost.getHostName(), localhost.getPort()); // TODO getHostString in Java7
|
||||||
|
|
||||||
|
|
@ -93,7 +93,7 @@ public class StreamTcpDocTest extends AbstractJavaTest {
|
||||||
@Test
|
@Test
|
||||||
public void actuallyWorkingClientServerApp() {
|
public void actuallyWorkingClientServerApp() {
|
||||||
|
|
||||||
final InetSocketAddress localhost = SocketUtils.temporaryServerAddress();
|
final InetSocketAddress localhost = SocketUtil.temporaryServerAddress("127.0.0.1", false);
|
||||||
|
|
||||||
final TestProbe serverProbe = new TestProbe(system);
|
final TestProbe serverProbe = new TestProbe(system);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
package docs.util;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.ServerSocket;
|
|
||||||
import java.nio.channels.ServerSocketChannel;
|
|
||||||
|
|
||||||
public class SocketUtils {
|
|
||||||
|
|
||||||
public static InetSocketAddress temporaryServerAddress(String hostname) {
|
|
||||||
try {
|
|
||||||
ServerSocket socket = ServerSocketChannel.open().socket();
|
|
||||||
socket.bind(new InetSocketAddress(hostname, 0));
|
|
||||||
InetSocketAddress address = new InetSocketAddress(hostname, socket.getLocalPort());
|
|
||||||
socket.close();
|
|
||||||
return address;
|
|
||||||
}
|
|
||||||
catch (IOException io) {
|
|
||||||
throw new RuntimeException(io);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static InetSocketAddress temporaryServerAddress() {
|
|
||||||
return temporaryServerAddress("127.0.0.1");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -13,6 +13,7 @@ import akka.testkit.TestKit
|
||||||
import org.scalatest.{ BeforeAndAfter, WordSpecLike }
|
import org.scalatest.{ BeforeAndAfter, WordSpecLike }
|
||||||
import scala.collection.JavaConversions.enumerationAsScalaIterator
|
import scala.collection.JavaConversions.enumerationAsScalaIterator
|
||||||
import org.scalatest.BeforeAndAfterAll
|
import org.scalatest.BeforeAndAfterAll
|
||||||
|
import akka.testkit.SocketUtil
|
||||||
|
|
||||||
class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")) with WordSpecLike with BeforeAndAfterAll {
|
class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")) with WordSpecLike with BeforeAndAfterAll {
|
||||||
|
|
||||||
|
|
@ -37,7 +38,7 @@ class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")
|
||||||
// generate a random 32 bit multicast address with the high order bit set
|
// generate a random 32 bit multicast address with the high order bit set
|
||||||
val randomAddress: String = (Random.nextInt().abs.toLong | (1L << 31)).toHexString.toUpperCase
|
val randomAddress: String = (Random.nextInt().abs.toLong | (1L << 31)).toHexString.toUpperCase
|
||||||
val group = randomAddress.grouped(4).mkString("FF02::", ":", "")
|
val group = randomAddress.grouped(4).mkString("FF02::", ":", "")
|
||||||
val port = TestUtils.temporaryUdpIpv6Port(ipv6iface)
|
val port = SocketUtil.temporaryUdpIpv6Port(ipv6iface)
|
||||||
val msg = "ohi"
|
val msg = "ohi"
|
||||||
val sink = testActor
|
val sink = testActor
|
||||||
val iface = ipv6iface.getName
|
val iface = ipv6iface.getName
|
||||||
|
|
@ -70,12 +71,3 @@ class ScalaUdpMulticastSpec extends TestKit(ActorSystem("ScalaUdpMulticastSpec")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object TestUtils {
|
|
||||||
def temporaryUdpIpv6Port(iface: NetworkInterface) = {
|
|
||||||
val serverSocket = DatagramChannel.open(StandardProtocolFamily.INET6).socket()
|
|
||||||
serverSocket.bind(new InetSocketAddress(iface.getInetAddresses.nextElement(), 0))
|
|
||||||
val port = serverSocket.getLocalPort
|
|
||||||
serverSocket.close()
|
|
||||||
port
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -11,9 +11,9 @@ import akka.stream.scaladsl._
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.testkit.TestProbe
|
import akka.testkit.TestProbe
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import docs.utils.TestUtils
|
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
|
import akka.testkit.SocketUtil
|
||||||
|
|
||||||
class StreamTcpDocSpec extends AkkaSpec {
|
class StreamTcpDocSpec extends AkkaSpec {
|
||||||
|
|
||||||
|
|
@ -37,7 +37,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
||||||
//#echo-server-simple-bind
|
//#echo-server-simple-bind
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
val (host, port) = TestUtils.temporaryServerHostnameAndPort()
|
val (host, port) = SocketUtil.temporaryServerHostnameAndPort()
|
||||||
//#echo-server-simple-handle
|
//#echo-server-simple-handle
|
||||||
import akka.stream.scaladsl.Framing
|
import akka.stream.scaladsl.Framing
|
||||||
|
|
||||||
|
|
@ -62,7 +62,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"initial server banner echo server" in {
|
"initial server banner echo server" in {
|
||||||
val localhost = TestUtils.temporaryServerAddress()
|
val localhost = SocketUtil.temporaryServerAddress()
|
||||||
val connections = Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7
|
val connections = Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7
|
||||||
val serverProbe = TestProbe()
|
val serverProbe = TestProbe()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,24 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
|
|
||||||
package docs.utils
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
|
||||||
import java.nio.channels.ServerSocketChannel
|
|
||||||
|
|
||||||
object TestUtils {
|
|
||||||
def temporaryServerAddress(interface: String = "127.0.0.1"): InetSocketAddress = {
|
|
||||||
val serverSocket = ServerSocketChannel.open()
|
|
||||||
try {
|
|
||||||
serverSocket.socket.bind(new InetSocketAddress(interface, 0))
|
|
||||||
val port = serverSocket.socket.getLocalPort
|
|
||||||
new InetSocketAddress(interface, port)
|
|
||||||
} finally serverSocket.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (String, Int) = {
|
|
||||||
val socketAddress = temporaryServerAddress(interface)
|
|
||||||
socketAddress.getHostName -> socketAddress.getPort // TODO getHostString in Java7
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -8,6 +8,8 @@ import java.net.InetSocketAddress
|
||||||
import java.net.SocketAddress
|
import java.net.SocketAddress
|
||||||
import java.nio.channels.DatagramChannel
|
import java.nio.channels.DatagramChannel
|
||||||
import java.nio.channels.ServerSocketChannel
|
import java.nio.channels.ServerSocketChannel
|
||||||
|
import java.net.NetworkInterface
|
||||||
|
import java.net.StandardProtocolFamily
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utilities to get free socket address.
|
* Utilities to get free socket address.
|
||||||
|
|
@ -37,4 +39,17 @@ object SocketUtil {
|
||||||
} collect { case (socket, address) ⇒ socket.close(); address }
|
} collect { case (socket, address) ⇒ socket.close(); address }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (String, Int) = {
|
||||||
|
val socketAddress = temporaryServerAddress(interface)
|
||||||
|
socketAddress.getHostString → socketAddress.getPort
|
||||||
|
}
|
||||||
|
|
||||||
|
def temporaryUdpIpv6Port(iface: NetworkInterface) = {
|
||||||
|
val serverSocket = DatagramChannel.open(StandardProtocolFamily.INET6).socket()
|
||||||
|
serverSocket.bind(new InetSocketAddress(iface.getInetAddresses.nextElement(), 0))
|
||||||
|
val port = serverSocket.getLocalPort
|
||||||
|
serverSocket.close()
|
||||||
|
port
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue