Merge pull request #17808 from drewhk/wip-17786-addendum-fix-shutdown-drewhk

fix actorsystem shutdown in tests
This commit is contained in:
drewhk 2015-06-23 13:35:19 +02:00
commit 4f02192a04
2 changed files with 26 additions and 10 deletions

View file

@ -38,6 +38,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
import system.dispatcher import system.dispatcher
implicit val materializer = ActorFlowMaterializer() implicit val materializer = ActorFlowMaterializer()
val testConf2: Config =
ConfigFactory.parseString("akka.stream.materializer.subscription-timeout.timeout = 1 s")
.withFallback(testConf)
val system2 = ActorSystem(getClass.getSimpleName, testConf2)
val materializer2 = ActorFlowMaterializer.create(system2)
"The low-level HTTP infrastructure" should { "The low-level HTTP infrastructure" should {
"properly bind a server" in { "properly bind a server" in {
@ -133,12 +139,6 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
} }
"log materialization errors in `bindAndHandle`" which { "log materialization errors in `bindAndHandle`" which {
val testConf2: Config =
ConfigFactory.parseString("akka.stream.materializer.subscription-timeout.timeout = 1 s")
.withFallback(testConf)
val system2 = ActorSystem(getClass.getSimpleName, testConf2)
import system2.dispatcher
val materializer2 = ActorFlowMaterializer.create(system2)
"are triggered in `transform`" in Utils.assertAllStagesStopped { "are triggered in `transform`" in Utils.assertAllStagesStopped {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
@ -149,7 +149,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept { EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept {
val (_, responseFuture) = val (_, responseFuture) =
Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2) Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2)
Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException] try Await.result(responseFuture, 5.second).status should ===(StatusCodes.InternalServerError)
catch {
case _: StreamTcpException
// Also fine, depends on the race between abort and 500, caused by materialization panic which
// tries to tear down everything, but the order is nondeterministic
}
}(system2) }(system2)
Await.result(b1.unbind(), 1.second) Await.result(b1.unbind(), 1.second)
}(materializer2) }(materializer2)
@ -163,7 +168,12 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept { EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept {
val (_, responseFuture) = val (_, responseFuture) =
Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2) Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2)
Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException] try Await.result(responseFuture, 5.seconds).status should ===(StatusCodes.InternalServerError)
catch {
case _: StreamTcpException
// Also fine, depends on the race between abort and 500, caused by materialization panic which
// tries to tear down everything, but the order is nondeterministic
}
}(system2) }(system2)
Await.result(b1.unbind(), 1.second) Await.result(b1.unbind(), 1.second)
}(materializer2) }(materializer2)
@ -276,7 +286,10 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
} }
} }
override def afterAll() = system.shutdown() override def afterAll() = {
system.shutdown()
system2.shutdown()
}
class TestSetup { class TestSetup {
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()

View file

@ -8,6 +8,7 @@ import akka.io.Tcp._
import akka.stream.scaladsl.Tcp.IncomingConnection import akka.stream.scaladsl.Tcp.IncomingConnection
import akka.stream.scaladsl.{Flow, _} import akka.stream.scaladsl.{Flow, _}
import akka.stream.testkit.TestUtils.temporaryServerAddress import akka.stream.testkit.TestUtils.temporaryServerAddress
import scala.util.control.NonFatal
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.{ActorFlowMaterializer, BindFailedException, StreamTcpException} import akka.stream.{ActorFlowMaterializer, BindFailedException, StreamTcpException}
@ -392,7 +393,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
a[StreamTcpException] should be thrownBy a[StreamTcpException] should be thrownBy
Await.result(result, 3.seconds) Await.result(result, 3.seconds)
binding.map(_.unbind()).foreach(_ system2.shutdown()) binding.map(_.unbind()).recover {case NonFatal(_) => ()} foreach(_ system2.shutdown())
} }
} }
@ -523,6 +524,8 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
} }
def validateServerClientCommunication(testData: ByteString, def validateServerClientCommunication(testData: ByteString,
serverConnection: ServerConnection, serverConnection: ServerConnection,
readProbe: TcpReadProbe, readProbe: TcpReadProbe,