diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 47f793d3ee..ba7774511d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -535,6 +535,27 @@ class TcpSpec extends AkkaSpec("akka.stream.materializer.subscription-timeout.ti Await.result(rejected, 5.seconds) should ===(100) } } + + "not thrown on unbind after system has been shut down" in { + val sys2 = ActorSystem("shutdown-test-system") + val mat2 = ActorMaterializer()(sys2) + + try { + val address = temporaryServerAddress() + + val bindingFuture = Tcp().bindAndHandle(Flow[ByteString], address.getHostName, address.getPort)(mat2) + + // Ensure server is running + Await.result( + Source.single(ByteString(0)).via(Tcp().outgoingConnection(address)).runWith(Sink.ignore), + 3.seconds) + + Await.result(sys2.terminate(), 3.seconds) + + val binding = Await.result(bindingFuture, 3.seconds) + Await.result(binding.unbind(), 3.seconds) + } finally sys2.terminate() + } } def validateServerClientCommunication(testData: ByteString, diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 3ac29119a7..048a541b04 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -64,8 +64,13 @@ private[stream] class ConnectionSourceStage(val tcpManager: ActorRef, listener = sender stageActor.watch(listener) if (isAvailable(out)) listener ! ResumeAccepting(1) - val target = self - bindingPromise.success(ServerBinding(localAddress)(() ⇒ { target ! Unbind; unbindPromise.future })) + val thisStage = self + bindingPromise.success(ServerBinding(localAddress)(() ⇒ { + // Beware, sender must be explicit since stageActor.ref will be invalid to access after the stage + // stopped. + thisStage.tell(Unbind, thisStage) + unbindPromise.future + })) case f: CommandFailed ⇒ val ex = BindFailedException bindingPromise.failure(ex)