new queue Source for remote sends

* new SendQueue Source based on agrona ManyToOneConcurrentArrayQueue
* JMH benchmark for the send queue
* JMH benchmark for Source.queue, Source.actorRef and the new SendQueue
* inject the queue so that we can start sending to it before materialization
* Get rid of computeIfAbsent in the AssociationRegistry
  by making it possible to send (enqueue) messages to the
  Association instance immediately after construction
  (see the sketch below).
Patrik Nordwall 2016-05-29 22:15:48 +02:00
parent b45e7dd51c
commit d236b8e152
7 changed files with 552 additions and 50 deletions
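
A minimal sketch of the lifecycle described above (assumed wiring, not part of the commit): messages can be offered to a plain ManyToOneConcurrentArrayQueue immediately after construction, and the same queue is later injected into the materialized SendQueue stage, so nothing enqueued early is lost. The object name and the standalone setup are illustrative only; it sits in package akka.remote.artery solely because SendQueue is private[remote].

package akka.remote.artery

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Keep, Sink, Source }
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue

object SendQueueUsageSketch {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("send-queue-sketch")
    implicit val mat = ActorMaterializer()

    // producers may start offering right away, before any stream is materialized
    val queue = new ManyToOneConcurrentArrayQueue[String](1024)
    queue.offer("enqueued-before-materialization")

    // materialize later; the SendQueue stage drains whatever queue gets injected
    val (sendQueue, _) = Source.fromGraph(new SendQueue[String])
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

    sendQueue.inject(queue)           // hand over the same underlying queue
    sendQueue.offer("enqueued-after") // later sends go through the materialized QueueValue

    Thread.sleep(500) // sketch only: give the stream a moment before shutdown
    system.terminate()
  }
}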

@@ -113,7 +113,8 @@ class CodecBenchmark {
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.runWith(new LatchSink(N, latch))(materializer)
latch.await(30, TimeUnit.SECONDS)
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
}
@Benchmark
@@ -131,7 +132,8 @@ class CodecBenchmark {
.map(envelope => envelopePool.release(envelope))
.runWith(new LatchSink(N, latch))(materializer)
latch.await(30, TimeUnit.SECONDS)
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
}
@Benchmark
@@ -164,7 +166,8 @@ class CodecBenchmark {
.via(decoder)
.runWith(new LatchSink(N, latch))(materializer)
latch.await(30, TimeUnit.SECONDS)
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
}
@Benchmark
@@ -195,7 +198,8 @@ class CodecBenchmark {
.via(decoder)
.runWith(new LatchSink(N, latch))(materializer)
latch.await(30, TimeUnit.SECONDS)
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
}
}

@@ -4,6 +4,7 @@
package akka.remote.artery
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import akka.stream.Attributes
import akka.stream.Inlet
@@ -34,3 +35,29 @@ class LatchSink(countDownAfter: Int, latch: CountDownLatch) extends GraphStage[S
setHandler(in, this)
}
}
class BarrierSink(countDownAfter: Int, latch: CountDownLatch, barrierAfter: Int, barrier: CyclicBarrier)
extends GraphStage[SinkShape[Any]] {
val in: Inlet[Any] = Inlet("BarrierSink")
override val shape: SinkShape[Any] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
var n = 0
override def preStart(): Unit = pull(in)
override def onPush(): Unit = {
n += 1
grab(in)
if (n == countDownAfter)
latch.countDown()
else if (n % barrierAfter == 0)
barrier.await()
pull(in)
}
setHandler(in, this)
}
}
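
BarrierSink pairs a CountDownLatch (to signal overall completion) with a CyclicBarrier that the sink joins every barrierAfter elements, so a driver thread joining the same barrier can never run more than one burst ahead of consumption. A distilled sketch of that handshake (the driver setup below is illustrative, not part of the commit; the SendQueueBenchmark in the next file follows the same pattern):

import java.util.concurrent.{ CountDownLatch, CyclicBarrier, TimeUnit }

import akka.actor.ActorSystem
import akka.remote.artery.BarrierSink
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.stream.scaladsl.Source

object BarrierSinkSketch {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("barrier-sink-sketch")
    implicit val mat = ActorMaterializer()

    val total = 10000
    val burstSize = 100
    val latch = new CountDownLatch(1)
    val barrier = new CyclicBarrier(2) // one driver thread + the BarrierSink stage

    val sourceQueue = Source.queue[Int](burstSize * 2, OverflowStrategy.dropBuffer)
      .to(new BarrierSink(total, latch, burstSize, barrier))
      .run()

    var n = 1
    while (n <= total) {
      sourceQueue.offer(n) // fire-and-forget; the barrier keeps at most one burst in flight
      if (n % burstSize == 0 && n < total)
        barrier.await()    // wait here until the sink has consumed this burst
      n += 1
    }

    if (!latch.await(10, TimeUnit.SECONDS))
      throw new RuntimeException("BarrierSink did not see all elements in time")
    system.terminate()
  }
}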

@@ -0,0 +1,138 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import scala.concurrent.Lock
import scala.util.Success
import akka.stream.impl.fusing.GraphStages
import org.reactivestreams._
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import java.util.concurrent.Semaphore
import akka.stream.OverflowStrategy
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.CountDownLatch
import akka.stream.KillSwitches
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
@Warmup(iterations = 4)
@Measurement(iterations = 10)
class SendQueueBenchmark {
val config = ConfigFactory.parseString(
"""
""")
implicit val system = ActorSystem("SendQueueBenchmark", config)
var materializer: ActorMaterializer = _
@Setup
def setup(): Unit = {
val settings = ActorMaterializerSettings(system)
materializer = ActorMaterializer(settings)
}
@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
@OperationsPerInvocation(100000)
def queue(): Unit = {
val latch = new CountDownLatch(1)
val barrier = new CyclicBarrier(2)
val N = 100000
val burstSize = 1000
val source = Source.queue[Int](1024, OverflowStrategy.dropBuffer)
val (queue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both)
.toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer)
var n = 1
while (n <= N) {
queue.offer(n)
if (n % burstSize == 0 && n < N) {
barrier.await()
}
n += 1
}
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
killSwitch.shutdown()
}
@Benchmark
@OperationsPerInvocation(100000)
def actorRef(): Unit = {
val latch = new CountDownLatch(1)
val barrier = new CyclicBarrier(2)
val N = 100000
val burstSize = 1000
val source = Source.actorRef(1024, OverflowStrategy.dropBuffer)
val (ref, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both)
.toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer)
var n = 1
while (n <= N) {
ref ! n
if (n % burstSize == 0 && n < N) {
barrier.await()
}
n += 1
}
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
killSwitch.shutdown()
}
@Benchmark
@OperationsPerInvocation(100000)
def sendQueue(): Unit = {
val latch = new CountDownLatch(1)
val barrier = new CyclicBarrier(2)
val N = 100000
val burstSize = 1000
val queue = new ManyToOneConcurrentArrayQueue[Int](1024)
val source = Source.fromGraph(new SendQueue[Int])
val (sendQueue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both)
.toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer)
sendQueue.inject(queue)
var n = 1
while (n <= N) {
if (!sendQueue.offer(n))
println(s"offer failed $n") // should not happen
if (n % burstSize == 0 && n < N) {
barrier.await()
}
n += 1
}
if (!latch.await(30, TimeUnit.SECONDS))
throw new RuntimeException("Latch didn't complete in time")
killSwitch.shutdown()
}
}

@@ -464,7 +464,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// InboundContext
override def sendControl(to: Address, message: ControlMessage) =
association(to).outboundControlIngress.sendControlMessage(message)
association(to).sendControl(message)
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
val cached = recipient.cachedAssociation

@@ -3,18 +3,18 @@
*/
package akka.remote.artery
import java.util.Queue
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.function.{ Function ⇒ JFunction }
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import scala.util.Success
import akka.{ Done, NotUsed }
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
@@ -31,11 +31,19 @@ import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery
import akka.stream.AbruptTerminationException
import akka.stream.Materializer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.SourceQueueWithComplete
import akka.util.{ Unsafe, WildcardTree }
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
/**
* INTERNAL API
*/
private[remote] object Association {
final case class QueueWrapper(queue: Queue[Send]) extends SendQueue.ProducerApi[Send] {
override def offer(message: Send): Boolean = queue.offer(message)
}
}
/**
* INTERNAL API
@@ -43,25 +51,37 @@ import akka.util.{ Unsafe, WildcardTree }
* Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific
* remote address.
*/
private[akka] class Association(
private[remote] class Association(
val transport: ArteryTransport,
val materializer: Materializer,
override val remoteAddress: Address,
override val controlSubject: ControlMessageSubject,
largeMessageDestinations: WildcardTree[NotUsed])
extends AbstractAssociation with OutboundContext {
import Association._
private val log = Logging(transport.system, getClass.getName)
private val controlQueueSize = transport.provider.remoteSettings.SysMsgBufferSize
// FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue
// such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption
private val queueSize = 3072
private val largeQueueSize = 256
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
private val maxRestarts = 5 // FIXME config
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
private val largeMessageChannelEnabled = largeMessageDestinations.children.nonEmpty
@volatile private[this] var queue: SourceQueueWithComplete[Send] = _
@volatile private[this] var largeQueue: SourceQueueWithComplete[Send] = _
@volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _
// We start with the raw wrapped queue and then it is replaced with the materialized value of
// the `SendQueue` after materialization, still using the same underlying queue. This makes it
// possible to start sending (enqueuing) to the Association immediately after construction.
def createQueue(capacity: Int): Queue[Send] =
new ManyToOneConcurrentArrayQueue[Send](capacity)
@volatile private[this] var queue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(queueSize))
@volatile private[this] var largeQueue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(largeQueueSize))
@volatile private[this] var controlQueue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(controlQueueSize))
@volatile private[this] var _outboundControlIngress: OutboundControlIngress = _
@volatile private[this] var materializing = new CountDownLatch(1)
@@ -137,17 +157,20 @@ private[akka] class Association(
// FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
message match {
case _: SystemMessage | ClearSystemMessageDelivery ⇒
implicit val ec = materializer.executionContext
controlQueue.offer(Send(message, senderOption, recipient, None)).onFailure {
case e ⇒
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
val send = Send(message, senderOption, recipient, None)
if (!controlQueue.offer(send)) {
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
transport.system.deadLetters ! send
}
case _ ⇒
val send = Send(message, senderOption, recipient, None)
if (largeMessageChannelEnabled && isLargeMessageDestination(recipient))
largeQueue.offer(send)
else
queue.offer(send)
val offerOk =
if (largeMessageChannelEnabled && isLargeMessageDestination(recipient))
largeQueue.offer(send)
else
queue.offer(send)
if (!offerOk)
transport.system.deadLetters ! send
}
} else if (log.isDebugEnabled)
log.debug("Dropping message to quarantined system {}", remoteAddress)
@@ -218,17 +241,23 @@ private[akka] class Association(
}
// Idempotent
/**
* Called once after construction when the `Association` instance
* wins the CAS in the `AssociationRegistry`. It will materialize
* the streams. It is possible to send (enqueue) to the association
* before this method is called.
*/
def associate(): Unit = {
if (controlQueue eq null) {
// it's important to materialize the outboundControl stream first,
// so that outboundControlIngress is ready when stages for all streams start
runOutboundControlStream()
runOutboundOrdinaryMessagesStream()
if (!controlQueue.isInstanceOf[QueueWrapper])
throw new IllegalStateException("associate() must only be called once")
if (largeMessageChannelEnabled) {
runOutboundLargeMessagesStream()
}
// it's important to materialize the outboundControl stream first,
// so that outboundControlIngress is ready when stages for all streams start
runOutboundControlStream()
runOutboundOrdinaryMessagesStream()
if (largeMessageChannelEnabled) {
runOutboundLargeMessagesStream()
}
}
@@ -236,10 +265,15 @@ private[akka] class Association(
// stage in the control stream may access the outboundControlIngress before returned here
// using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress
materializing = new CountDownLatch(1)
val (q, (control, completed)) = Source.queue(controlQueueSize, OverflowStrategy.backpressure)
val wrapper = getOrCreateQueueWrapper(controlQueue, controlQueueSize)
controlQueue = wrapper // use new underlying queue immediately for restarts
val (queueValue, (control, completed)) = Source.fromGraph(new SendQueue[Send])
.toMat(transport.outboundControl(this))(Keep.both)
.run()(materializer)
controlQueue = q
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
controlQueue = queueValue
_outboundControlIngress = control
materializing.countDown()
attachStreamRestart("Outbound control stream", completed, cause {
@@ -251,19 +285,35 @@ private[akka] class Association(
})
}
private def getOrCreateQueueWrapper(q: SendQueue.ProducerApi[Send], capacity: Int): QueueWrapper =
q match {
case existing: QueueWrapper ⇒ existing
case _ ⇒
// use new queue for restarts
QueueWrapper(createQueue(capacity))
}
private def runOutboundOrdinaryMessagesStream(): Unit = {
val (q, completed) = Source.queue(256, OverflowStrategy.dropBuffer)
val wrapper = getOrCreateQueueWrapper(queue, queueSize)
queue = wrapper // use new underlying queue immediately for restarts
val (queueValue, completed) = Source.fromGraph(new SendQueue[Send])
.toMat(transport.outbound(this))(Keep.both)
.run()(materializer)
queue = q
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
queue = queueValue
attachStreamRestart("Outbound message stream", completed, _ runOutboundOrdinaryMessagesStream())
}
private def runOutboundLargeMessagesStream(): Unit = {
val (q, completed) = Source.queue(256, OverflowStrategy.dropBuffer)
val wrapper = getOrCreateQueueWrapper(largeQueue, largeQueueSize)
largeQueue = wrapper // use new underlying queue immediately for restarts
val (queueValue, completed) = Source.fromGraph(new SendQueue[Send])
.toMat(transport.outboundLarge(this))(Keep.both)
.run()(materializer)
largeQueue = q
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
largeQueue = queueValue
attachStreamRestart("Outbound large message stream", completed, _ runOutboundLargeMessagesStream())
}
@@ -297,21 +347,21 @@ private[akka] class Association(
* INTERNAL API
*/
private[remote] class AssociationRegistry(createAssociation: Address ⇒ Association) {
// FIXME: This does locking on putIfAbsent, we need something smarter
private[this] val associationsByAddress = new ConcurrentHashMap[Address, Association]()
private[this] val associationsByUid = new ConcurrentHashMap[Long, Association]()
private[this] val associationsByAddress = new AtomicReference[Map[Address, Association]](Map.empty)
private[this] val associationsByUid = new ConcurrentHashMap[Long, Association]() // FIXME replace with specialized Long Map
def association(remoteAddress: Address): Association = {
val current = associationsByAddress.get(remoteAddress)
if (current ne null) current
else {
associationsByAddress.computeIfAbsent(remoteAddress, new JFunction[Address, Association] {
override def apply(remoteAddress: Address): Association = {
val newAssociation = createAssociation(remoteAddress)
newAssociation.associate() // This is a bit costly for this blocking method :(
@tailrec final def association(remoteAddress: Address): Association = {
val currentMap = associationsByAddress.get
currentMap.get(remoteAddress) match {
case Some(existing) ⇒ existing
case None ⇒
val newAssociation = createAssociation(remoteAddress)
val newMap = currentMap.updated(remoteAddress, newAssociation)
if (associationsByAddress.compareAndSet(currentMap, newMap)) {
newAssociation.associate() // start it, only once
newAssociation
}
})
} else
association(remoteAddress) // lost CAS, retry
}
}
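
The registry change above swaps ConcurrentHashMap.computeIfAbsent for the classic copy-on-write pattern: an AtomicReference holding an immutable Map, a compareAndSet to publish the updated map, and a tail-recursive retry for losers of the race. A minimal standalone sketch of that pattern (hypothetical names, not the actual Akka class):

import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// `create` is cheap and safe to run speculatively; `start` is the expensive step
// (materializing streams in the real code) and must run exactly once per key.
final class CowRegistry[K, V](create: K => V, start: V => Unit) {
  private val entries = new AtomicReference[Map[K, V]](Map.empty)

  @tailrec final def get(key: K): V = {
    val current = entries.get
    current.get(key) match {
      case Some(existing) => existing
      case None =>
        val fresh = create(key)
        val updated = current.updated(key, fresh)
        if (entries.compareAndSet(current, updated)) {
          start(fresh) // only the CAS winner starts its instance
          fresh
        } else
          get(key)     // lost the race: discard `fresh` and retry with the new map
    }
  }
}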

@@ -0,0 +1,131 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.Queue
import akka.stream.stage.GraphStage
import akka.stream.stage.OutHandler
import akka.stream.Attributes
import akka.stream.Outlet
import akka.stream.SourceShape
import akka.stream.stage.GraphStageLogic
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
import akka.stream.stage.GraphStageWithMaterializedValue
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueueTail
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.concurrent.Promise
import scala.util.Try
import scala.util.Success
import scala.util.Failure
/**
* INTERNAL API
*/
private[remote] object SendQueue {
trait ProducerApi[T] {
def offer(message: T): Boolean
}
trait QueueValue[T] extends ProducerApi[T] {
def inject(queue: Queue[T]): Unit
}
private trait WakeupSignal {
def wakeup(): Unit
}
}
/**
* INTERNAL API
*/
private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] {
import SendQueue._
val out: Outlet[T] = Outlet("SendQueue.out")
override val shape: SourceShape[T] = SourceShape(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, QueueValue[T]) = {
@volatile var needWakeup = false
val queuePromise = Promise[Queue[T]]()
val logic = new GraphStageLogic(shape) with OutHandler with WakeupSignal {
// using a local field for the consumer side of queue to avoid volatile access
private var consumerQueue: Queue[T] = null
private val wakeupCallback = getAsyncCallback[Unit] { _ ⇒
if (isAvailable(out))
tryPush()
}
override def preStart(): Unit = {
implicit val ec = materializer.executionContext
queuePromise.future.onComplete(getAsyncCallback[Try[Queue[T]]] {
case Success(q) ⇒
consumerQueue = q
needWakeup = true
if (isAvailable(out))
tryPush()
case Failure(e) ⇒
failStage(e)
}.invoke)
}
override def onPull(): Unit = {
if (consumerQueue ne null)
tryPush()
}
@tailrec private def tryPush(firstAttempt: Boolean = true): Unit = {
consumerQueue.poll() match {
case null ⇒
needWakeup = true
// additional poll() to grab any elements that might have missed the needWakeup
// flag and been enqueued just after it was set
if (firstAttempt)
tryPush(firstAttempt = false)
case elem ⇒
needWakeup = false // there will be another onPull
push(out, elem)
}
}
// external call
override def wakeup(): Unit = {
wakeupCallback.invoke(())
}
override def postStop(): Unit = {
if (consumerQueue ne null)
consumerQueue.clear()
super.postStop()
}
setHandler(out, this)
}
val queueValue = new QueueValue[T] {
@volatile private var producerQueue: Queue[T] = null
override def inject(q: Queue[T]): Unit = {
producerQueue = q
queuePromise.success(q)
}
override def offer(message: T): Boolean = {
val q = producerQueue
if (q eq null) throw new IllegalStateException("offer not allowed before injecting the queue")
val result = q.offer(message)
if (result && needWakeup)
logic.wakeup()
result
}
}
(logic, queueValue)
}
}
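
The delicate part of SendQueue is the needWakeup handshake: the consumer sets the flag only after a failed poll and then polls once more, while the producer signals only when it observes the flag, so wakeups stay rare on the hot path yet an element offered in the gap is never stranded. A simplified blocking two-thread analogue of that protocol (illustrative sketch, not Akka code; a Semaphore stands in for the stage's async wakeupCallback):

import java.util.concurrent.{ ConcurrentLinkedQueue, Semaphore, TimeUnit }

object WakeupProtocolSketch {
  private val queue = new ConcurrentLinkedQueue[String]()
  @volatile private var needWakeup = false
  private val wakeups = new Semaphore(0) // plays the role of wakeupCallback.invoke(())

  def produce(msg: String): Unit = {
    queue.offer(msg)
    if (needWakeup) wakeups.release() // same check as QueueValue.offer: signal only an idle consumer
  }

  def consumeLoop(): Unit = {
    while (true) {
      var elem = queue.poll()
      if (elem == null) {
        needWakeup = true
        // second poll, mirroring tryPush: catch an element offered after the first
        // poll but before needWakeup became visible to the producer
        elem = queue.poll()
        if (elem == null) wakeups.tryAcquire(100, TimeUnit.MILLISECONDS)
      }
      if (elem != null) {
        needWakeup = false // consumer is busy again, producers need not signal
        println(s"consumed $elem")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val consumer = new Thread(new Runnable { def run(): Unit = consumeLoop() }, "consumer")
    consumer.setDaemon(true)
    consumer.start()
    (1 to 5).foreach(i => produce(s"msg-$i"))
    Thread.sleep(200) // let the consumer drain before the JVM exits
  }
}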

@@ -0,0 +1,152 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.Queue
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.Props
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
object SendQueueSpec {
case class ProduceToQueue(from: Int, until: Int, queue: Queue[Msg])
case class ProduceToQueueValue(from: Int, until: Int, queue: SendQueue.QueueValue[Msg])
case class Msg(fromProducer: String, value: Int)
def producerProps(producerId: String): Props =
Props(new Producer(producerId))
class Producer(producerId: String) extends Actor {
def receive = {
case ProduceToQueue(from, until, queue) ⇒
var i = from
while (i < until) {
if (!queue.offer(Msg(producerId, i)))
throw new IllegalStateException(s"offer failed from $producerId value $i")
i += 1
}
case ProduceToQueueValue(from, until, queue) ⇒
var i = from
while (i < until) {
if (!queue.offer(Msg(producerId, i)))
throw new IllegalStateException(s"offer failed from $producerId value $i")
i += 1
}
}
}
}
class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender {
import SendQueueSpec._
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)
"SendQueue" must {
"deliver all messages" in {
val queue = new ManyToOneConcurrentArrayQueue[String](128)
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String])
.toMat(TestSink.probe)(Keep.both).run()
downstream.request(10)
sendQueue.inject(queue)
sendQueue.offer("a")
sendQueue.offer("b")
sendQueue.offer("c")
downstream.expectNext("a")
downstream.expectNext("b")
downstream.expectNext("c")
downstream.cancel()
}
"deliver messages enqueued before materialization" in {
val queue = new ManyToOneConcurrentArrayQueue[String](128)
queue.offer("a")
queue.offer("b")
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String])
.toMat(TestSink.probe)(Keep.both).run()
downstream.request(10)
downstream.expectNoMsg(200.millis)
sendQueue.inject(queue)
downstream.expectNext("a")
downstream.expectNext("b")
sendQueue.offer("c")
downstream.expectNext("c")
downstream.cancel()
}
"deliver bursts of messages" in {
// this test verifies that the wakeup signal is triggered correctly
val queue = new ManyToOneConcurrentArrayQueue[Int](128)
val burstSize = 100
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Int])
.grouped(burstSize)
.async
.toMat(TestSink.probe)(Keep.both).run()
downstream.request(10)
sendQueue.inject(queue)
for (round ← 1 to 100000) {
for (n ← 1 to burstSize) {
if (!sendQueue.offer(round * 1000 + n))
fail(s"offer failed at round $round message $n")
}
downstream.expectNext((1 to burstSize).map(_ + round * 1000).toList)
downstream.request(1)
}
downstream.cancel()
}
"support multiple producers" in {
val numberOfProducers = 5
val queue = new ManyToOneConcurrentArrayQueue[Msg](numberOfProducers * 512)
val producers = Vector.tabulate(numberOfProducers)(i ⇒ system.actorOf(producerProps(s"producer-$i")))
// send 100 per producer before materializing
producers.foreach(_ ! ProduceToQueue(0, 100, queue))
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Msg])
.toMat(TestSink.probe)(Keep.both).run()
sendQueue.inject(queue)
producers.foreach(_ ! ProduceToQueueValue(100, 200, sendQueue))
// send 100 more per producer
downstream.request(producers.size * 200)
val msgByProducer = downstream.expectNextN(producers.size * 200).groupBy(_.fromProducer)
(0 until producers.size).foreach { i ⇒
msgByProducer(s"producer-$i").map(_.value) should ===(0 until 200)
}
// send 500 per producer
downstream.request(producers.size * 1000) // more than enough
producers.foreach(_ ! ProduceToQueueValue(200, 700, sendQueue))
val msgByProducer2 = downstream.expectNextN(producers.size * 500).groupBy(_.fromProducer)
(0 until producers.size).foreach { i ⇒
msgByProducer2(s"producer-$i").map(_.value) should ===(200 until 700)
}
downstream.cancel()
}
}
}