Add draining control support to MergeHub #30057

- MergeHub materialization now produces a DrainingControl object alongside the Sink
- The DrainingControl can be invoked to initiate the draining procedure
- No new producers can be connected to the Hub
- The Hub is completed as soon as all the currently connected producers complete
This commit is contained in:
Andrea Zito 2021-04-07 11:34:59 +02:00 committed by GitHub
parent 99ec6f9d82
commit 318b9614a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 195 additions and 9 deletions

View file

@ -185,6 +185,40 @@ class HubSpec extends StreamSpec {
}
"complete after draining control is invoked and all connected producers complete" in assertAllStagesStopped {
val downstream = TestSubscriber.probe[Int]()
val (sink, draining) =
MergeHub.sourceWithDraining[Int](16).take(20).toMat(Sink.fromSubscriber(downstream))(Keep.left).run()
Source(1 to 10).runWith(sink)
Source(11 to 20).runWith(sink)
draining.drainAndComplete()
downstream.request(20)
downstream.expectNextN(20).sorted should ===(1 to 20)
downstream.expectComplete()
}
"immediately cancel new producers while draining" in assertAllStagesStopped {
val downstream = TestSubscriber.probe[Int]()
val (sink, draining) =
MergeHub.sourceWithDraining[Int](16).take(20).toMat(Sink.fromSubscriber(downstream))(Keep.left).run()
Source(1 to 10).runWith(sink)
Source(11 to 20).runWith(sink)
draining.drainAndComplete()
downstream.request(10)
downstream.expectNextN(10).sorted should ===(1 to 10)
val upstream = TestPublisher.probe[Int]()
Source.fromPublisher(upstream).runWith(sink)
upstream.expectCancellation()
downstream.request(10)
downstream.expectNextN(10).sorted should ===(11 to 20)
downstream.expectComplete()
}
}
"BroadcastHub" must {

View file

@ -0,0 +1,2 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.MergeHub.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.MergeHub.createLogicAndMaterializedValue")

View file

@ -5,7 +5,6 @@
package akka.stream.javadsl
import java.util.function.{ BiFunction, Supplier, ToLongBiFunction }
import akka.NotUsed
import akka.annotation.DoNotInherit
import akka.util.unused
@ -19,6 +18,26 @@ import akka.util.unused
*/
object MergeHub {
/**
* A DrainingControl object is created during the materialization of a MergeHub and allows to initiate the draining
* and eventual completion of the Hub from the outside.
*
* Not for user extension
*/
@DoNotInherit
sealed trait DrainingControl {
/**
* Set the operation mode of the linked MergeHub to draining. In this mode the Hub will cancel any new producer and
* will complete as soon as all the currently connected producers complete.
*/
def drainAndComplete(): Unit
}
private final class DrainingControlImpl(c: akka.stream.scaladsl.MergeHub.DrainingControl) extends DrainingControl {
override def drainAndComplete(): Unit = c.drainAndComplete()
}
/**
* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
* by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
@ -37,6 +56,35 @@ object MergeHub {
akka.stream.scaladsl.MergeHub.source[T](perProducerBufferSize).mapMaterializedValue(_.asJava[T]).asJava
}
/**
* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
* by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
* arbitrarily many times and each of the materializations will feed the elements into the original [[Source]].
*
* Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own
* [[Sink]] for feeding that materialization.
*
* Completed or failed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed
* and any new producers using the [[Sink]] will be cancelled.
*
* The materialized [[DrainingControl]] can be used to drain the Hub: any new produces using the [[Sink]] will be cancelled
* and the Hub will be closed completing the [[Source]] as soon as all currently connected producers complete.
*
* @param clazz Type of elements this hub emits and consumes
* @param perProducerBufferSize Buffer space used per producer. Default value is 16.
*/
def withDraining[T](
@unused clazz: Class[T],
perProducerBufferSize: Int): Source[T, akka.japi.Pair[Sink[T, NotUsed], DrainingControl]] = {
akka.stream.scaladsl.MergeHub
.sourceWithDraining[T](perProducerBufferSize)
.mapMaterializedValue {
case (sink, draining) =>
akka.japi.Pair(sink.asJava[T], new DrainingControlImpl(draining): DrainingControl)
}
.asJava
}
/**
* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
* by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
@ -52,6 +100,25 @@ object MergeHub {
*/
def of[T](clazz: Class[T]): Source[T, Sink[T, NotUsed]] = of(clazz, 16)
/**
* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
* by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
* arbitrarily many times and each of the materializations will feed the elements into the original [[Source]].
*
* Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own
* [[Sink]] for feeding that materialization.
*
* Completed or failed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed
* and any new producers using the [[Sink]] will be cancelled.
*
* The materialized [[DrainingControl]] can be used to drain the Hub: any new produces using the [[Sink]] will be cancelled
* and the Hub will be closed completing the [[Source]] as soon as all currently connected producers complete.
*
* @param clazz Type of elements this hub emits and consumes
*/
def withDraining[T](clazz: Class[T]): Source[T, akka.japi.Pair[Sink[T, NotUsed], DrainingControl]] =
withDraining(clazz, 16)
}
/**

View file

@ -9,14 +9,12 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReferenceArray
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.Queue
import scala.collection.mutable.LongMap
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }
import akka.NotUsed
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
@ -35,6 +33,20 @@ import akka.stream.stage._
object MergeHub {
private val Cancel = -1
/**
* A DrainingControl object is created during the materialization of a MergeHub and allows to initiate the draining
* and eventual completion of the Hub from the outside.
*/
sealed trait DrainingControl {
/**
* Set the operation mode of the linked MergeHub to draining. In this mode the Hub will cancel any new producer and
* will complete as soon as all the currently connected producers complete.
*
*/
def drainAndComplete(): Unit
}
/**
* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
* by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
@ -49,7 +61,26 @@ object MergeHub {
* @param perProducerBufferSize Buffer space used per producer. Default value is 16.
*/
def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] =
Source.fromGraph(new MergeHub[T](perProducerBufferSize))
Source.fromGraph(new MergeHub[T](perProducerBufferSize, false)).mapMaterializedValue(_._1)
/**
* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
* by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
* arbitrary many times and each of the materializations will feed the elements into the original [[Source]].
*
* Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own
* [[Sink]] for feeding that materialization.
*
* Completed or failed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed
* and any new producers using the [[Sink]] will be cancelled.
*
* The materialized [[DrainingControl]] can be used to drain the Hub: any new producers using the [[Sink]] will be cancelled
* and the Hub will be closed completing the [[Source]] as soon as all currently connected producers complete.
*
* @param perProducerBufferSize Buffer space used per producer. Default value is 16.
*/
def sourceWithDraining[T](perProducerBufferSize: Int): Source[T, (Sink[T, NotUsed], DrainingControl)] =
Source.fromGraph(new MergeHub[T](perProducerBufferSize, true))
/**
* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
@ -64,14 +95,38 @@ object MergeHub {
*/
def source[T]: Source[T, Sink[T, NotUsed]] = source(perProducerBufferSize = 16)
/**
* Creates a [[Source]] that emits elements merged from a dynamic set of producers. After the [[Source]] returned
* by this method is materialized, it returns a [[Sink]] as a materialized value. This [[Sink]] can be materialized
* arbitrary many times and each of the materializations will feed the elements into the original [[Source]].
*
* Every new materialization of the [[Source]] results in a new, independent hub, which materializes to its own
* [[Sink]] for feeding that materialization.
*
* Completed or failed [[Sink]]s are simply removed. Once the [[Source]] is cancelled, the Hub is considered closed
* and any new producers using the [[Sink]] will be cancelled.
*
* The materialized [[DrainingControl]] can be used to drain the Hub: any new producers using the [[Sink]] will be cancelled
* and the Hub will be closed completing the [[Source]] as soon as all currently connected producers complete.
*/
def sourceWithDraining[T](): Source[T, (Sink[T, NotUsed], DrainingControl)] =
sourceWithDraining(perProducerBufferSize = 16)
final class ProducerFailed(msg: String, cause: Throwable) extends RuntimeException(msg, cause)
}
/**
* INTERNAL API
*/
private[akka] class MergeHub[T](perProducerBufferSize: Int)
extends GraphStageWithMaterializedValue[SourceShape[T], Sink[T, NotUsed]] {
@InternalApi
private[akka] final class MergeHubDrainingControlImpl(drainAction: () => Unit) extends MergeHub.DrainingControl {
override def drainAndComplete(): Unit = {
drainAction()
}
}
private[akka] class MergeHub[T](perProducerBufferSize: Int, drainingEnabled: Boolean = false)
extends GraphStageWithMaterializedValue[SourceShape[T], (Sink[T, NotUsed], MergeHub.DrainingControl)] {
require(perProducerBufferSize > 0, "Buffer size must be positive")
val out: Outlet[T] = Outlet("MergeHub.out")
@ -114,6 +169,7 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int)
private val queue = new AbstractNodeQueue[Event] {}
@volatile private[this] var needWakeup = false
@volatile private[this] var shuttingDown = false
@volatile private[this] var draining = false
private[this] val demands = scala.collection.mutable.LongMap.empty[InputState]
private[this] val wakeupCallback = getAsyncCallback[NotUsed](
@ -121,6 +177,16 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int)
// We are only allowed to dequeue if we are not backpressured. See comment in tryProcessNext() for details.
if (isAvailable(out)) tryProcessNext(firstAttempt = true))
private[MergeHub] val drainingCallback: Option[AsyncCallback[NotUsed]] = {
// Only create an async callback if the draining support is enabled in order to avoid book-keeping costs.
if (drainingEnabled) {
Some(getAsyncCallback[NotUsed] { _ =>
draining = true
tryCompleteOnDraining()
})
} else None
}
setHandler(out, this)
// Returns true when we have not consumed demand, false otherwise
@ -134,9 +200,16 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int)
true
case Deregister(id) =>
demands.remove(id)
if (drainingEnabled && draining) tryCompleteOnDraining()
true
}
private def tryCompleteOnDraining(): Unit = {
if (demands.isEmpty && (queue.peek() eq null)) {
completeStage()
}
}
override def onPull(): Unit = tryProcessNext(firstAttempt = true)
@tailrec private def tryProcessNext(firstAttempt: Boolean): Unit = {
@ -157,10 +230,13 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int)
// and have been enqueued just after it
if (firstAttempt)
tryProcessNext(firstAttempt = false)
else if (drainingEnabled && draining)
tryCompleteOnDraining()
}
}
def isShuttingDown: Boolean = shuttingDown
def isDraining: Boolean = drainingEnabled && draining
// External API
def enqueue(ev: Event): Unit = {
@ -220,7 +296,8 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int)
}
}
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Sink[T, NotUsed]) = {
override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes): (GraphStageLogic, (Sink[T, NotUsed], MergeHub.DrainingControl)) = {
val idCounter = new AtomicLong()
val logic: MergedSourceLogic = new MergedSourceLogic(shape)
@ -237,7 +314,7 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int)
private[this] val id = idCounter.getAndIncrement()
override def preStart(): Unit = {
if (!logic.isShuttingDown) {
if (!logic.isDraining && !logic.isShuttingDown) {
logic.enqueue(Register(id, getAsyncCallback(onDemand)))
// At this point, we could be in the unfortunate situation that:
@ -294,7 +371,13 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int)
case None => Sink.fromGraph(sink)
}
(logic, sinkWithAttributes)
val drainingAction = logic.drainingCallback match {
case Some(cbk) => () => cbk.invoke(NotUsed)
case None => () => throw new IllegalStateException("Draining control not enabled")
}
val drainingControl = new MergeHubDrainingControlImpl(drainingAction)
(logic, (sinkWithAttributes, drainingControl))
}
}