+str Add startAfterNrOfConsumers to BroadcastHub.

He-Pin 2023-03-27 20:16:09 +08:00 committed by kerr
parent 04332a6a86
commit 5b97885623
3 changed files with 166 additions and 46 deletions


@@ -305,6 +305,52 @@ class HubSpec extends StreamSpec {
      f2.futureValue should ===(1 to 10)
    }
"broadcast elements to downstream after at least one subscriber" in {
val broadcast = Source(1 to 10).runWith(BroadcastHub.sink[Int](1, 256))
val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
Await.result(resultOne, 1.second) should be(1 to 10) // fails
}
"broadcast all elements to all consumers" in {
val sourceQueue = Source.queue[Int](10) // used to block the source until we say so
val (queue, broadcast) = sourceQueue.toMat(BroadcastHub.sink(2, 256))(Keep.both).run()
val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
for (i <- 1 to 5) {
queue.offer(i)
}
val resultTwo = broadcast.runWith(Sink.seq)
for (i <- 6 to 10) {
queue.offer(i)
}
queue.complete() // only now is the source emptied
Await.result(resultOne, 1.second) should be(1 to 10)
Await.result(resultTwo, 1.second) should be(1 to 10)
}
"broadcast all elements to all consumers with hot upstream" in {
val broadcast = Source(1 to 10).runWith(BroadcastHub.sink[Int](2, 256))
val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
val resultTwo = broadcast.runWith(Sink.seq)
Await.result(resultOne, 1.second) should be(1 to 10)
Await.result(resultTwo, 1.second) should be(1 to 10)
}
"broadcast all elements to all consumers with hot upstream even some subscriber unsubscribe" in {
val broadcast = Source(1 to 10).runWith(BroadcastHub.sink[Int](2, 256))
val sub = broadcast.runWith(TestSink.apply())
sub.request(1)
Thread.sleep(1000)
sub.cancel()
val resultOne = broadcast.runWith(Sink.seq) // nothing happening yet
val resultTwo = broadcast.runWith(Sink.seq) // nothing happening yet
Await.result(resultOne, 1.second) should be(1 to 10)
Await.result(resultTwo, 1.second) should be(1 to 10)
}
"send the same prefix to consumers attaching around the same time if one cancels earlier" in { "send the same prefix to consumers attaching around the same time if one cancels earlier" in {
val (firstElem, source) = Source.maybe[Int].concat(Source(2 to 20)).toMat(BroadcastHub.sink(8))(Keep.both).run() val (firstElem, source) = Source.maybe[Int].concat(Source(2 to 20)).toMat(BroadcastHub.sink(8))(Keep.both).run()


@@ -164,6 +164,35 @@ object BroadcastHub {
    pekko.stream.scaladsl.BroadcastHub.sink[T](bufferSize).mapMaterializedValue(_.asJava).asJava
  }
  /**
   * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
   * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
   * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the
   * broadcast elements from the original [[Sink]].
   *
   * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
   * [[Source]] for consuming the [[Sink]] of that materialization.
   *
   * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
   * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
   * all corresponding [[Source]]s are completed. Both failure and normal completion are "remembered" and later
   * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
   * cancelled are simply removed from the dynamic set of consumers.
   *
   * @param clazz Type of elements this hub emits and consumes
   * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
   *   This is only used initially when the operator is starting up, i.e. it is not honored when consumers have
   *   been removed (canceled).
   * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two
   *   concurrent consumers can be in terms of elements. If the buffer is full, the producer is backpressured.
   *   Must be a power of two and less than 4096.
   * @since 1.1.0
   */
  def of[T](@unused clazz: Class[T], startAfterNrOfConsumers: Int, bufferSize: Int): Sink[T, Source[T, NotUsed]] = {
    pekko.stream.scaladsl.BroadcastHub.sink[T](startAfterNrOfConsumers, bufferSize).mapMaterializedValue(
      _.asJava).asJava
  }
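
For illustration only, not part of this commit: a minimal sketch of how the new Java DSL overload could be used. Since these sources are Scala, the sketch drives the javadsl API from Scala; the object name, actor system name, element range and buffer sizes are invented for the example.

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.javadsl.{ BroadcastHub => JBroadcastHub, Source => JSource }
import org.apache.pekko.stream.scaladsl.Sink

// Editor's sketch, not from the commit: names and sizes are illustrative.
object JavaDslBroadcastHubSketch extends App {
  implicit val system: ActorSystem = ActorSystem("javadsl-broadcast-hub-sketch")

  // Elements are buffered until two consumers have attached; the buffer size must be a power of two.
  val hub = JSource.range(1, 10).runWith(JBroadcastHub.of(classOf[java.lang.Integer], 2, 256), system)

  // Nothing is pulled from upstream until the second consumer attaches.
  val first = hub.asScala.runWith(Sink.seq)
  val second = hub.asScala.runWith(Sink.seq)
  // Both futures should eventually complete with the elements 1 to 10.
}
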
  /**
   * Creates a [[Sink]] with default buffer size 256 that receives elements from its upstream producer and broadcasts them to a dynamic set
   * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized


@@ -424,6 +424,32 @@ object BroadcastHub {
   */
  def sink[T](bufferSize: Int): Sink[T, Source[T, NotUsed]] = Sink.fromGraph(new BroadcastHub[T](bufferSize))
  /**
   * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
   * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
   * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the
   * broadcast elements from the original [[Sink]].
   *
   * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
   * [[Source]] for consuming the [[Sink]] of that materialization.
   *
   * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
   * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
   * all corresponding [[Source]]s are completed. Both failure and normal completion are "remembered" and later
   * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
   * cancelled are simply removed from the dynamic set of consumers.
   *
   * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
   *   This is only used initially when the operator is starting up, i.e. it is not honored when consumers have
   *   been removed (canceled).
   * @param bufferSize Buffer size used by the producer. Gives an upper bound on how "far" from each other two
   *   concurrent consumers can be in terms of elements. If this buffer is full, the producer is backpressured.
   *   Must be a power of two and less than 4096.
   * @since 1.1.0
   */
  def sink[T](startAfterNrOfConsumers: Int, bufferSize: Int): Sink[T, Source[T, NotUsed]] =
    Sink.fromGraph(new BroadcastHub[T](startAfterNrOfConsumers, bufferSize))
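
For illustration only, not part of this commit: a minimal sketch of the new Scala DSL overload. The object name, actor system name, element range and buffer sizes are invented for the example.

import scala.concurrent.Future
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ BroadcastHub, Sink, Source }

// Editor's sketch, not from the commit: names and sizes are illustrative.
object BroadcastHubStartAfterSketch extends App {
  implicit val system: ActorSystem = ActorSystem("broadcast-hub-sketch")

  // Upstream is not consumed until two subscribers have attached to the hub.
  val fromHub: Source[Int, NotUsed] =
    Source(1 to 10).runWith(BroadcastHub.sink[Int](startAfterNrOfConsumers = 2, bufferSize = 256))

  val first: Future[Seq[Int]] = fromHub.runWith(Sink.seq)  // nothing is pulled yet
  val second: Future[Seq[Int]] = fromHub.runWith(Sink.seq) // the second consumer starts the hub

  // Both consumers should see the full sequence 1 to 10, since nothing was broadcast before they attached.
  first.foreach(elems => println(s"first:  $elems"))(system.dispatcher)
  second.foreach(elems => println(s"second: $elems"))(system.dispatcher)
}

Because the hub waits for the configured number of consumers, neither consumer can miss the early elements, which is what the new tests above assert.
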
  /**
   * Creates a [[Sink]] with default buffer size 256 that receives elements from its upstream producer and broadcasts them to a dynamic set
   * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
@@ -446,11 +472,13 @@ object BroadcastHub {
/**
 * INTERNAL API
 */
private[pekko] class BroadcastHub[T](bufferSize: Int)
private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: Int)
    extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] {
  require(startAfterNrOfConsumers >= 0, "startAfterNrOfConsumers must be >= 0")
  require(bufferSize > 0, "Buffer size must be positive")
  require(bufferSize < 4096, "Buffer size larger than 4095 is not allowed")
  require((bufferSize & bufferSize - 1) == 0, "Buffer size must be a power of two")

  def this(bufferSize: Int) = this(0, bufferSize)

  private val Mask = bufferSize - 1
  private val WheelMask = (bufferSize * 2) - 1
@@ -482,6 +510,7 @@ private[pekko] class BroadcastHub[T](bufferSize: Int)
    private[this] val callbackPromise: Promise[AsyncCallback[HubEvent]] = Promise()
    private[this] val noRegistrationsState = Open(callbackPromise.future, Nil)
    val state = new AtomicReference[HubState](noRegistrationsState)
    private var initialized = false

    // Start from values that will almost immediately overflow. This has no effect on performance, any starting
    // number will do, however, this protects from regressions as these values *almost surely* overflow and fail
@@ -511,7 +540,9 @@ private[pekko] class BroadcastHub[T](bufferSize: Int)
    override def preStart(): Unit = {
      setKeepGoing(true)
      callbackPromise.success(getAsyncCallback[HubEvent](onEvent))
      if (startAfterNrOfConsumers == 0) {
        pull(in)
      }
    }

    // Cannot complete immediately if there is no space in the queue to put the completion marker
@@ -522,40 +553,14 @@ private[pekko] class BroadcastHub[T](bufferSize: Int)
      if (!isFull) pull(in)
    }

    private def tryPull(): Unit = {
      if (initialized && !isClosed(in) && !hasBeenPulled(in) && !isFull) {
        pull(in)
      }
    }

    private def onEvent(ev: HubEvent): Unit = {
      ev match {
        case RegistrationPending =>
          state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer =>
            val startFrom = head
            activeConsumers += 1
            addConsumer(consumer, startFrom)
            // in case the consumer is already stopped we need to undo registration
            implicit val ec = materializer.executionContext
            consumer.callback.invokeWithFeedback(Initialize(startFrom)).failed.foreach {
              case _: StreamDetachedException =>
                callbackPromise.future.foreach(callback =>
                  callback.invoke(UnRegister(consumer.id, startFrom, startFrom)))
              case _ => ()
            }
          }

        case UnRegister(id, previousOffset, finalOffset) =>
          if (findAndRemoveConsumer(id, previousOffset) != null)
            activeConsumers -= 1
          if (activeConsumers == 0) {
            if (isClosed(in)) completeStage()
            else if (head != finalOffset) {
              // If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not
              // see the already consumed elements. This feature is quite handy.
              while (head != finalOffset) {
                queue(head & Mask) = null
                head += 1
              }
              head = finalOffset
              if (!hasBeenPulled(in)) pull(in)
            }
          } else checkUnblock(previousOffset)

        case Advance(id, previousOffset) =>
          val newOffset = previousOffset + DemandThreshold
          // Move the consumer from its last known offset to its new one. Check if we are unblocked.
@@ -570,6 +575,43 @@ private[pekko] class BroadcastHub[T](bufferSize: Int)
          // Also check if the consumer is now unblocked since we published an element since it went asleep.
          if (currentOffset != tail) consumer.callback.invoke(Wakeup)
          checkUnblock(previousOffset)

        case RegistrationPending =>
          state.getAndSet(noRegistrationsState).asInstanceOf[Open].registrations.foreach { consumer =>
            val startFrom = head
            activeConsumers += 1
            addConsumer(consumer, startFrom)
            // in case the consumer is already stopped we need to undo registration
            implicit val ec = materializer.executionContext
            consumer.callback.invokeWithFeedback(Initialize(startFrom)).failed.foreach {
              case _: StreamDetachedException =>
                callbackPromise.future.foreach(callback =>
                  callback.invoke(UnRegister(consumer.id, startFrom, startFrom)))
              case _ => ()
            }
          }
          if (activeConsumers >= startAfterNrOfConsumers) {
            initialized = true
          }
          tryPull()

        case UnRegister(id, previousOffset, finalOffset) =>
          if (findAndRemoveConsumer(id, previousOffset) != null)
            activeConsumers -= 1
          if (activeConsumers == 0) {
            if (isClosed(in)) completeStage()
            else if (head != finalOffset) {
              // If our final consumer goes away, we roll forward the buffer so a subsequent consumer does not
              // see the already consumed elements. This feature is quite handy.
              while (head != finalOffset) {
                queue(head & Mask) = null
                head += 1
              }
              head = finalOffset
              tryPull()
            }
          } else checkUnblock(previousOffset)
      }
    }
@@ -624,7 +666,7 @@ private[pekko] class BroadcastHub[T](bufferSize: Int)
    private def checkUnblock(offsetOfConsumerRemoved: Int): Unit = {
      if (unblockIfPossible(offsetOfConsumerRemoved)) {
        if (isClosed(in)) complete()
        else if (!hasBeenPulled(in)) pull(in)
        else tryPull()
      }
    }
@@ -1106,6 +1148,9 @@ object PartitionHub {
    startAfterNrOfConsumers: Int,
    bufferSize: Int)
    extends GraphStageWithMaterializedValue[SinkShape[T], Source[T, NotUsed]] {
  require(partitioner != null, "partitioner must not be null")
  require(startAfterNrOfConsumers >= 0, "startAfterNrOfConsumers must be >= 0")
  require(bufferSize > 0, "Buffer size must be positive")

  import PartitionHub.ConsumerInfo
  import PartitionHub.Internal._
@@ -1231,20 +1276,20 @@ object PartitionHub {
            val newConsumers = (consumerInfo.consumers :+ consumer).sortBy(_.id)
            consumerInfo = new ConsumerInfoImpl(newConsumers)
            queue.init(consumer.id)
            if (newConsumers.size >= startAfterNrOfConsumers) {
              initialized = true
            }
            consumer.callback.invoke(Initialize)
            if (initialized && pending.nonEmpty) {
              pending.foreach(publish)
              pending = Vector.empty[T]
            }
            tryPull()
          }
          if (consumerInfo.size >= startAfterNrOfConsumers) {
            initialized = true
          }
          if (initialized && pending.nonEmpty) {
            pending.foreach(publish)
            pending = Vector.empty[T]
          }
          tryPull()

        case UnRegister(id) =>
          val newConsumers = consumerInfo.consumers.filterNot(_.id == id)
          consumerInfo = new ConsumerInfoImpl(newConsumers)