Refactor, optimize, remove non-working code
This commit is contained in:
parent
d05a24cb1a
commit
e7efcf4781
2 changed files with 47 additions and 66 deletions
|
|
@ -18,8 +18,8 @@ import se.scalablesolutions.akka.AkkaException
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object DataFlow {
|
||||
case object Start
|
||||
case object Exit
|
||||
object Start
|
||||
object Exit
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
|
||||
|
|
@ -27,18 +27,7 @@ object DataFlow {
|
|||
import se.scalablesolutions.akka.actor.Actor
|
||||
import se.scalablesolutions.akka.dispatch.CompletableFuture
|
||||
|
||||
def thread(body: => Unit) = {
|
||||
val thread = actorOf(new IsolatedEventBasedThread(body)).start
|
||||
thread ! Start
|
||||
thread
|
||||
}
|
||||
|
||||
private class IsolatedEventBasedThread(body: => Unit) extends Actor {
|
||||
def receive = {
|
||||
case Start => body
|
||||
case Exit => self.stop
|
||||
}
|
||||
}
|
||||
def thread(body: => Unit): Unit = spawn(body)
|
||||
|
||||
def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
|
||||
actorOf(new ReactiveEventBasedThread(body)).start
|
||||
|
|
@ -59,7 +48,7 @@ object DataFlow {
|
|||
|
||||
private sealed abstract class DataFlowVariableMessage
|
||||
private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
|
||||
private case object Get extends DataFlowVariableMessage
|
||||
private object Get extends DataFlowVariableMessage
|
||||
|
||||
private val value = new AtomicReference[Option[T]](None)
|
||||
private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
|
||||
|
|
@ -73,33 +62,34 @@ object DataFlow {
|
|||
dataFlow.blockedReaders.poll ! s
|
||||
} else throw new DataFlowVariableException(
|
||||
"Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
|
||||
case Exit => self.stop
|
||||
case Exit => self.stop
|
||||
}
|
||||
}
|
||||
|
||||
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
|
||||
self.timeout = TIME_OUT
|
||||
private var readerFuture: Option[CompletableFuture[T]] = None
|
||||
private var readerFuture: Option[CompletableFuture[Any]] = None
|
||||
def receive = {
|
||||
case Get =>
|
||||
val ref = dataFlow.value.get
|
||||
if (ref.isDefined) self.reply(ref.get)
|
||||
else readerFuture = self.senderFuture.asInstanceOf[Option[CompletableFuture[T]]]
|
||||
case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
|
||||
case Exit => self.stop
|
||||
dataFlow.value.get match {
|
||||
case Some(value) => self reply value
|
||||
case None => readerFuture = self.senderFuture
|
||||
}
|
||||
case Set(v:T) => readerFuture.map(_ completeWithResult v)
|
||||
case Exit => self.stop
|
||||
}
|
||||
}
|
||||
|
||||
private[this] val in = actorOf(new In(this)).start
|
||||
|
||||
def <<(ref: DataFlowVariable[T]) = in ! Set(ref())
|
||||
def <<(ref: DataFlowVariable[T]) = if(this.value.get.isEmpty) in ! Set(ref())
|
||||
|
||||
def <<(value: T) = in ! Set(value)
|
||||
def <<(value: T): Unit = if(this.value.get.isEmpty) in ! Set(value)
|
||||
|
||||
def apply(): T = {
|
||||
value.get getOrElse {
|
||||
val out = actorOf(new Out(this)).start
|
||||
blockedReaders.offer(out)
|
||||
blockedReaders offer out
|
||||
val result = (out !! Get).as[T]
|
||||
out ! Exit
|
||||
result.getOrElse(throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
|
||||
|
|
@ -112,21 +102,19 @@ object DataFlow {
|
|||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class DataFlowStream[T <: Any] extends Seq[T] {
|
||||
/*FIXME I do not work
|
||||
class DataFlowStream[T <: Any] extends Seq[T] {
|
||||
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
|
||||
|
||||
def <<<(ref: DataFlowVariable[T]) = queue.offer(ref)
|
||||
def <<<(ref: DataFlowVariable[T]): Boolean = queue offer ref
|
||||
|
||||
def <<<(value: T) = {
|
||||
def <<<(value: T): Boolean = {
|
||||
val ref = new DataFlowVariable[T]
|
||||
ref << value
|
||||
queue.offer(ref)
|
||||
queue offer ref
|
||||
}
|
||||
|
||||
def apply(): T = {
|
||||
val ref = queue.take
|
||||
ref()
|
||||
}
|
||||
def apply(): T = queue.take.apply
|
||||
|
||||
def take: DataFlowVariable[T] = queue.take
|
||||
|
||||
|
|
@ -141,13 +129,13 @@ object DataFlow {
|
|||
}
|
||||
|
||||
def iterator: Iterator[T] = new Iterator[T] {
|
||||
private val iter = queue.iterator
|
||||
def hasNext: Boolean = iter.hasNext
|
||||
def next: T = { val ref = iter.next; ref() }
|
||||
private val i = queue.iterator
|
||||
def hasNext: Boolean = i.hasNext
|
||||
def next: T = { val ref = i.next; ref() }
|
||||
}
|
||||
|
||||
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
|
||||
}
|
||||
}*/
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue