diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlowVariable.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala similarity index 96% rename from akka-actor/src/main/scala/akka/dataflow/DataFlowVariable.scala rename to akka-actor/src/main/scala/akka/dataflow/DataFlow.scala index 9becd42e48..c2972b31bb 100644 --- a/akka-actor/src/main/scala/akka/dataflow/DataFlowVariable.scala +++ b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala @@ -157,8 +157,18 @@ object DataFlow { class DataFlowStream[T <: Any] extends Seq[T] { private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]] - def <<<(ref: DataFlowVariable[T]) = queue.offer(ref) + def next: DataFlowVariable[T] = queue.take + //==== Java API ==== + + def offer(ref: DataFlowVariable[T]) = <<<(ref) + def offer(value: T) = <<<(value) + def take(): T = apply() + + //==== Scala API ==== + + def <<<(ref: DataFlowVariable[T]) = queue.offer(ref) + def <<<(value: T) = { val ref = new DataFlowVariable[T] ref << value @@ -172,8 +182,6 @@ object DataFlow { result } - def take: DataFlowVariable[T] = queue.take - //==== For Seq ==== def length: Int = queue.size