diff --git a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala index fa4325e4c2..c6eff59495 100644 --- a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala +++ b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala @@ -21,12 +21,10 @@ object DataFlow { object Start object Exit - import java.util.concurrent.atomic.AtomicReference - import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} - import scala.collection.JavaConversions._ - import se.scalablesolutions.akka.actor.Actor - import se.scalablesolutions.akka.dispatch.CompletableFuture + class DataFlowVariableException(msg: String) extends AkkaException(msg) + /** Executes the supplied thunk in another thread + */ def thread(body: => Unit): Unit = spawn(body) def thread[A <: AnyRef, R <: AnyRef](body: A => R) = @@ -40,21 +38,25 @@ object DataFlow { } } - /** - * @author Jonas Bonér - */ - sealed class DataFlowVariable[T <: Any] { - val TIME_OUT = 1000 * 60 // 60 seconds default timeout - + private object DataFlowVariable { private sealed abstract class DataFlowVariableMessage private case class Set[T <: Any](value: T) extends DataFlowVariableMessage private object Get extends DataFlowVariableMessage + } + + /** + * @author Jonas Bonér + */ + sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { + import DataFlowVariable._ + + def this() = this(1000 * 60) private val value = new AtomicReference[Option[T]](None) private val blockedReaders = new ConcurrentLinkedQueue[ActorRef] private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = TIME_OUT + self.timeout = timeoutMs def receive = { case s@Set(v) => if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { @@ -67,7 +69,7 @@ object DataFlow { } private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = TIME_OUT + self.timeout = timeoutMs private var readerFuture: Option[CompletableFuture[Any]] = None def receive = { case Get => dataFlow.value.get match { @@ -81,63 +83,29 @@ object DataFlow { private[this] val in = actorOf(new In(this)).start - def <<(ref: DataFlowVariable[T]): Unit = if(this.value.get.isEmpty) in ! Set(ref()) + /** Sets the value of this variable (if unset) with the value of the supplied variable + */ + def <<(ref: DataFlowVariable[T]): Unit = + if(this.value.get.isEmpty) in ! Set(ref()) - def <<(value: T): Unit = if(this.value.get.isEmpty) in ! Set(value) + /** Sets the value of this variable (if unset) + */ + def <<(value: T): Unit = + if(this.value.get.isEmpty) in ! Set(value) + /** Retrieves the value of variable + * throws a DataFlowVariableException if it times out + */ def apply(): T = { value.get getOrElse { val out = actorOf(new Out(this)).start blockedReaders offer out val result = (out !! Get).as[T] out ! Exit - result.getOrElse(throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result")) + result.getOrElse(throw new DataFlowVariableException("Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) } } def shutdown = in ! Exit } - - /** - * @author Jonas Bonér - */ - /*FIXME I do not work - class DataFlowStream[T <: Any] extends Seq[T] { - private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]] - - def <<<(ref: DataFlowVariable[T]): Boolean = queue offer ref - - def <<<(value: T): Boolean = { - val ref = new DataFlowVariable[T] - ref << value - queue offer ref - } - - def apply(): T = queue.take.apply - - def take: DataFlowVariable[T] = queue.take - - //==== For Seq ==== - - def length: Int = queue.size - - def apply(i: Int): T = { - if (i == 0) apply() - else throw new UnsupportedOperationException( - "Access by index other than '0' is not supported by DataFlowStream") - } - - def iterator: Iterator[T] = new Iterator[T] { - private val i = queue.iterator - def hasNext: Boolean = i.hasNext - def next: T = { val ref = i.next; ref() } - } - - override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]] - }*/ - - /** - * @author Jonas Bonér - */ - class DataFlowVariableException(msg: String) extends AkkaException(msg) } \ No newline at end of file diff --git a/akka-core/src/test/scala/dataflow/DataFlowSpec.scala b/akka-core/src/test/scala/dataflow/DataFlowSpec.scala index 0e917c904e..cdc8bf2455 100644 --- a/akka-core/src/test/scala/dataflow/DataFlowSpec.scala +++ b/akka-core/src/test/scala/dataflow/DataFlowSpec.scala @@ -20,7 +20,7 @@ import se.scalablesolutions.akka.actor.ActorRegistry @RunWith(classOf[JUnitRunner]) class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { describe("DataflowVariable") { - it("should work and generate correct results") { + it("should be able to set the value of one variable from other variables") { import DataFlow._ val latch = new CountDownLatch(1) @@ -34,13 +34,12 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { thread { x << 40 } thread { y << 2 } - latch.await(3,TimeUnit.SECONDS) should equal (true) - List(x,y,z).foreach(_.shutdown) + latch.await(10,TimeUnit.SECONDS) should equal (true) result.get should equal (42) - ActorRegistry.shutdownAll + List(x,y,z).foreach(_.shutdown) } - it("should be able to transform a stream") { + it("should be able to sum a sequence of ints") { import DataFlow._ def ints(n: Int, max: Int): List[Int] = @@ -66,106 +65,9 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { latch.countDown } - latch.await(3,TimeUnit.SECONDS) should equal (true) - List(x,y,z).foreach(_.shutdown) + latch.await(10,TimeUnit.SECONDS) should equal (true) result.get should equal (sum(0,ints(0,1000))) - ActorRegistry.shutdownAll + List(x,y,z).foreach(_.shutdown) } } - - /*it("should be able to join streams") { - import DataFlow._ - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - val result = new AtomicInteger(0) - - thread { ints(0, 1000, producer) } - thread { - Thread.sleep(1000) - result.set(producer.map(x => x * x).foldLeft(0)(_ + _)) - latch.countDown - } - - latch.await(3,TimeUnit.SECONDS) should equal (true) - result.get should equal (332833500) - ActorRegistry.shutdownAll - } - - it("should be able to sum streams recursively") { - import DataFlow._ - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val result = new AtomicLong(0) - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - - @tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = { - val x = stream() - - if(result.addAndGet(x) == 166666500) - latch.countDown - - recurseSum(stream) - } - - thread { ints(0, 1000, producer) } - thread { sum(0, producer, consumer) } - thread { recurseSum(consumer) } - - latch.await(15,TimeUnit.SECONDS) should equal (true) - ActorRegistry.shutdownAll - }*/ - - /* Test not ready for prime time, causes some sort of deadlock */ - /* it("should be able to conditionally set variables") { - - import DataFlow._ - - val latch = new CountDownLatch(1) - val x, y, z, v = new DataFlowVariable[Int] - - val main = thread { - x << 1 - z << Math.max(x(),y()) - latch.countDown - } - - val setY = thread { - Thread sleep 2000 - y << 2 - } - - val setV = thread { - v << y - } - - latch.await(2,TimeUnit.SECONDS) should equal (true) - List(x,y,z,v) foreach (_.shutdown) - List(main,setY,setV) foreach (_ ! Exit) - println("Foo") - ActorRegistry.shutdownAll - }*/ }