Cleanup of UnfoldResourceAsync and fix for #24119
* More complete test coverage plus one found bug fixed
This commit is contained in:
parent
c32a1d4174
commit
3cafdc65e0
2 changed files with 122 additions and 62 deletions
|
|
@ -5,14 +5,13 @@ package akka.stream.impl
|
|||
|
||||
import akka.Done
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.stream.ActorAttributes.SupervisionStrategy
|
||||
import akka.stream._
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.stage._
|
||||
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.util.Try
|
||||
import scala.concurrent.Future
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
|
|
@ -28,82 +27,85 @@ import scala.util.control.NonFatal
|
|||
|
||||
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
|
||||
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
||||
var resource = Promise[S]()
|
||||
var open = false
|
||||
implicit val context = ExecutionContexts.sameThreadExecutionContext
|
||||
private implicit def ec = ActorMaterializerHelper.downcast(materializer).system.dispatcher
|
||||
private var state: Option[S] = None
|
||||
|
||||
setHandler(out, this)
|
||||
private val createdCallback = getAsyncCallback[Try[S]] {
|
||||
case Success(resource) ⇒
|
||||
state = Some(resource)
|
||||
if (isAvailable(out)) onPull()
|
||||
case Failure(t) ⇒ failStage(t)
|
||||
}.invoke _
|
||||
|
||||
override def preStart(): Unit = createStream(false)
|
||||
|
||||
private def createStream(withPull: Boolean): Unit = {
|
||||
val createdCallback = getAsyncCallback[Try[S]] {
|
||||
case scala.util.Success(res) ⇒
|
||||
open = true
|
||||
resource.success(res)
|
||||
if (withPull) onPull()
|
||||
case scala.util.Failure(t) ⇒ failStage(t)
|
||||
}
|
||||
try {
|
||||
create().onComplete(createdCallback.invoke)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒ failStage(ex)
|
||||
}
|
||||
}
|
||||
|
||||
private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.foreach(f)
|
||||
|
||||
val errorHandler: PartialFunction[Throwable, Unit] = {
|
||||
private val errorHandler: PartialFunction[Throwable, Unit] = {
|
||||
case NonFatal(ex) ⇒ decider(ex) match {
|
||||
case Supervision.Stop ⇒
|
||||
onResourceReady(close(_))
|
||||
failStage(ex)
|
||||
case Supervision.Restart ⇒ restartState()
|
||||
case Supervision.Restart ⇒ restartResource()
|
||||
case Supervision.Resume ⇒ onPull()
|
||||
}
|
||||
}
|
||||
|
||||
val readCallback = getAsyncCallback[Try[Option[T]]] {
|
||||
case scala.util.Success(data) ⇒ data match {
|
||||
private val readCallback = getAsyncCallback[Try[Option[T]]] {
|
||||
case Success(data) ⇒ data match {
|
||||
case Some(d) ⇒ push(out, d)
|
||||
case None ⇒ closeStage()
|
||||
case None ⇒
|
||||
// end of resource reached, lets close it
|
||||
state match {
|
||||
case Some(resource) ⇒
|
||||
close(resource).onComplete(getAsyncCallback[Try[Done]] {
|
||||
case Success(Done) ⇒ completeStage()
|
||||
case Failure(ex) ⇒ failStage(ex)
|
||||
}.invoke)
|
||||
state = None
|
||||
|
||||
case None ⇒
|
||||
// cannot happen, but for good measure
|
||||
throw new IllegalStateException("Reached end of data but there is no open resource")
|
||||
}
|
||||
}
|
||||
case scala.util.Failure(t) ⇒ errorHandler(t)
|
||||
case Failure(t) ⇒ errorHandler(t)
|
||||
}.invoke _
|
||||
|
||||
final override def onPull(): Unit =
|
||||
onResourceReady { resource ⇒
|
||||
try { readData(resource).onComplete(readCallback) } catch errorHandler
|
||||
override def preStart(): Unit = createResource()
|
||||
|
||||
override def onPull(): Unit =
|
||||
state match {
|
||||
case Some(resource) ⇒
|
||||
try {
|
||||
readData(resource).onComplete(readCallback)
|
||||
} catch errorHandler
|
||||
case None ⇒
|
||||
// we got a pull but there is no open resource, we are either
|
||||
// currently creating/restarting then the read will be triggered when creating the
|
||||
// resource completes, or shutting down and then the pull does not matter anyway
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(): Unit = closeStage()
|
||||
|
||||
private def closeAndThen(f: () ⇒ Unit): Unit = {
|
||||
setKeepGoing(true)
|
||||
val closedCallback = getAsyncCallback[Try[Done]] {
|
||||
case scala.util.Success(_) ⇒
|
||||
open = false
|
||||
f()
|
||||
case scala.util.Failure(t) ⇒
|
||||
open = false
|
||||
failStage(t)
|
||||
}
|
||||
|
||||
onResourceReady(res ⇒
|
||||
try { close(res).onComplete(closedCallback.invoke) } catch {
|
||||
case NonFatal(ex) ⇒ failStage(ex)
|
||||
})
|
||||
}
|
||||
private def restartState(): Unit = closeAndThen(() ⇒ {
|
||||
resource = Promise[S]()
|
||||
createStream(true)
|
||||
})
|
||||
private def closeStage(): Unit = closeAndThen(completeStage)
|
||||
|
||||
override def postStop(): Unit = {
|
||||
if (open) closeStage()
|
||||
state.foreach(r ⇒ close(r))
|
||||
}
|
||||
|
||||
private def restartResource(): Unit = {
|
||||
state match {
|
||||
case Some(resource) ⇒
|
||||
// wait for the resource to close before restarting
|
||||
close(resource).onComplete(getAsyncCallback[Try[Done]] {
|
||||
case Success(Done) ⇒
|
||||
createResource()
|
||||
case Failure(ex) ⇒ failStage(ex)
|
||||
}.invoke)
|
||||
state = None
|
||||
case None ⇒
|
||||
createResource()
|
||||
}
|
||||
}
|
||||
|
||||
private def createResource(): Unit = {
|
||||
create().onComplete(createdCallback)
|
||||
}
|
||||
|
||||
setHandler(out, this)
|
||||
|
||||
}
|
||||
override def toString = "UnfoldResourceSourceAsync"
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue