Readded a bugfixed DataFlowStream
This commit is contained in:
parent
a8f88a73b0
commit
e488b796b7
1 changed files with 43 additions and 1 deletions
|
|
@ -150,4 +150,46 @@ object DataFlow {
|
|||
|
||||
def shutdown = in ! Exit
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class DataFlowStream[T <: Any] extends Seq[T] {
|
||||
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
|
||||
|
||||
def <<<(ref: DataFlowVariable[T]) = queue.offer(ref)
|
||||
|
||||
def <<<(value: T) = {
|
||||
val ref = new DataFlowVariable[T]
|
||||
ref << value
|
||||
queue.offer(ref)
|
||||
}
|
||||
|
||||
def apply(): T = {
|
||||
val ref = queue.take
|
||||
val result = ref()
|
||||
ref.shutdown
|
||||
result
|
||||
}
|
||||
|
||||
def take: DataFlowVariable[T] = queue.take
|
||||
|
||||
//==== For Seq ====
|
||||
|
||||
def length: Int = queue.size
|
||||
|
||||
def apply(i: Int): T = {
|
||||
if (i == 0) apply()
|
||||
else throw new UnsupportedOperationException(
|
||||
"Access by index other than '0' is not supported by DataFlowStream")
|
||||
}
|
||||
|
||||
def iterator: Iterator[T] = new Iterator[T] {
|
||||
private val iter = queue.iterator
|
||||
def hasNext: Boolean = iter.hasNext
|
||||
def next: T = { val ref = iter.next; ref() }
|
||||
}
|
||||
|
||||
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue