/**
 * Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
 */
|
2018-03-13 23:45:55 +09:00
|
|
|
|
2017-06-06 16:38:05 +02:00
|
|
|
package akka.stream.impl
|
|
|
|
|
|
|
|
|
|
import akka.Done
import akka.annotation.InternalApi
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage._

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Source stage that asynchronously opens a resource of type `S` with `create`, emits the
 * elements obtained by repeatedly calling `readData` on it, and closes it with `close`.
 *
 * Lifecycle, as implemented by the stage logic:
 *  - `create` is called in `preStart`; if its future fails, the stage is failed.
 *  - every pull calls `readData`: `Some(element)` is pushed downstream, `None` closes the
 *    resource and then completes the stage (or fails it if closing fails).
 *  - a failed `readData` future, or a non-fatal exception thrown by `readData` itself, is handed
 *    to the `SupervisionStrategy` decider: `Stop` fails the stage, `Restart` closes and
 *    re-creates the resource, `Resume` reads again from the same resource.
 *  - `postStop` closes a resource that is still open at that point.
 *
 * @param create   opens the resource
 * @param readData reads the next element from the resource; `None` signals the end of the data
 * @param close    closes the resource
 * @tparam T type of the emitted elements
 * @tparam S type of the resource
 */
@InternalApi private[akka] final class UnfoldResourceSourceAsync[T, S](
  create:   () ⇒ Future[S],
  readData: (S) ⇒ Future[Option[T]],
  close:    (S) ⇒ Future[Done]) extends GraphStage[SourceShape[T]] {
  val out = Outlet[T]("UnfoldResourceSourceAsync.out")
  override val shape = SourceShape(out)
  override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync

  def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
    lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
    // execution context used for the `onComplete` callbacks registered on the user futures
    private implicit def ec: ExecutionContext = ActorMaterializerHelper.downcast(materializer).system.dispatcher
    // the currently open resource; None while it is being created/restarted or after it was closed
    private var state: Option[S] = None

    // invoked when the future returned by `create` completes
    private val createdCallback = getAsyncCallback[Try[S]] {
      case Success(resource) ⇒
        state = Some(resource)
        // a pull that arrived while there was no resource was ignored by onPull, serve it now
        if (isAvailable(out)) onPull()
      case Failure(t) ⇒ failStage(t)
    }.invoke _

    // applies the supervision decision to a non-fatal read failure; also used directly as the
    // catch handler in onPull, so anything that is not NonFatal is rethrown there
    private val errorHandler: PartialFunction[Throwable, Unit] = {
      case NonFatal(ex) ⇒ decider(ex) match {
        case Supervision.Stop ⇒
          failStage(ex)
        case Supervision.Restart ⇒ restartResource()
        case Supervision.Resume  ⇒ onPull()
      }
    }

    // invoked when the future returned by `readData` completes
    private val readCallback = getAsyncCallback[Try[Option[T]]] {
      case Success(data) ⇒ data match {
        case Some(d) ⇒ push(out, d)
        case None ⇒
          // end of resource reached, lets close it
          state match {
            case Some(resource) ⇒
              close(resource).onComplete(getAsyncCallback[Try[Done]] {
                case Success(Done) ⇒ completeStage()
                case Failure(ex)   ⇒ failStage(ex)
              }.invoke)
              // closing is under way, forget the resource so that postStop does not close it again
              state = None

            case None ⇒
              // cannot happen, but for good measure
              throw new IllegalStateException("Reached end of data but there is no open resource")
          }
      }
      case Failure(t) ⇒
        // errorHandler is only defined for NonFatal throwables; should the future carry anything
        // else, fail the stage with that cause instead of letting a MatchError hide it
        errorHandler.applyOrElse(t, (fatal: Throwable) ⇒ failStage(fatal))
    }.invoke _

    override def preStart(): Unit = createResource()

    override def onPull(): Unit =
      state match {
        case Some(resource) ⇒
          try {
            readData(resource).onComplete(readCallback)
          } catch errorHandler
        case None ⇒
        // we got a pull but there is no open resource, we are either
        // currently creating/restarting then the read will be triggered when creating the
        // resource completes, or shutting down and then the pull does not matter anyway
      }

    override def postStop(): Unit = {
      // close a still open resource; the returned future is not observed here
      state.foreach(r ⇒ close(r))
    }

    // closes the current resource (if any) and creates a new one
    private def restartResource(): Unit = {
      state match {
        case Some(resource) ⇒
          // wait for the resource to close before restarting
          close(resource).onComplete(getAsyncCallback[Try[Done]] {
            case Success(Done) ⇒
              createResource()
            case Failure(ex) ⇒ failStage(ex)
          }.invoke)
          // no resource is available until createdCallback stores the new one
          state = None
        case None ⇒
          createResource()
      }
    }

    // starts opening the resource; the result is delivered to the stage through createdCallback
    private def createResource(): Unit = {
      create().onComplete(createdCallback)
    }

    setHandler(out, this)

  }
  override def toString = "UnfoldResourceSourceAsync"

}
|