pekko/akka-core/src/main/scala/dataflow/DataFlowVariable.scala

156 lines
4.7 KiB
Scala
Raw Normal View History

2009-03-12 21:19:21 +01:00
/**
2009-12-27 16:01:53 +01:00
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
2009-03-12 21:19:21 +01:00
*/
2010-06-19 17:41:35 +12:00
package se.scalablesolutions.akka.dataflow
2009-03-12 21:19:21 +01:00
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
2010-05-06 08:13:12 +02:00
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
2010-05-02 10:46:49 +02:00
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.dispatch.CompletableFuture
import se.scalablesolutions.akka.AkkaException
2009-12-08 16:17:22 +01:00
2009-04-27 20:06:48 +02:00
/**
* Implements Oz-style dataflow (single assignment) variables.
2009-12-08 16:17:22 +01:00
*
2009-04-27 20:06:48 +02:00
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
2009-12-08 16:17:22 +01:00
object DataFlow {
case object Start
case object Exit
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.dispatch.CompletableFuture
2009-12-08 16:17:22 +01:00
def thread(body: => Unit) = {
2010-05-08 15:59:11 +02:00
val thread = actorOf(new IsolatedEventBasedThread(body)).start
2010-03-30 23:58:50 +02:00
thread ! Start
2009-03-12 21:19:21 +01:00
thread
}
private class IsolatedEventBasedThread(body: => Unit) extends Actor {
2009-12-08 16:17:22 +01:00
def receive = {
case Start => body
case Exit => self.stop
2009-03-12 21:19:21 +01:00
}
}
2009-12-08 16:17:22 +01:00
def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
actorOf(new ReactiveEventBasedThread(body)).start
2009-12-08 16:17:22 +01:00
private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
extends Actor {
def receive = {
case Exit => self.stop
case message => self.reply(body(message.asInstanceOf[A]))
2009-03-12 21:19:21 +01:00
}
}
2009-04-27 20:06:48 +02:00
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
2009-12-08 16:17:22 +01:00
sealed class DataFlowVariable[T <: Any] {
val TIME_OUT = 1000 * 60 // 60 seconds default timeout
2009-12-08 16:17:22 +01:00
2009-03-12 21:19:21 +01:00
private sealed abstract class DataFlowVariableMessage
2009-12-08 16:17:22 +01:00
private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
2009-03-12 21:19:21 +01:00
private case object Get extends DataFlowVariableMessage
2009-12-08 16:17:22 +01:00
2009-03-12 21:19:21 +01:00
private val value = new AtomicReference[Option[T]](None)
2010-05-06 08:13:12 +02:00
private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
2009-03-12 21:19:21 +01:00
2009-12-08 16:17:22 +01:00
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
self.timeout = TIME_OUT
2009-12-08 16:17:22 +01:00
def receive = {
case s@Set(v) =>
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
while(dataFlow.blockedReaders.peek ne null)
dataFlow.blockedReaders.poll ! s
2009-03-12 21:19:21 +01:00
} else throw new DataFlowVariableException(
"Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
case Exit => self.stop
2009-12-08 16:17:22 +01:00
}
2009-03-12 21:19:21 +01:00
}
2009-12-08 16:17:22 +01:00
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
self.timeout = TIME_OUT
2010-04-23 20:46:58 +02:00
private var readerFuture: Option[CompletableFuture[T]] = None
2009-12-08 16:17:22 +01:00
def receive = {
case Get =>
2009-03-12 21:19:21 +01:00
val ref = dataFlow.value.get
if (ref.isDefined) self.reply(ref.get)
else readerFuture = self.senderFuture.asInstanceOf[Option[CompletableFuture[T]]]
2010-04-23 20:46:58 +02:00
case Set(v:T) => if (readerFuture.isDefined) readerFuture.get.completeWithResult(v)
case Exit => self.stop
2009-12-08 16:17:22 +01:00
}
2009-03-12 21:19:21 +01:00
}
2009-12-08 16:17:22 +01:00
private[this] val in = actorOf(new In(this)).start
2009-03-12 21:19:21 +01:00
2010-03-30 23:58:50 +02:00
def <<(ref: DataFlowVariable[T]) = in ! Set(ref())
2009-03-12 21:19:21 +01:00
2010-03-30 23:58:50 +02:00
def <<(value: T) = in ! Set(value)
2009-12-08 16:17:22 +01:00
def apply(): T = {
value.get getOrElse {
val out = actorOf(new Out(this)).start
2009-03-12 21:19:21 +01:00
blockedReaders.offer(out)
val result = (out !! Get).as[T]
2010-03-30 23:58:50 +02:00
out ! Exit
result.getOrElse(throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
2009-03-12 21:19:21 +01:00
}
}
2009-12-08 16:17:22 +01:00
2010-03-30 23:58:50 +02:00
def shutdown = in ! Exit
2009-03-12 21:19:21 +01:00
}
2009-04-27 20:06:48 +02:00
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
2009-12-08 16:17:22 +01:00
class DataFlowStream[T <: Any] extends Seq[T] {
2009-03-12 21:19:21 +01:00
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
def <<<(ref: DataFlowVariable[T]) = queue.offer(ref)
2009-12-08 16:17:22 +01:00
def <<<(value: T) = {
2009-03-12 21:19:21 +01:00
val ref = new DataFlowVariable[T]
ref << value
queue.offer(ref)
2009-12-08 16:17:22 +01:00
}
2009-03-12 21:19:21 +01:00
def apply(): T = {
val ref = queue.take
ref()
}
2009-12-08 16:17:22 +01:00
2009-03-12 21:19:21 +01:00
def take: DataFlowVariable[T] = queue.take
//==== For Seq ====
2009-12-08 16:17:22 +01:00
2009-03-12 21:19:21 +01:00
def length: Int = queue.size
def apply(i: Int): T = {
if (i == 0) apply()
2009-12-08 16:17:22 +01:00
else throw new UnsupportedOperationException(
"Access by index other than '0' is not supported by DataFlowStream")
}
2010-02-21 16:39:32 +01:00
def iterator: Iterator[T] = new Iterator[T] {
2009-03-12 21:19:21 +01:00
private val iter = queue.iterator
def hasNext: Boolean = iter.hasNext
def next: T = { val ref = iter.next; ref() }
}
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
}
2009-12-08 16:17:22 +01:00
2009-04-27 20:06:48 +02:00
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DataFlowVariableException(msg: String) extends AkkaException(msg)
}