Make sure to close resource in UnfoldResourceAsync (#24842)

* Make sure to close resource in UnfoldResourceAsync #24839

* Better fix for #24839
This commit is contained in:
Johan Andrén 2018-04-09 07:30:24 +02:00 committed by Konrad `ktoso` Malawski
parent 8106722b26
commit b94e064a34
2 changed files with 38 additions and 2 deletions

View file

@ -36,7 +36,7 @@ import scala.util.control.NonFatal
state = Some(resource)
if (isAvailable(out)) onPull()
case Failure(t) failStage(t)
}.invoke _
}.invokeWithFeedback _
private val errorHandler: PartialFunction[Throwable, Unit] = {
case NonFatal(ex) decider(ex) match {
@ -102,7 +102,16 @@ import scala.util.control.NonFatal
}
private def createResource(): Unit = {
create().onComplete(createdCallback)
create().onComplete { resource
createdCallback(resource).recover {
case _: StreamDetachedException
// stream stopped
resource match {
case Success(r) close(r)
case Failure(ex) throw ex // failed to open but stream is stopped already
}
}
}
}
setHandler(out, this)