diff --git a/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala b/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala index 6608f6075b..329682de52 100644 --- a/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala +++ b/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala @@ -150,4 +150,46 @@ object DataFlow { def shutdown = in ! Exit } -} \ No newline at end of file + + /** + * @author Jonas Bonér + */ + class DataFlowStream[T <: Any] extends Seq[T] { + private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]] + + def <<<(ref: DataFlowVariable[T]) = queue.offer(ref) + + def <<<(value: T) = { + val ref = new DataFlowVariable[T] + ref << value + queue.offer(ref) + } + + def apply(): T = { + val ref = queue.take + val result = ref() + ref.shutdown + result + } + + def take: DataFlowVariable[T] = queue.take + + //==== For Seq ==== + + def length: Int = queue.size + + def apply(i: Int): T = { + if (i == 0) apply() + else throw new UnsupportedOperationException( + "Access by index other than '0' is not supported by DataFlowStream") + } + + def iterator: Iterator[T] = new Iterator[T] { + private val iter = queue.iterator + def hasNext: Boolean = iter.hasNext + def next: T = { val ref = iter.next; ref() } + } + + override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]] + } +}