From f95ab52f10cf42227583b1acf4ef70efd34cbefa Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 28 Nov 2017 14:53:02 +0100 Subject: [PATCH] additional test of SendQueue --- .../akka/remote/artery/SendQueueSpec.scala | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala index 851c757fa2..fff1547938 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala @@ -13,6 +13,7 @@ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source +import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender @@ -147,6 +148,67 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with downstream.cancel() } + "deliver first message" in { + + def test(f: (ManyToOneConcurrentArrayQueue[String], SendQueue.QueueValue[String], TestSubscriber.Probe[String]) ⇒ Unit): Unit = { + + (1 to 100).foreach { n ⇒ + val queue = new ManyToOneConcurrentArrayQueue[String](16) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String]) + .toMat(TestSink.probe)(Keep.both).run() + + f(queue, sendQueue, downstream) + downstream.expectNext("a") + + sendQueue.offer("b") + downstream.expectNext("b") + sendQueue.offer("c") + sendQueue.offer("d") + downstream.expectNext("c") + downstream.expectNext("d") + downstream.cancel() + } + } + + test { (queue, sendQueue, downstream) ⇒ + queue.offer("a") + downstream.request(10) + sendQueue.inject(queue) + } + test { (queue, sendQueue, downstream) ⇒ + sendQueue.inject(queue) + queue.offer("a") + downstream.request(10) + } + + test { (queue, sendQueue, downstream) ⇒ + queue.offer("a") + sendQueue.inject(queue) + downstream.request(10) + } + test { (queue, sendQueue, downstream) ⇒ + downstream.request(10) + queue.offer("a") + sendQueue.inject(queue) + } + + test { (queue, sendQueue, downstream) ⇒ + sendQueue.inject(queue) + downstream.request(10) + sendQueue.offer("a") + } + test { (queue, sendQueue, downstream) ⇒ + downstream.request(10) + sendQueue.inject(queue) + sendQueue.offer("a") + } + test { (queue, sendQueue, downstream) ⇒ + sendQueue.inject(queue) + sendQueue.offer("a") + downstream.request(10) + } + } + } }