diff --git a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala index a85a47ba85..e415423c89 100644 --- a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala +++ b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala @@ -21,11 +21,11 @@ object DataFlow { case object Start case object Exit -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} - -import se.scalablesolutions.akka.actor.Actor -import se.scalablesolutions.akka.dispatch.CompletableFuture + import java.util.concurrent.atomic.AtomicReference + import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} + import scala.collection.JavaConversions._ + import se.scalablesolutions.akka.actor.Actor + import se.scalablesolutions.akka.dispatch.CompletableFuture def thread(body: => Unit) = { val thread = actorOf(new IsolatedEventBasedThread(body)).start @@ -46,7 +46,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T) extends Actor { def receive = { - case Exit => self.stop + case Exit => self.stop case message => self.reply(body(message.asInstanceOf[A])) } } @@ -55,7 +55,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture * @author Jonas Bonér */ sealed class DataFlowVariable[T <: Any] { - val TIME_OUT = 10000 * 60 // 60 seconds default timeout + val TIME_OUT = 1000 * 60 // 60 seconds default timeout private sealed abstract class DataFlowVariableMessage private case class Set[T <: Any](value: T) extends DataFlowVariableMessage @@ -67,14 +67,13 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { self.timeout = TIME_OUT def receive = { - case Set(v) => + case s@Set(v) => if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { - val iterator = dataFlow.blockedReaders.iterator - while (iterator.hasNext) iterator.next ! Set(v) - dataFlow.blockedReaders.clear + while(dataFlow.blockedReaders.peek ne null) + dataFlow.blockedReaders.poll ! s } else throw new DataFlowVariableException( "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])") - case Exit => self.stop + case Exit => self.stop } } @@ -87,7 +86,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture if (ref.isDefined) self.reply(ref.get) else readerFuture = self.senderFuture.asInstanceOf[Option[CompletableFuture[T]]] case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v) - case Exit => self.stop + case Exit => self.stop } } @@ -98,15 +97,12 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture def <<(value: T) = in ! Set(value) def apply(): T = { - val ref = value.get - if (ref.isDefined) ref.get - else { + value.get getOrElse { val out = actorOf(new Out(this)).start blockedReaders.offer(out) val result = (out !! Get).as[T] out ! Exit - if (result.isDefined) result.get - else throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result") + result.getOrElse(throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result")) } }