Adding the old tests for the DataFlowStream
This commit is contained in:
parent
08d8d78ca7
commit
0dfc810311
1 changed files with 92 additions and 0 deletions
|
|
@ -69,5 +69,97 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
|||
result.get should equal (sum(0,ints(0,1000)))
|
||||
List(x,y,z).foreach(_.shutdown)
|
||||
}
|
||||
|
||||
it("should be able to join streams") {
|
||||
import DataFlow._
|
||||
ActorRegistry.shutdownAll
|
||||
|
||||
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
|
||||
stream <<< n
|
||||
ints(n + 1, max, stream)
|
||||
}
|
||||
|
||||
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
|
||||
out <<< s
|
||||
sum(in() + s, in, out)
|
||||
}
|
||||
|
||||
val producer = new DataFlowStream[Int]
|
||||
val consumer = new DataFlowStream[Int]
|
||||
val latch = new CountDownLatch(1)
|
||||
val result = new AtomicInteger(0)
|
||||
|
||||
val t1 = thread { ints(0, 1000, producer) }
|
||||
val t2 = thread {
|
||||
Thread.sleep(1000)
|
||||
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
|
||||
latch.countDown
|
||||
}
|
||||
|
||||
latch.await(3,TimeUnit.SECONDS) should equal (true)
|
||||
result.get should equal (332833500)
|
||||
}
|
||||
|
||||
it("should be able to sum streams recursively") {
|
||||
import DataFlow._
|
||||
|
||||
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
|
||||
stream <<< n
|
||||
ints(n + 1, max, stream)
|
||||
}
|
||||
|
||||
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
|
||||
out <<< s
|
||||
sum(in() + s, in, out)
|
||||
}
|
||||
|
||||
val result = new AtomicLong(0)
|
||||
|
||||
val producer = new DataFlowStream[Int]
|
||||
val consumer = new DataFlowStream[Int]
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
@tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = {
|
||||
val x = stream()
|
||||
|
||||
if(result.addAndGet(x) == 166666500)
|
||||
latch.countDown
|
||||
|
||||
recurseSum(stream)
|
||||
}
|
||||
|
||||
thread { ints(0, 1000, producer) }
|
||||
thread { sum(0, producer, consumer) }
|
||||
thread { recurseSum(consumer) }
|
||||
|
||||
latch.await(15,TimeUnit.SECONDS) should equal (true)
|
||||
}
|
||||
|
||||
/* Test not ready for prime time, causes some sort of deadlock */
|
||||
/* it("should be able to conditionally set variables") {
|
||||
|
||||
import DataFlow._
|
||||
ActorRegistry.shutdownAll
|
||||
|
||||
val latch = new CountDownLatch(1)
|
||||
val x, y, z, v = new DataFlowVariable[Int]
|
||||
|
||||
val main = thread {
|
||||
x << 1
|
||||
z << Math.max(x(),y())
|
||||
latch.countDown
|
||||
}
|
||||
|
||||
val setY = thread {
|
||||
// Thread.sleep(2000)
|
||||
y << 2
|
||||
}
|
||||
|
||||
val setV = thread {
|
||||
v << y
|
||||
}
|
||||
List(x,y,z,v) foreach (_.shutdown)
|
||||
latch.await(2,TimeUnit.SECONDS) should equal (true)
|
||||
}*/
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue