diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala index 0b3b77327a..0a4ecc4fc9 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala @@ -11,6 +11,7 @@ import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec import akka.stream.OperationAttributes import akka.stream.ActorOperationAttributes +import scala.concurrent.duration._ class FlowErrorDocSpec extends AkkaSpec { @@ -24,7 +25,7 @@ class FlowErrorDocSpec extends AkkaSpec { //#stop intercept[ArithmeticException] { - Await.result(result, remaining) + Await.result(result, 3.seconds) } } @@ -42,7 +43,7 @@ class FlowErrorDocSpec extends AkkaSpec { // result here will be a Future completed with Success(228) //#resume - Await.result(result, remaining) should be(228) + Await.result(result, 3.seconds) should be(228) } "demonstrate resume section" in { @@ -62,7 +63,7 @@ class FlowErrorDocSpec extends AkkaSpec { // result here will be a Future completed with Success(150) //#resume-section - Await.result(result, remaining) should be(150) + Await.result(result, 3.seconds) should be(150) } "demonstrate restart section" in { @@ -85,7 +86,7 @@ class FlowErrorDocSpec extends AkkaSpec { // result here will be a Future completed with Success(Vector(0, 1, 4, 0, 5, 12)) //#restart-section - Await.result(result, remaining) should be(Vector(0, 1, 4, 0, 5, 12)) + Await.result(result, 3.seconds) should be(Vector(0, 1, 4, 0, 5, 12)) } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index 42f4228753..9f20f00d16 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -4,9 +4,13 @@ package akka.http.impl.engine.client +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.{ SocketChannel, ServerSocketChannel } import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import scala.concurrent.duration._ +import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } import akka.util.ByteString import akka.http.scaladsl.{ TestUtils, Http } @@ -19,9 +23,33 @@ import akka.stream.scaladsl._ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model._ -class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = OFF\n akka.io.tcp.trace-logging = off") { +class ConnectionPoolSpec extends AkkaSpec(""" + akka.loggers = [] + akka.loglevel = OFF + akka.io.tcp.trace-logging = off + akka.io.tcp.windows-connection-abort-workaround-enabled=auto""") { implicit val materializer = ActorFlowMaterializer() + // FIXME: Extract into proper util class to be reusable + lazy val ConnectionResetByPeerMessage: String = { + val serverSocket = ServerSocketChannel.open() + serverSocket.socket.bind(new InetSocketAddress("127.0.0.1", 0)) + try { + val clientSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", serverSocket.socket().getLocalPort)) + @volatile var serverSideChannel: SocketChannel = null + awaitCond { + serverSideChannel = serverSocket.accept() + serverSideChannel != null + } + serverSideChannel.socket.setSoLinger(true, 0) + serverSideChannel.close() + clientSocket.read(ByteBuffer.allocate(1)) + null + } catch { + case NonFatal(e) ⇒ e.getMessage + } + } + "The host-level client infrastructure" should { "properly complete a simple request/response cycle" in new TestSetup { @@ -104,7 +132,7 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O val responses = Seq(responseOut.expectNext(), responseOut.expectNext()) responses mustContainLike { case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") } - responses mustContainLike { case (Failure(x), 43) ⇒ x.getMessage should include("reset by peer") } + responses mustContainLike { case (Failure(x), 43) ⇒ x.getMessage should include(ConnectionResetByPeerMessage) } } "retry failed requests" in new TestSetup(autoAccept = true) { @@ -142,7 +170,7 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O val responses = Seq(responseOut.expectNext(), responseOut.expectNext()) responses mustContainLike { case (Success(x), 42) ⇒ requestUri(x) should endWith("/a") } - responses mustContainLike { case (Failure(x), 43) ⇒ x.getMessage should include("reset by peer") } + responses mustContainLike { case (Failure(x), 43) ⇒ x.getMessage should include(ConnectionResetByPeerMessage) } remainingResponsesToKill.get() shouldEqual 0 } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index c8629f18f8..eef5b0141e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -9,6 +9,7 @@ import scala.util.control.NoStackTrace import akka.stream.{ OverflowStrategy, ActorFlowMaterializer } import akka.stream.testkit.AkkaSpec import akka.stream.testkit.Utils._ +import scala.concurrent.duration._ class FlowFoldSpec extends AkkaSpec { implicit val mat = ActorFlowMaterializer() @@ -19,19 +20,19 @@ class FlowFoldSpec extends AkkaSpec { val input = 1 to 100 val future = Source(input).runFold(0)(_ + _) val expected = input.fold(0)(_ + _) - Await.result(future, remaining) should be(expected) + Await.result(future, 3.seconds) should be(expected) } "propagate an error" in assertAllStagesStopped { val error = new Exception with NoStackTrace val future = Source[Unit](() ⇒ throw error).runFold(())((_, _) ⇒ ()) - the[Exception] thrownBy Await.result(future, remaining) should be(error) + the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } "complete future with failure when function throws" in assertAllStagesStopped { val error = new Exception with NoStackTrace val future = Source.single(1).runFold(0)((_, _) ⇒ throw error) - the[Exception] thrownBy Await.result(future, remaining) should be(error) + the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala index 213e0c5ae7..ccebeb6c1a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala @@ -8,6 +8,7 @@ import akka.stream.ActorFlowMaterializer import akka.stream.testkit._ import akka.stream.testkit.Utils._ import scala.concurrent.Await +import scala.concurrent.duration._ class FlowForeachSpec extends AkkaSpec { @@ -48,7 +49,7 @@ class FlowForeachSpec extends AkkaSpec { "complete future with failure when function throws" in assertAllStagesStopped { val error = new Exception with NoStackTrace val future = Source.single(1).runForeach(_ ⇒ throw error) - the[Exception] thrownBy Await.result(future, remaining) should be(error) + the[Exception] thrownBy Await.result(future, 3.seconds) should be(error) } }