additional test of SendQueue

This commit is contained in:
Patrik Nordwall 2017-11-28 14:53:02 +01:00
parent 930d2e9133
commit f95ab52f10

View file

@ -13,6 +13,7 @@ import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
@ -147,6 +148,67 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with
downstream.cancel() downstream.cancel()
} }
"deliver first message" in {
def test(f: (ManyToOneConcurrentArrayQueue[String], SendQueue.QueueValue[String], TestSubscriber.Probe[String]) Unit): Unit = {
(1 to 100).foreach { n
val queue = new ManyToOneConcurrentArrayQueue[String](16)
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String])
.toMat(TestSink.probe)(Keep.both).run()
f(queue, sendQueue, downstream)
downstream.expectNext("a")
sendQueue.offer("b")
downstream.expectNext("b")
sendQueue.offer("c")
sendQueue.offer("d")
downstream.expectNext("c")
downstream.expectNext("d")
downstream.cancel()
}
}
test { (queue, sendQueue, downstream)
queue.offer("a")
downstream.request(10)
sendQueue.inject(queue)
}
test { (queue, sendQueue, downstream)
sendQueue.inject(queue)
queue.offer("a")
downstream.request(10)
}
test { (queue, sendQueue, downstream)
queue.offer("a")
sendQueue.inject(queue)
downstream.request(10)
}
test { (queue, sendQueue, downstream)
downstream.request(10)
queue.offer("a")
sendQueue.inject(queue)
}
test { (queue, sendQueue, downstream)
sendQueue.inject(queue)
downstream.request(10)
sendQueue.offer("a")
}
test { (queue, sendQueue, downstream)
downstream.request(10)
sendQueue.inject(queue)
sendQueue.offer("a")
}
test { (queue, sendQueue, downstream)
sendQueue.inject(queue)
sendQueue.offer("a")
downstream.request(10)
}
}
} }
} }