diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala index 5d1ebf9572..0d4afcf8be 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HubSpec.scala @@ -185,6 +185,40 @@ class HubSpec extends StreamSpec { } + "complete after draining control is invoked and all connected producers complete" in assertAllStagesStopped { + val downstream = TestSubscriber.probe[Int]() + val (sink, draining) = + MergeHub.sourceWithDraining[Int](16).take(20).toMat(Sink.fromSubscriber(downstream))(Keep.left).run() + Source(1 to 10).runWith(sink) + Source(11 to 20).runWith(sink) + + draining.drainAndComplete() + + downstream.request(20) + downstream.expectNextN(20).sorted should ===(1 to 20) + downstream.expectComplete() + } + + "immediately cancel new producers while draining" in assertAllStagesStopped { + val downstream = TestSubscriber.probe[Int]() + val (sink, draining) = + MergeHub.sourceWithDraining[Int](16).take(20).toMat(Sink.fromSubscriber(downstream))(Keep.left).run() + Source(1 to 10).runWith(sink) + Source(11 to 20).runWith(sink) + draining.drainAndComplete() + + downstream.request(10) + downstream.expectNextN(10).sorted should ===(1 to 10) + + val upstream = TestPublisher.probe[Int]() + Source.fromPublisher(upstream).runWith(sink) + upstream.expectCancellation() + + downstream.request(10) + downstream.expectNextN(10).sorted should ===(11 to 20) + + downstream.expectComplete() + } } "BroadcastHub" must { diff --git a/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/30059-mergehub-draining-support.excludes b/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/30059-mergehub-draining-support.excludes new file mode 100644 index 0000000000..1f5f77fb5a --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.13.backwards.excludes/30059-mergehub-draining-support.excludes @@ -0,0 +1,2 @@ +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.MergeHub.this") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.MergeHub.createLogicAndMaterializedValue") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala index 61c54ab5fb..2b7eef93d2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala @@ -5,7 +5,6 @@ package akka.stream.javadsl import java.util.function.{ BiFunction, Supplier, ToLongBiFunction } - import akka.NotUsed import akka.annotation.DoNotInherit import akka.util.unused @@ -19,6 +18,26 @@ import akka.util.unused */ object MergeHub { + /** + * A DrainingControl object is created during the materialization of a MergeHub and allows to initiate the draining + * and eventual completion of the Hub from the outside. + * + * Not for user extension + */ + @DoNotInherit + sealed trait DrainingControl { + + /** + * Set the operation mode of the linked MergeHub to draining. In this mode the Hub will cancel any new producer and + * will complete as soon as all the currently connected producers complete. + */ + def drainAndComplete(): Unit + } + + private final class DrainingControlImpl(c: akka.stream.scaladsl.MergeHub.DrainingControl) extends DrainingControl { + override def drainAndComplete(): Unit = c.drainAndComplete() + } + /** * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized @@ -37,6 +56,35 @@ object MergeHub { akka.stream.scaladsl.MergeHub.source[T](perProducerBufferSize).mapMaterializedValue(_.asJava[T]).asJava } + /** + * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned + * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized + * arbitrarily many times and each of the materializations will feed the elements into the original [[Source]]. + * + * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own + * [[Sink]] for feeding that materialization. + * + * Completed or failed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed + * and any new producers using the [[Sink]] will be cancelled. + * + * The materialized [[DrainingControl]] can be used to drain the Hub: any new produces using the [[Sink]] will be cancelled + * and the Hub will be closed completing the [[Source]] as soon as all currently connected producers complete. + * + * @param clazz Type of elements this hub emits and consumes + * @param perProducerBufferSize Buffer space used per producer. Default value is 16. + */ + def withDraining[T]( + @unused clazz: Class[T], + perProducerBufferSize: Int): Source[T, akka.japi.Pair[Sink[T, NotUsed], DrainingControl]] = { + akka.stream.scaladsl.MergeHub + .sourceWithDraining[T](perProducerBufferSize) + .mapMaterializedValue { + case (sink, draining) => + akka.japi.Pair(sink.asJava[T], new DrainingControlImpl(draining): DrainingControl) + } + .asJava + } + /** * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized @@ -52,6 +100,25 @@ object MergeHub { */ def of[T](clazz: Class[T]): Source[T, Sink[T, NotUsed]] = of(clazz, 16) + /** + * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned + * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized + * arbitrarily many times and each of the materializations will feed the elements into the original [[Source]]. + * + * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own + * [[Sink]] for feeding that materialization. + * + * Completed or failed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed + * and any new producers using the [[Sink]] will be cancelled. + * + * The materialized [[DrainingControl]] can be used to drain the Hub: any new produces using the [[Sink]] will be cancelled + * and the Hub will be closed completing the [[Source]] as soon as all currently connected producers complete. + * + * @param clazz Type of elements this hub emits and consumes + */ + def withDraining[T](clazz: Class[T]): Source[T, akka.japi.Pair[Sink[T, NotUsed], DrainingControl]] = + withDraining(clazz, 16) + } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala index 3775e08503..ed7ce7c477 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala @@ -9,14 +9,12 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReferenceArray - import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.Queue import scala.collection.mutable.LongMap import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success, Try } - import akka.NotUsed import akka.annotation.DoNotInherit import akka.annotation.InternalApi @@ -35,6 +33,20 @@ import akka.stream.stage._ object MergeHub { private val Cancel = -1 + /** + * A DrainingControl object is created during the materialization of a MergeHub and allows to initiate the draining + * and eventual completion of the Hub from the outside. + */ + sealed trait DrainingControl { + + /** + * Set the operation mode of the linked MergeHub to draining. In this mode the Hub will cancel any new producer and + * will complete as soon as all the currently connected producers complete. + * + */ + def drainAndComplete(): Unit + } + /** * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized @@ -49,7 +61,26 @@ object MergeHub { * @param perProducerBufferSize Buffer space used per producer. Default value is 16. */ def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = - Source.fromGraph(new MergeHub[T](perProducerBufferSize)) + Source.fromGraph(new MergeHub[T](perProducerBufferSize, false)).mapMaterializedValue(_._1) + + /** + * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned + * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized + * arbitrary many times and each of the materializations will feed the elements into the original [[Source]]. + * + * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own + * [[Sink]] for feeding that materialization. + * + * Completed or failed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed + * and any new producers using the [[Sink]] will be cancelled. + * + * The materialized [[DrainingControl]] can be used to drain the Hub: any new producers using the [[Sink]] will be cancelled + * and the Hub will be closed completing the [[Source]] as soon as all currently connected producers complete. + * + * @param perProducerBufferSize Buffer space used per producer. Default value is 16. + */ + def sourceWithDraining[T](perProducerBufferSize: Int): Source[T, (Sink[T, NotUsed], DrainingControl)] = + Source.fromGraph(new MergeHub[T](perProducerBufferSize, true)) /** * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned @@ -64,14 +95,38 @@ object MergeHub { */ def source[T]: Source[T, Sink[T, NotUsed]] = source(perProducerBufferSize = 16) + /** + * Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned + * by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized + * arbitrary many times and each of the materializations will feed the elements into the original [[Source]]. + * + * Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own + * [[Sink]] for feeding that materialization. + * + * Completed or failed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed + * and any new producers using the [[Sink]] will be cancelled. + * + * The materialized [[DrainingControl]] can be used to drain the Hub: any new producers using the [[Sink]] will be cancelled + * and the Hub will be closed completing the [[Source]] as soon as all currently connected producers complete. + */ + def sourceWithDraining[T](): Source[T, (Sink[T, NotUsed], DrainingControl)] = + sourceWithDraining(perProducerBufferSize = 16) + final class ProducerFailed(msg: String, cause: Throwable) extends RuntimeException(msg, cause) } /** * INTERNAL API */ -private[akka] class MergeHub[T](perProducerBufferSize: Int) - extends GraphStageWithMaterializedValue[SourceShape[T], Sink[T, NotUsed]] { +@InternalApi +private[akka] final class MergeHubDrainingControlImpl(drainAction: () => Unit) extends MergeHub.DrainingControl { + override def drainAndComplete(): Unit = { + drainAction() + } +} + +private[akka] class MergeHub[T](perProducerBufferSize: Int, drainingEnabled: Boolean = false) + extends GraphStageWithMaterializedValue[SourceShape[T], (Sink[T, NotUsed], MergeHub.DrainingControl)] { require(perProducerBufferSize > 0, "Buffer size must be positive") val out: Outlet[T] = Outlet("MergeHub.out") @@ -114,6 +169,7 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) private val queue = new AbstractNodeQueue[Event] {} @volatile private[this] var needWakeup = false @volatile private[this] var shuttingDown = false + @volatile private[this] var draining = false private[this] val demands = scala.collection.mutable.LongMap.empty[InputState] private[this] val wakeupCallback = getAsyncCallback[NotUsed]( @@ -121,6 +177,16 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) // We are only allowed to dequeue if we are not backpressured. See comment in tryProcessNext() for details. if (isAvailable(out)) tryProcessNext(firstAttempt = true)) + private[MergeHub] val drainingCallback: Option[AsyncCallback[NotUsed]] = { + // Only create an async callback if the draining support is enabled in order to avoid book-keeping costs. + if (drainingEnabled) { + Some(getAsyncCallback[NotUsed] { _ => + draining = true + tryCompleteOnDraining() + }) + } else None + } + setHandler(out, this) // Returns true when we have not consumed demand, false otherwise @@ -134,9 +200,16 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) true case Deregister(id) => demands.remove(id) + if (drainingEnabled && draining) tryCompleteOnDraining() true } + private def tryCompleteOnDraining(): Unit = { + if (demands.isEmpty && (queue.peek() eq null)) { + completeStage() + } + } + override def onPull(): Unit = tryProcessNext(firstAttempt = true) @tailrec private def tryProcessNext(firstAttempt: Boolean): Unit = { @@ -157,10 +230,13 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) // and have been enqueued just after it if (firstAttempt) tryProcessNext(firstAttempt = false) + else if (drainingEnabled && draining) + tryCompleteOnDraining() } } def isShuttingDown: Boolean = shuttingDown + def isDraining: Boolean = drainingEnabled && draining // External API def enqueue(ev: Event): Unit = { @@ -220,7 +296,8 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) } } - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Sink[T, NotUsed]) = { + override def createLogicAndMaterializedValue( + inheritedAttributes: Attributes): (GraphStageLogic, (Sink[T, NotUsed], MergeHub.DrainingControl)) = { val idCounter = new AtomicLong() val logic: MergedSourceLogic = new MergedSourceLogic(shape) @@ -237,7 +314,7 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) private[this] val id = idCounter.getAndIncrement() override def preStart(): Unit = { - if (!logic.isShuttingDown) { + if (!logic.isDraining && !logic.isShuttingDown) { logic.enqueue(Register(id, getAsyncCallback(onDemand))) // At this point, we could be in the unfortunate situation that: @@ -294,7 +371,13 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) case None => Sink.fromGraph(sink) } - (logic, sinkWithAttributes) + val drainingAction = logic.drainingCallback match { + case Some(cbk) => () => cbk.invoke(NotUsed) + case None => () => throw new IllegalStateException("Draining control not enabled") + } + val drainingControl = new MergeHubDrainingControlImpl(drainingAction) + + (logic, (sinkWithAttributes, drainingControl)) } }