Proper threadsafe collection of stream snapshots, ignoring stopping streams. #28960
This commit is contained in:
parent
93a69c42ff
commit
5d279b6c9c
3 changed files with 80 additions and 27 deletions
|
|
@ -0,0 +1,11 @@
|
|||
# internal changes
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.StreamSupervisor$GetChildrenSnapshots$")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productPrefix")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productArity")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productElement")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productIterator")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.canEqual")
|
||||
ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.toString")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor.takeSnapshotsOfChildren")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productElementName")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productElementNames")
|
||||
|
|
@ -8,25 +8,19 @@ import java.util.concurrent.atomic.AtomicBoolean
|
|||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.ExecutionContextExecutor
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import com.github.ghik.silencer.silent
|
||||
|
||||
import akka.actor._
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.Dispatchers
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.pattern.ask
|
||||
import akka.pattern.pipe
|
||||
import akka.pattern.retry
|
||||
import akka.pattern.StatusReply
|
||||
import akka.stream._
|
||||
import akka.stream.impl.fusing.ActorGraphInterpreter
|
||||
import akka.stream.impl.fusing.GraphInterpreterShell
|
||||
import akka.stream.snapshot.StreamSnapshot
|
||||
import akka.util.OptionVal
|
||||
import akka.util.Timeout
|
||||
|
||||
/**
|
||||
* ExtendedActorMaterializer used by subtypes which delegates in-island wiring to [[akka.stream.impl.PhaseIsland]]s
|
||||
|
|
@ -203,10 +197,11 @@ private[akka] class SubFusingActorMaterializerImpl(
|
|||
extends DeadLetterSuppression
|
||||
with NoSerializationVerificationNeeded
|
||||
|
||||
case object GetChildrenSnapshots
|
||||
final case class GetChildrenSnapshots(timeout: FiniteDuration)
|
||||
final case class ChildrenSnapshots(seq: immutable.Seq[StreamSnapshot])
|
||||
extends DeadLetterSuppression
|
||||
with NoSerializationVerificationNeeded
|
||||
private final case class CollectorCompleted(ref: ActorRef)
|
||||
|
||||
/** Testing purpose */
|
||||
case object GetChildren
|
||||
|
|
@ -229,34 +224,78 @@ private[akka] class SubFusingActorMaterializerImpl(
|
|||
implicit val ec: ExecutionContextExecutor = context.dispatcher
|
||||
override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
|
||||
|
||||
private var snapshotCollectors: Set[ActorRef] = Set.empty
|
||||
|
||||
def receive: Receive = {
|
||||
case Materialize(props, name) =>
|
||||
val impl = context.actorOf(props, name)
|
||||
sender() ! impl
|
||||
case GetChildren =>
|
||||
sender() ! Children(context.children.toSet)
|
||||
case GetChildrenSnapshots =>
|
||||
takeSnapshotsOfChildren().map(ChildrenSnapshots.apply _).pipeTo(sender())
|
||||
|
||||
case GetChildrenSnapshots(timeout) =>
|
||||
val collector =
|
||||
context.actorOf(
|
||||
SnapshotCollector
|
||||
.props(context.children.toSet -- snapshotCollectors, timeout, sender())
|
||||
.withDispatcher(context.props.dispatcher))
|
||||
context.watchWith(collector, CollectorCompleted(collector))
|
||||
snapshotCollectors += collector
|
||||
case StopChildren =>
|
||||
context.children.foreach(context.stop)
|
||||
sender() ! StoppedChildren
|
||||
}
|
||||
|
||||
def takeSnapshotsOfChildren(): Future[immutable.Seq[StreamSnapshot]] = {
|
||||
// Arbitrary timeout but should always be quick, the failure scenario is that
|
||||
// the child/stream stopped, and we do retry below
|
||||
implicit val timeout: Timeout = 1.second
|
||||
def takeSnapshot() = {
|
||||
val futureSnapshots =
|
||||
context.children.toList.map(child => (child ? ActorGraphInterpreter.Snapshot).mapTo[StreamSnapshot])
|
||||
Future.sequence(futureSnapshots)
|
||||
}
|
||||
|
||||
// If the timeout hits it is likely because one of the streams stopped between looking at the list
|
||||
// of children and asking it for a snapshot. We retry the entire snapshot in that case
|
||||
retry(() => takeSnapshot(), 3)
|
||||
case CollectorCompleted(collector) =>
|
||||
snapshotCollectors -= collector
|
||||
}
|
||||
|
||||
override def postStop(): Unit = haveShutDown.set(true)
|
||||
}
|
||||
|
||||
@InternalApi
|
||||
private[akka] object SnapshotCollector {
|
||||
case object SnapshotTimeout
|
||||
def props(streamActors: Set[ActorRef], timeout: FiniteDuration, replyTo: ActorRef): Props =
|
||||
Props(new SnapshotCollector(streamActors, timeout, replyTo))
|
||||
}
|
||||
|
||||
@InternalApi
|
||||
private[akka] final class SnapshotCollector(streamActors: Set[ActorRef], timeout: FiniteDuration, replyTo: ActorRef)
|
||||
extends Actor
|
||||
with Timers {
|
||||
|
||||
import SnapshotCollector._
|
||||
|
||||
var leftToRespond = streamActors
|
||||
var collected: List[StreamSnapshot] = Nil
|
||||
if (streamActors.isEmpty) {
|
||||
replyTo ! StatusReply.Success(StreamSupervisor.ChildrenSnapshots(Nil))
|
||||
context.stop(self)
|
||||
} else {
|
||||
streamActors.foreach { ref =>
|
||||
context.watch(ref)
|
||||
ref ! ActorGraphInterpreter.Snapshot
|
||||
}
|
||||
}
|
||||
|
||||
timers.startSingleTimer(SnapshotTimeout, SnapshotTimeout, timeout)
|
||||
|
||||
override def receive: Receive = {
|
||||
case snap: StreamSnapshot =>
|
||||
collected = snap :: collected
|
||||
leftToRespond -= sender()
|
||||
completeIfDone()
|
||||
case Terminated(streamActor) =>
|
||||
leftToRespond -= streamActor
|
||||
completeIfDone()
|
||||
case SnapshotTimeout =>
|
||||
replyTo ! StatusReply.Error(
|
||||
s"Didn't get replies from all stream actors within the timeout of ${timeout.toMillis} ms")
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
def completeIfDone(): Unit = {
|
||||
if (leftToRespond.isEmpty) {
|
||||
replyTo ! StatusReply.Success(StreamSupervisor.ChildrenSnapshots(collected))
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,7 +56,10 @@ object MaterializerState {
|
|||
implicit ec: ExecutionContext): Future[immutable.Seq[StreamSnapshot]] = {
|
||||
// Arbitrary timeout: operation should always be quick, when it times out it will be because the materializer stopped
|
||||
implicit val timeout: Timeout = 10.seconds
|
||||
(supervisor ? StreamSupervisor.GetChildrenSnapshots).mapTo[StreamSupervisor.ChildrenSnapshots].map(_.seq)
|
||||
supervisor
|
||||
.askWithStatus(StreamSupervisor.GetChildrenSnapshots(timeout.duration))
|
||||
.mapTo[StreamSupervisor.ChildrenSnapshots]
|
||||
.map(_.seq)
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue