diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala index 9371ff3e3f..790e38b7f8 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala @@ -4,23 +4,19 @@ package akka.http.scaladsl.server -import java.net.InetSocketAddress -import java.nio.channels.ServerSocketChannel import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.actor.ActorSystem import akka.event.Logging import akka.http.scaladsl.{ TestUtils, Http } import akka.http.scaladsl.model.{ HttpResponse, Uri, HttpRequest } -import akka.stream.impl.{ StreamSupervisor, ActorMaterializerImpl } -import akka.stream.{ ActorMaterializer, Materializer, OverflowStrategy } +import akka.stream.ActorMaterializer import akka.stream.scaladsl.{ Sink, Source } -import akka.testkit.TestProbe +import akka.stream.testkit.Utils.assertAllStagesStopped import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, BeforeAndAfterAll, WordSpecLike } -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Await, Future } +import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.{ Failure, Success, Try } @@ -38,32 +34,10 @@ class DontLeakActorsOnFailingConnectionSpecs extends WordSpecLike with Matchers val log = Logging(system, getClass) - // TODO DUPLICATED - def assertAllStagesStopped[T](name: String)(block: ⇒ T)(implicit materializer: Materializer): T = - materializer match { - case impl: ActorMaterializerImpl ⇒ - val probe = TestProbe()(impl.system) - probe.send(impl.supervisor, StreamSupervisor.StopChildren) - probe.expectMsg(StreamSupervisor.StoppedChildren) - val result = block - probe.within(5.seconds) { - probe.awaitAssert { - impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) - val children = probe.expectMsgType[StreamSupervisor.Children].children.filter { c ⇒ - c.path.toString contains name - } - assert(children.isEmpty, - s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]") - } - } - result - case _ ⇒ block - } - "Http.superPool" should { - "not leak connection Actors when hitting non-existing endpoint" ignore { - assertAllStagesStopped("InternalConnectionFlow") { + "not leak connection Actors when hitting non-existing endpoint" in { + assertAllStagesStopped { val reqsCount = 100 val clientFlow = Http().superPool[Int]() val (_, _, port) = TestUtils.temporaryServerHostnameAndPort() diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index b1d00c1d58..64f3377c19 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -271,7 +271,7 @@ object AkkaBuild extends Build { lazy val httpTests = Project( id = "akka-http-tests", base = file("akka-http-tests"), - dependencies = Seq(httpTestkit % "test", testkit % "test->test", httpSprayJson, httpXml, httpJackson) + dependencies = Seq(httpTestkit % "test", streamTestkit % "test->test", testkit % "test->test", httpSprayJson, httpXml, httpJackson) ) lazy val httpMarshallersScala = Project(