diff --git a/akka-stream/src/main/mima-filters/2.6.9.backwards.excludes/28960-stream-snapshots.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.9.backwards.excludes/28960-stream-snapshots.backwards.excludes new file mode 100644 index 0000000000..dd9f4ad4a9 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.9.backwards.excludes/28960-stream-snapshots.backwards.excludes @@ -0,0 +1,11 @@ +# internal changes +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.StreamSupervisor$GetChildrenSnapshots$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productPrefix") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productArity") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productElement") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productIterator") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.canEqual") +ProblemFilters.exclude[FinalMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.toString") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor.takeSnapshotsOfChildren") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productElementName") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.StreamSupervisor#GetChildrenSnapshots.productElementNames") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 7feff97654..f5f981a17b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -8,25 +8,19 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.immutable import scala.concurrent.ExecutionContextExecutor -import scala.concurrent.Future import scala.concurrent.duration._ - import com.github.ghik.silencer.silent - import akka.actor._ import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.dispatch.Dispatchers import akka.event.LoggingAdapter -import akka.pattern.ask -import akka.pattern.pipe -import akka.pattern.retry +import akka.pattern.StatusReply import akka.stream._ import akka.stream.impl.fusing.ActorGraphInterpreter import akka.stream.impl.fusing.GraphInterpreterShell import akka.stream.snapshot.StreamSnapshot import akka.util.OptionVal -import akka.util.Timeout /** * ExtendedActorMaterializer used by subtypes which delegates in-island wiring to [[akka.stream.impl.PhaseIsland]]s @@ -203,10 +197,11 @@ private[akka] class SubFusingActorMaterializerImpl( extends DeadLetterSuppression with NoSerializationVerificationNeeded - case object GetChildrenSnapshots + final case class GetChildrenSnapshots(timeout: FiniteDuration) final case class ChildrenSnapshots(seq: immutable.Seq[StreamSnapshot]) extends DeadLetterSuppression with NoSerializationVerificationNeeded + private final case class CollectorCompleted(ref: ActorRef) /** Testing purpose */ case object GetChildren @@ -229,34 +224,78 @@ private[akka] class SubFusingActorMaterializerImpl( implicit val ec: ExecutionContextExecutor = context.dispatcher override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy + private var snapshotCollectors: Set[ActorRef] = Set.empty + def receive: Receive = { case Materialize(props, name) => val impl = context.actorOf(props, name) sender() ! impl case GetChildren => sender() ! Children(context.children.toSet) - case GetChildrenSnapshots => - takeSnapshotsOfChildren().map(ChildrenSnapshots.apply _).pipeTo(sender()) - + case GetChildrenSnapshots(timeout) => + val collector = + context.actorOf( + SnapshotCollector + .props(context.children.toSet -- snapshotCollectors, timeout, sender()) + .withDispatcher(context.props.dispatcher)) + context.watchWith(collector, CollectorCompleted(collector)) + snapshotCollectors += collector case StopChildren => context.children.foreach(context.stop) sender() ! StoppedChildren - } - - def takeSnapshotsOfChildren(): Future[immutable.Seq[StreamSnapshot]] = { - // Arbitrary timeout but should always be quick, the failure scenario is that - // the child/stream stopped, and we do retry below - implicit val timeout: Timeout = 1.second - def takeSnapshot() = { - val futureSnapshots = - context.children.toList.map(child => (child ? ActorGraphInterpreter.Snapshot).mapTo[StreamSnapshot]) - Future.sequence(futureSnapshots) - } - - // If the timeout hits it is likely because one of the streams stopped between looking at the list - // of children and asking it for a snapshot. We retry the entire snapshot in that case - retry(() => takeSnapshot(), 3) + case CollectorCompleted(collector) => + snapshotCollectors -= collector } override def postStop(): Unit = haveShutDown.set(true) } + +@InternalApi +private[akka] object SnapshotCollector { + case object SnapshotTimeout + def props(streamActors: Set[ActorRef], timeout: FiniteDuration, replyTo: ActorRef): Props = + Props(new SnapshotCollector(streamActors, timeout, replyTo)) +} + +@InternalApi +private[akka] final class SnapshotCollector(streamActors: Set[ActorRef], timeout: FiniteDuration, replyTo: ActorRef) + extends Actor + with Timers { + + import SnapshotCollector._ + + var leftToRespond = streamActors + var collected: List[StreamSnapshot] = Nil + if (streamActors.isEmpty) { + replyTo ! StatusReply.Success(StreamSupervisor.ChildrenSnapshots(Nil)) + context.stop(self) + } else { + streamActors.foreach { ref => + context.watch(ref) + ref ! ActorGraphInterpreter.Snapshot + } + } + + timers.startSingleTimer(SnapshotTimeout, SnapshotTimeout, timeout) + + override def receive: Receive = { + case snap: StreamSnapshot => + collected = snap :: collected + leftToRespond -= sender() + completeIfDone() + case Terminated(streamActor) => + leftToRespond -= streamActor + completeIfDone() + case SnapshotTimeout => + replyTo ! StatusReply.Error( + s"Didn't get replies from all stream actors within the timeout of ${timeout.toMillis} ms") + context.stop(self) + } + + def completeIfDone(): Unit = { + if (leftToRespond.isEmpty) { + replyTo ! StatusReply.Success(StreamSupervisor.ChildrenSnapshots(collected)) + context.stop(self) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala b/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala index 1166c5dd31..e861a622fc 100644 --- a/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala +++ b/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala @@ -56,7 +56,10 @@ object MaterializerState { implicit ec: ExecutionContext): Future[immutable.Seq[StreamSnapshot]] = { // Arbitrary timeout: operation should always be quick, when it times out it will be because the materializer stopped implicit val timeout: Timeout = 10.seconds - (supervisor ? StreamSupervisor.GetChildrenSnapshots).mapTo[StreamSupervisor.ChildrenSnapshots].map(_.seq) + supervisor + .askWithStatus(StreamSupervisor.GetChildrenSnapshots(timeout.duration)) + .mapTo[StreamSupervisor.ChildrenSnapshots] + .map(_.seq) } /** INTERNAL API */