pekko/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala

135 lines
3.1 KiB
Scala
Raw Normal View History

2011-05-19 14:43:58 -06:00
package akka.dispatch
import Future.flow
import akka.util.cps._
import akka.util.Timeout
import akka.util.duration._
2011-10-11 16:05:48 +02:00
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
2011-05-19 14:43:58 -06:00
/**
 * Behavioural spec for `PromiseStream`.
 *
 * Every test pushes values into one or more streams and reads them back into
 * promises (or futures) from inside `flow` blocks, then blocks on the results
 * with `Await.result` and checks that the values arrive in the order they were
 * pushed. The tests differ in which side (producer or consumer) is started
 * first and in which stream operators are used.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {

  "A PromiseStream" must {

    // The producing flow is started before the consuming flow.
    // Values are pushed with the varargs form of `<<`; reads mix `q()` and bare `q`.
    "work" in {
      val a, b, c = Promise[Int]()
      val q = PromiseStream[Int]()
      flow { q << (1, 2, 3) }
      flow {
        a << q()
        b << q
        c << q()
      }
      assert(Await.result(a, timeout.duration) === 1)
      assert(Await.result(b, timeout.duration) === 2)
      assert(Await.result(c, timeout.duration) === 3)
    }

    // Reverse start order of "work": the consuming flow is started first,
    // and the values are supplied afterwards as a List via `<<<`.
    "pend" in {
      val a, b, c = Promise[Int]()
      val q = PromiseStream[Int]()
      flow {
        a << q
        b << q()
        c << q
      }
      flow { q <<< List(1, 2, 3) }
      assert(Await.result(a, timeout.duration) === 1)
      assert(Await.result(b, timeout.duration) === 2)
      assert(Await.result(c, timeout.duration) === 3)
    }

    // Two streams and two flows that depend on each other:
    // the first flow reads twice from q2 and only then pushes 3 and 4 into q1;
    // the second flow feeds q2 from a Future[List[Int]] and then reads twice from q1.
    "pend again" in {
      val a, b, c, d = Promise[Int]()
      val q1, q2 = PromiseStream[Int]()
      val oneTwo = Future(List(1, 2))
      flow {
        a << q2
        b << q2
        q1 << 3 << 4
      }
      flow {
        q2 <<< oneTwo
        c << q1
        d << q1
      }
      assert(Await.result(a, timeout.duration) === 1)
      assert(Await.result(b, timeout.duration) === 2)
      assert(Await.result(c, timeout.duration) === 3)
      assert(Await.result(d, timeout.duration) === 4)
    }

    // Reads are requested before any value exists: two via `dequeue()` called
    // directly, two from inside a flow. The four values are then added in one
    // call with `++=`, and the requests are expected to be served in the order
    // they were made.
    "enque" in {
      val q = PromiseStream[Int]()
      val a = q.dequeue()
      val b = q.dequeue()
      val c, d = Promise[Int]()
      flow {
        c << q
        d << q
      }
      q ++= List(1, 2, 3, 4)
      assert(Await.result(a, timeout.duration) === 1)
      assert(Await.result(b, timeout.duration) === 2)
      assert(Await.result(c, timeout.duration) === 3)
      assert(Await.result(d, timeout.duration) === 4)
    }

    // `qi` is `qs` mapped through `_.length`. Reads alternate between the
    // mapped view and the original stream; the assertions expect each pushed
    // string to be consumed exactly once, by whichever of the two reads next
    // ("Hello" -> 5 via qi, "World!" via qs, "Test" -> 4 via qi).
    "map" in {
      val qs = PromiseStream[String]()
      val qi = qs.map(_.length)
      val a, c = Promise[Int]()
      val b = Promise[String]()
      flow {
        a << qi
        b << qs
        c << qi
      }
      flow {
        qs << ("Hello", "World!", "Test")
      }
      assert(Await.result(a, timeout.duration) === 5)
      assert(Await.result(b, timeout.duration) === "World!")
      assert(Await.result(c, timeout.duration) === 4)
    }

    // Two producers push 50000 values each (1..50000 and 50001..100000) while
    // ten consumers read 10000 values each and sum them. The grand total must
    // equal the sum of 1..100000, i.e. no value was lost or delivered twice.
    "not fail under concurrent stress" in {
      // Named `timeout` on purpose: it shadows the `timeout` the other tests
      // pick up from the enclosing spec, so the Await below uses this longer
      // 60-second limit.
      implicit val timeout: Timeout = Timeout(60 seconds)
      // The millisecond value is passed to the stream factory;
      // presumably the stream's own timeout - confirm against PromiseStream.apply.
      val q = PromiseStream[Long](timeout.duration.toMillis)

      // First producer, started before any consumer: pushes 1..50000.
      flow {
        var n = 0L
        repeatC(50000) {
          n += 1
          q << n
        }
      }

      // Ten consumers, each summing 10000 values read from the stream;
      // the per-consumer totals are then combined into one grand total.
      val future = Future sequence {
        List.fill(10) {
          flow {
            var total = 0L
            repeatC(10000) {
              val n = q()
              total += n
            }
            total
          }
        }
      } map (_.sum)

      // Second producer, started after the consumers: pushes 50001..100000.
      flow {
        var n = 50000L
        repeatC(50000) {
          n += 1
          q << n
        }
      }

      assert(Await.result(future, timeout.duration) === (1L to 100000L).sum)
    }
  }
}