#20142: Fix implicit sender closure in TcpConnectionsStage

This commit is contained in:
Endre Sándor Varga 2016-04-11 13:36:04 +02:00
parent 81579bd403
commit da66d3e1cd
2 changed files with 28 additions and 2 deletions

View file

@ -535,6 +535,27 @@ class TcpSpec extends AkkaSpec("akka.stream.materializer.subscription-timeout.ti
Await.result(rejected, 5.seconds) should ===(100)
}
}
"not thrown on unbind after system has been shut down" in {
val sys2 = ActorSystem("shutdown-test-system")
val mat2 = ActorMaterializer()(sys2)
try {
val address = temporaryServerAddress()
val bindingFuture = Tcp().bindAndHandle(Flow[ByteString], address.getHostName, address.getPort)(mat2)
// Ensure server is running
Await.result(
Source.single(ByteString(0)).via(Tcp().outgoingConnection(address)).runWith(Sink.ignore),
3.seconds)
Await.result(sys2.terminate(), 3.seconds)
val binding = Await.result(bindingFuture, 3.seconds)
Await.result(binding.unbind(), 3.seconds)
} finally sys2.terminate()
}
}
def validateServerClientCommunication(testData: ByteString,

View file

@ -64,8 +64,13 @@ private[stream] class ConnectionSourceStage(val tcpManager: ActorRef,
listener = sender
stageActor.watch(listener)
if (isAvailable(out)) listener ! ResumeAccepting(1)
val target = self
bindingPromise.success(ServerBinding(localAddress)(() { target ! Unbind; unbindPromise.future }))
val thisStage = self
bindingPromise.success(ServerBinding(localAddress)(() {
// Beware, sender must be explicit since stageActor.ref will be invalid to access after the stage
// stopped.
thisStage.tell(Unbind, thisStage)
unbindPromise.future
}))
case f: CommandFailed
val ex = BindFailedException
bindingPromise.failure(ex)