One minute is shorter, and cleaned up blocking readers impl

This commit is contained in:
Viktor Klang 2010-08-20 12:41:37 +02:00
parent 0869b8b137
commit b84a6730f0

View file

@ -21,11 +21,11 @@ object DataFlow {
case object Start
case object Exit
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFuture
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFuture
def thread(body: => Unit) = {
val thread = actorOf(new IsolatedEventBasedThread(body)).start
@ -46,7 +46,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
extends Actor {
def receive = {
case Exit => self.stop
case Exit => self.stop
case message => self.reply(body(message.asInstanceOf[A]))
}
}
@ -55,7 +55,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class DataFlowVariable[T <: Any] {
val TIME_OUT = 10000 * 60 // 60 seconds default timeout
val TIME_OUT = 1000 * 60 // 60 seconds default timeout
private sealed abstract class DataFlowVariableMessage
private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
@ -67,14 +67,13 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
self.timeout = TIME_OUT
def receive = {
case Set(v) =>
case s@Set(v) =>
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
val iterator = dataFlow.blockedReaders.iterator
while (iterator.hasNext) iterator.next ! Set(v)
dataFlow.blockedReaders.clear
while(dataFlow.blockedReaders.peek ne null)
dataFlow.blockedReaders.poll ! s
} else throw new DataFlowVariableException(
"Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
case Exit => self.stop
case Exit => self.stop
}
}
@ -87,7 +86,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
if (ref.isDefined) self.reply(ref.get)
else readerFuture = self.senderFuture.asInstanceOf[Option[CompletableFuture[T]]]
case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
case Exit => self.stop
case Exit => self.stop
}
}
@ -98,15 +97,12 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
def <<(value: T) = in ! Set(value)
def apply(): T = {
val ref = value.get
if (ref.isDefined) ref.get
else {
value.get getOrElse {
val out = actorOf(new Out(this)).start
blockedReaders.offer(out)
val result = (out !! Get).as[T]
out ! Exit
if (result.isDefined) result.get
else throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result")
result.getOrElse(throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
}
}