diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 8c09e80f39..82f22e1c89 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -113,7 +113,8 @@ class CodecBenchmark { Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .runWith(new LatchSink(N, latch))(materializer) - latch.await(30, TimeUnit.SECONDS) + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch didn't complete in time") } @Benchmark @@ -131,7 +132,8 @@ class CodecBenchmark { .map(envelope => envelopePool.release(envelope)) .runWith(new LatchSink(N, latch))(materializer) - latch.await(30, TimeUnit.SECONDS) + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch didn't complete in time") } @Benchmark @@ -164,7 +166,8 @@ class CodecBenchmark { .via(decoder) .runWith(new LatchSink(N, latch))(materializer) - latch.await(30, TimeUnit.SECONDS) + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch didn't complete in time") } @Benchmark @@ -195,7 +198,8 @@ class CodecBenchmark { .via(decoder) .runWith(new LatchSink(N, latch))(materializer) - latch.await(30, TimeUnit.SECONDS) + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch didn't complete in time") } } diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala index c60bead1ce..d66a6814d0 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala @@ -4,6 +4,7 @@ package akka.remote.artery import java.util.concurrent.CountDownLatch +import java.util.concurrent.CyclicBarrier import akka.stream.Attributes import akka.stream.Inlet @@ -34,3 +35,29 @@ class LatchSink(countDownAfter: Int, latch: CountDownLatch) extends GraphStage[S setHandler(in, this) } } + +class BarrierSink(countDownAfter: Int, latch: CountDownLatch, barrierAfter: Int, barrier: CyclicBarrier) + extends GraphStage[SinkShape[Any]] { + val in: Inlet[Any] = Inlet("BarrierSink") + override val shape: SinkShape[Any] = SinkShape(in) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler { + + var n = 0 + + override def preStart(): Unit = pull(in) + + override def onPush(): Unit = { + n += 1 + grab(in) + if (n == countDownAfter) + latch.countDown() + else if (n % barrierAfter == 0) + barrier.await() + pull(in) + } + + setHandler(in, this) + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala new file mode 100644 index 0000000000..f54ca42f0e --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.TimeUnit +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl._ +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ +import scala.concurrent.Lock +import scala.util.Success +import akka.stream.impl.fusing.GraphStages +import org.reactivestreams._ +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import java.util.concurrent.Semaphore +import akka.stream.OverflowStrategy +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.CountDownLatch +import akka.stream.KillSwitches +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(2) +@Warmup(iterations = 4) +@Measurement(iterations = 10) +class SendQueueBenchmark { + + val config = ConfigFactory.parseString( + """ + """) + + implicit val system = ActorSystem("SendQueueBenchmark", config) + + var materializer: ActorMaterializer = _ + + @Setup + def setup(): Unit = { + val settings = ActorMaterializerSettings(system) + materializer = ActorMaterializer(settings) + } + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } + + @Benchmark + @OperationsPerInvocation(100000) + def queue(): Unit = { + val latch = new CountDownLatch(1) + val barrier = new CyclicBarrier(2) + val N = 100000 + val burstSize = 1000 + + val source = Source.queue[Int](1024, OverflowStrategy.dropBuffer) + + val (queue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both) + .toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer) + + var n = 1 + while (n <= N) { + queue.offer(n) + if (n % burstSize == 0 && n < N) { + barrier.await() + } + n += 1 + } + + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch didn't complete in time") + killSwitch.shutdown() + } + + @Benchmark + @OperationsPerInvocation(100000) + def actorRef(): Unit = { + val latch = new CountDownLatch(1) + val barrier = new CyclicBarrier(2) + val N = 100000 + val burstSize = 1000 + + val source = Source.actorRef(1024, OverflowStrategy.dropBuffer) + + val (ref, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both) + .toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer) + + var n = 1 + while (n <= N) { + ref ! n + if (n % burstSize == 0 && n < N) { + barrier.await() + } + n += 1 + } + + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch didn't complete in time") + killSwitch.shutdown() + } + + @Benchmark + @OperationsPerInvocation(100000) + def sendQueue(): Unit = { + val latch = new CountDownLatch(1) + val barrier = new CyclicBarrier(2) + val N = 100000 + val burstSize = 1000 + + val queue = new ManyToOneConcurrentArrayQueue[Int](1024) + val source = Source.fromGraph(new SendQueue[Int]) + + val (sendQueue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both) + .toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer) + sendQueue.inject(queue) + + var n = 1 + while (n <= N) { + if (!sendQueue.offer(n)) + println(s"offer failed $n") // should not happen + if (n % burstSize == 0 && n < N) { + barrier.await() + } + n += 1 + } + + if (!latch.await(30, TimeUnit.SECONDS)) + throw new RuntimeException("Latch didn't complete in time") + killSwitch.shutdown() + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index a5d5338395..db4322573d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -464,7 +464,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // InboundContext override def sendControl(to: Address, message: ControlMessage) = - association(to).outboundControlIngress.sendControlMessage(message) + association(to).sendControl(message) override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { val cached = recipient.cachedAssociation diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 2064c67cdd..38b1980a62 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -3,18 +3,18 @@ */ package akka.remote.artery +import java.util.Queue + import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import java.util.function.{ Function ⇒ JFunction } - +import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration import scala.util.Success - import akka.{ Done, NotUsed } import akka.actor.ActorRef import akka.actor.ActorSelectionMessage @@ -31,11 +31,19 @@ import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery import akka.stream.AbruptTerminationException import akka.stream.Materializer -import akka.stream.OverflowStrategy import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source -import akka.stream.scaladsl.SourceQueueWithComplete import akka.util.{ Unsafe, WildcardTree } +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue + +/** + * INTERNAL API + */ +private[remote] object Association { + final case class QueueWrapper(queue: Queue[Send]) extends SendQueue.ProducerApi[Send] { + override def offer(message: Send): Boolean = queue.offer(message) + } +} /** * INTERNAL API @@ -43,25 +51,37 @@ import akka.util.{ Unsafe, WildcardTree } * Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific * remote address. */ -private[akka] class Association( +private[remote] class Association( val transport: ArteryTransport, val materializer: Materializer, override val remoteAddress: Address, override val controlSubject: ControlMessageSubject, largeMessageDestinations: WildcardTree[NotUsed]) extends AbstractAssociation with OutboundContext { + import Association._ private val log = Logging(transport.system, getClass.getName) private val controlQueueSize = transport.provider.remoteSettings.SysMsgBufferSize + // FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue + // such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption + private val queueSize = 3072 + private val largeQueueSize = 256 private val restartTimeout: FiniteDuration = 5.seconds // FIXME config private val maxRestarts = 5 // FIXME config private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) private val largeMessageChannelEnabled = largeMessageDestinations.children.nonEmpty - @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ - @volatile private[this] var largeQueue: SourceQueueWithComplete[Send] = _ - @volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _ + // We start with the raw wrapped queue and then it is replaced with the materialized value of + // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to + // start sending (enqueuing) to the Association immediate after construction. + + def createQueue(capacity: Int): Queue[Send] = + new ManyToOneConcurrentArrayQueue[Send](capacity) + + @volatile private[this] var queue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(queueSize)) + @volatile private[this] var largeQueue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(largeQueueSize)) + @volatile private[this] var controlQueue: SendQueue.ProducerApi[Send] = QueueWrapper(createQueue(controlQueueSize)) @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ @volatile private[this] var materializing = new CountDownLatch(1) @@ -137,17 +157,20 @@ private[akka] class Association( // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { case _: SystemMessage | ClearSystemMessageDelivery ⇒ - implicit val ec = materializer.executionContext - controlQueue.offer(Send(message, senderOption, recipient, None)).onFailure { - case e ⇒ - quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") + val send = Send(message, senderOption, recipient, None) + if (!controlQueue.offer(send)) { + quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") + transport.system.deadLetters ! send } case _ ⇒ val send = Send(message, senderOption, recipient, None) - if (largeMessageChannelEnabled && isLargeMessageDestination(recipient)) - largeQueue.offer(send) - else - queue.offer(send) + val offerOk = + if (largeMessageChannelEnabled && isLargeMessageDestination(recipient)) + largeQueue.offer(send) + else + queue.offer(send) + if (!offerOk) + transport.system.deadLetters ! send } } else if (log.isDebugEnabled) log.debug("Dropping message to quarantined system {}", remoteAddress) @@ -218,17 +241,23 @@ private[akka] class Association( } - // Idempotent + /** + * Called once after construction when the `Association` instance + * wins the CAS in the `AssociationRegistry`. It will materialize + * the streams. It is possible to sending (enqueuing) to the association + * before this method is called. + */ def associate(): Unit = { - if (controlQueue eq null) { - // it's important to materialize the outboundControl stream first, - // so that outboundControlIngress is ready when stages for all streams start - runOutboundControlStream() - runOutboundOrdinaryMessagesStream() + if (!controlQueue.isInstanceOf[QueueWrapper]) + throw new IllegalStateException("associate() must only be called once") - if (largeMessageChannelEnabled) { - runOutboundLargeMessagesStream() - } + // it's important to materialize the outboundControl stream first, + // so that outboundControlIngress is ready when stages for all streams start + runOutboundControlStream() + runOutboundOrdinaryMessagesStream() + + if (largeMessageChannelEnabled) { + runOutboundLargeMessagesStream() } } @@ -236,10 +265,15 @@ private[akka] class Association( // stage in the control stream may access the outboundControlIngress before returned here // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress materializing = new CountDownLatch(1) - val (q, (control, completed)) = Source.queue(controlQueueSize, OverflowStrategy.backpressure) + + val wrapper = getOrCreateQueueWrapper(controlQueue, queueSize) + controlQueue = wrapper // use new underlying queue immediately for restarts + val (queueValue, (control, completed)) = Source.fromGraph(new SendQueue[Send]) .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) - controlQueue = q + queueValue.inject(wrapper.queue) + // replace with the materialized value, still same underlying queue + controlQueue = queueValue _outboundControlIngress = control materializing.countDown() attachStreamRestart("Outbound control stream", completed, cause ⇒ { @@ -251,19 +285,35 @@ private[akka] class Association( }) } + private def getOrCreateQueueWrapper(q: SendQueue.ProducerApi[Send], capacity: Int): QueueWrapper = + q match { + case existing: QueueWrapper ⇒ existing + case _ ⇒ + // use new queue for restarts + QueueWrapper(createQueue(capacity)) + } + private def runOutboundOrdinaryMessagesStream(): Unit = { - val (q, completed) = Source.queue(256, OverflowStrategy.dropBuffer) + val wrapper = getOrCreateQueueWrapper(queue, queueSize) + queue = wrapper // use new underlying queue immediately for restarts + val (queueValue, completed) = Source.fromGraph(new SendQueue[Send]) .toMat(transport.outbound(this))(Keep.both) .run()(materializer) - queue = q + queueValue.inject(wrapper.queue) + // replace with the materialized value, still same underlying queue + queue = queueValue attachStreamRestart("Outbound message stream", completed, _ ⇒ runOutboundOrdinaryMessagesStream()) } private def runOutboundLargeMessagesStream(): Unit = { - val (q, completed) = Source.queue(256, OverflowStrategy.dropBuffer) + val wrapper = getOrCreateQueueWrapper(queue, largeQueueSize) + largeQueue = wrapper // use new underlying queue immediately for restarts + val (queueValue, completed) = Source.fromGraph(new SendQueue[Send]) .toMat(transport.outboundLarge(this))(Keep.both) .run()(materializer) - largeQueue = q + queueValue.inject(wrapper.queue) + // replace with the materialized value, still same underlying queue + largeQueue = queueValue attachStreamRestart("Outbound large message stream", completed, _ ⇒ runOutboundLargeMessagesStream()) } @@ -297,21 +347,21 @@ private[akka] class Association( * INTERNAL API */ private[remote] class AssociationRegistry(createAssociation: Address ⇒ Association) { - // FIXME: This does locking on putIfAbsent, we need something smarter - private[this] val associationsByAddress = new ConcurrentHashMap[Address, Association]() - private[this] val associationsByUid = new ConcurrentHashMap[Long, Association]() + private[this] val associationsByAddress = new AtomicReference[Map[Address, Association]](Map.empty) + private[this] val associationsByUid = new ConcurrentHashMap[Long, Association]() // FIXME replace with specialized Long Map - def association(remoteAddress: Address): Association = { - val current = associationsByAddress.get(remoteAddress) - if (current ne null) current - else { - associationsByAddress.computeIfAbsent(remoteAddress, new JFunction[Address, Association] { - override def apply(remoteAddress: Address): Association = { - val newAssociation = createAssociation(remoteAddress) - newAssociation.associate() // This is a bit costly for this blocking method :( + @tailrec final def association(remoteAddress: Address): Association = { + val currentMap = associationsByAddress.get + currentMap.get(remoteAddress) match { + case Some(existing) ⇒ existing + case None ⇒ + val newAssociation = createAssociation(remoteAddress) + val newMap = currentMap.updated(remoteAddress, newAssociation) + if (associationsByAddress.compareAndSet(currentMap, newMap)) { + newAssociation.associate() // start it, only once newAssociation - } - }) + } else + association(remoteAddress) // lost CAS, retry } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala new file mode 100644 index 0000000000..ea10fba5b5 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.Queue +import akka.stream.stage.GraphStage +import akka.stream.stage.OutHandler +import akka.stream.Attributes +import akka.stream.Outlet +import akka.stream.SourceShape +import akka.stream.stage.GraphStageLogic +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue +import akka.stream.stage.GraphStageWithMaterializedValue +import org.agrona.concurrent.ManyToOneConcurrentLinkedQueueTail +import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +import scala.annotation.tailrec +import scala.concurrent.Promise +import scala.util.Try +import scala.util.Success +import scala.util.Failure + +/** + * INTERNAL API + */ +private[remote] object SendQueue { + trait ProducerApi[T] { + def offer(message: T): Boolean + } + + trait QueueValue[T] extends ProducerApi[T] { + def inject(queue: Queue[T]): Unit + } + + private trait WakeupSignal { + def wakeup(): Unit + } +} + +/** + * INTERNAL API + */ +private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] { + import SendQueue._ + + val out: Outlet[T] = Outlet("SendQueue.out") + override val shape: SourceShape[T] = SourceShape(out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, QueueValue[T]) = { + @volatile var needWakeup = false + val queuePromise = Promise[Queue[T]]() + + val logic = new GraphStageLogic(shape) with OutHandler with WakeupSignal { + + // using a local field for the consumer side of queue to avoid volatile access + private var consumerQueue: Queue[T] = null + + private val wakeupCallback = getAsyncCallback[Unit] { _ ⇒ + if (isAvailable(out)) + tryPush() + } + + override def preStart(): Unit = { + implicit val ec = materializer.executionContext + queuePromise.future.onComplete(getAsyncCallback[Try[Queue[T]]] { + case Success(q) ⇒ + consumerQueue = q + needWakeup = true + if (isAvailable(out)) + tryPush() + case Failure(e) ⇒ + failStage(e) + }.invoke) + } + + override def onPull(): Unit = { + if (consumerQueue ne null) + tryPush() + } + + @tailrec private def tryPush(firstAttempt: Boolean = true): Unit = { + consumerQueue.poll() match { + case null ⇒ + needWakeup = true + // additional poll() to grab any elements that might missed the needWakeup + // and have been enqueued just after it + if (firstAttempt) + tryPush(firstAttempt = false) + case elem ⇒ + needWakeup = false // there will be another onPull + push(out, elem) + } + } + + // external call + override def wakeup(): Unit = { + wakeupCallback.invoke(()) + } + + override def postStop(): Unit = { + if (consumerQueue ne null) + consumerQueue.clear() + super.postStop() + } + + setHandler(out, this) + } + + val queueValue = new QueueValue[T] { + @volatile private var producerQueue: Queue[T] = null + + override def inject(q: Queue[T]): Unit = { + producerQueue = q + queuePromise.success(q) + } + + override def offer(message: T): Boolean = { + val q = producerQueue + if (q eq null) throw new IllegalStateException("offer not allowed before injecting the queue") + val result = q.offer(message) + if (result && needWakeup) + logic.wakeup() + result + } + } + + (logic, queueValue) + + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala new file mode 100644 index 0000000000..1655432cdc --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.Queue + +import scala.concurrent.duration._ + +import akka.actor.Actor +import akka.actor.Props +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Source +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue + +object SendQueueSpec { + + case class ProduceToQueue(from: Int, until: Int, queue: Queue[Msg]) + case class ProduceToQueueValue(from: Int, until: Int, queue: SendQueue.QueueValue[Msg]) + case class Msg(fromProducer: String, value: Int) + + def producerProps(producerId: String): Props = + Props(new Producer(producerId)) + + class Producer(producerId: String) extends Actor { + def receive = { + case ProduceToQueue(from, until, queue) ⇒ + var i = from + while (i < until) { + if (!queue.offer(Msg(producerId, i))) + throw new IllegalStateException(s"offer failed from $producerId value $i") + i += 1 + } + case ProduceToQueueValue(from, until, queue) ⇒ + var i = from + while (i < until) { + if (!queue.offer(Msg(producerId, i))) + throw new IllegalStateException(s"offer failed from $producerId value $i") + i += 1 + } + } + + } +} + +class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender { + import SendQueueSpec._ + + val matSettings = ActorMaterializerSettings(system).withFuzzing(true) + implicit val mat = ActorMaterializer(matSettings)(system) + + "SendQueue" must { + + "deliver all messages" in { + val queue = new ManyToOneConcurrentArrayQueue[String](128) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String]) + .toMat(TestSink.probe)(Keep.both).run() + + downstream.request(10) + sendQueue.inject(queue) + sendQueue.offer("a") + sendQueue.offer("b") + sendQueue.offer("c") + downstream.expectNext("a") + downstream.expectNext("b") + downstream.expectNext("c") + downstream.cancel() + } + + "deliver messages enqueued before materialization" in { + val queue = new ManyToOneConcurrentArrayQueue[String](128) + queue.offer("a") + queue.offer("b") + + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String]) + .toMat(TestSink.probe)(Keep.both).run() + + downstream.request(10) + downstream.expectNoMsg(200.millis) + sendQueue.inject(queue) + downstream.expectNext("a") + downstream.expectNext("b") + + sendQueue.offer("c") + downstream.expectNext("c") + downstream.cancel() + } + + "deliver bursts of messages" in { + // this test verifies that the wakeup signal is triggered correctly + val queue = new ManyToOneConcurrentArrayQueue[Int](128) + val burstSize = 100 + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Int]) + .grouped(burstSize) + .async + .toMat(TestSink.probe)(Keep.both).run() + + downstream.request(10) + sendQueue.inject(queue) + + for (round ← 1 to 100000) { + for (n ← 1 to burstSize) { + if (!sendQueue.offer(round * 1000 + n)) + fail(s"offer failed at round $round message $n") + } + downstream.expectNext((1 to burstSize).map(_ + round * 1000).toList) + downstream.request(1) + } + + downstream.cancel() + } + + "support multiple producers" in { + val numberOfProducers = 5 + val queue = new ManyToOneConcurrentArrayQueue[Msg](numberOfProducers * 512) + val producers = Vector.tabulate(numberOfProducers)(i ⇒ system.actorOf(producerProps(s"producer-$i"))) + + // send 100 per producer before materializing + producers.foreach(_ ! ProduceToQueue(0, 100, queue)) + + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Msg]) + .toMat(TestSink.probe)(Keep.both).run() + + sendQueue.inject(queue) + producers.foreach(_ ! ProduceToQueueValue(100, 200, sendQueue)) + + // send 100 more per producer + downstream.request(producers.size * 200) + val msgByProducer = downstream.expectNextN(producers.size * 200).groupBy(_.fromProducer) + (0 until producers.size).foreach { i ⇒ + msgByProducer(s"producer-$i").map(_.value) should ===(0 until 200) + } + + // send 500 per producer + downstream.request(producers.size * 1000) // more than enough + producers.foreach(_ ! ProduceToQueueValue(200, 700, sendQueue)) + val msgByProducer2 = downstream.expectNextN(producers.size * 500).groupBy(_.fromProducer) + (0 until producers.size).foreach { i ⇒ + msgByProducer2(s"producer-$i").map(_.value) should ===(200 until 700) + } + + downstream.cancel() + } + + } + +}