2009-03-12 21:19:21 +01:00
|
|
|
/**
|
2010-12-22 15:35:50 +01:00
|
|
|
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
2009-03-12 21:19:21 +01:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dataflow
|
2009-03-12 21:19:21 +01:00
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicReference
|
|
|
|
|
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
|
|
|
|
|
|
2011-03-02 18:19:17 +01:00
|
|
|
import akka.actor.{Actor, ActorRef, EventHandler}
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.actor.Actor._
|
|
|
|
|
import akka.dispatch.CompletableFuture
|
|
|
|
|
import akka.AkkaException
|
2010-11-24 14:49:10 +01:00
|
|
|
import akka.japi.{ Function, Effect }
|
2009-12-08 16:17:22 +01:00
|
|
|
|
2009-04-27 20:06:48 +02:00
|
|
|
/**
 * Implements Oz-style dataflow (single assignment) variables.
 *
 * Besides the variable itself ([[DataFlowVariable]]) this object offers a few
 * `thread` helpers that run a body either via `spawn` or inside a small actor.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object DataFlow {
  /** Marker message; not used by the code in this object. */
  object Start

  /** Message that makes the actors defined in this object stop themselves. */
  object Exit

  /** Raised when a variable is assigned twice or a read times out. */
  class DataFlowVariableException(msg: String) extends AkkaException(msg)

  /**
   * Executes the supplied thunk in another thread.
   */
  def thread(body: => Unit): Unit = spawn(body)

  /**
   * JavaAPI.
   * Executes the supplied Effect in another thread.
   */
  def thread(body: Effect): Unit = spawn(body.apply)

  /**
   * Executes the supplied function in another thread.
   *
   * The function is wrapped in a started actor: every message sent to the
   * returned reference (other than `Exit`) is passed to `body` and the result
   * is sent back as the reply.
   */
  def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
    actorOf(new ReactiveEventBasedThread(body)).start

  /**
   * JavaAPI.
   * Executes the supplied Function in another thread.
   */
  def thread[A <: AnyRef, R <: AnyRef](body: Function[A, R]) =
    actorOf(new ReactiveEventBasedThread(body.apply)).start

  /**
   * Actor that applies `body` to each incoming message and replies with the
   * result; `Exit` stops it.
   */
  private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
    extends Actor {
    def receive = {
      case Exit    => self.stop
      // The cast is unchecked (A is erased); a message of the wrong type
      // fails inside `body`, not here.
      case message => self.reply(body(message.asInstanceOf[A]))
    }
  }

  /** Internal protocol spoken between a variable and its helper actors. */
  private object DataFlowVariable {
    private sealed abstract class DataFlowVariableMessage
    /** Carries the value a variable is being bound to. */
    private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
    /** Asks an `Out` actor for the variable's value. */
    private object Get extends DataFlowVariableMessage
  }

  /**
   * A single-assignment variable. Writes go through an internal `In` actor;
   * a read of an unbound variable blocks (up to `timeoutMs` milliseconds) on a
   * temporary `Out` actor until the value arrives.
   *
   * @param timeoutMs timeout in milliseconds applied to the helper actors and
   *                  reported when a read times out
   *
   * @author <a href="http://jonasboner.com">Jonas Bonér</a>
   */
  @deprecated("Superceeded by Future and CompletableFuture as of 1.1")
  sealed class DataFlowVariable[T <: Any](timeoutMs: Long) {
    import DataFlowVariable._

    /** Creates a variable with a timeout of 60 000 milliseconds. */
    def this() = this(1000 * 60)

    // None until the variable is bound; switched to Some(v) exactly once via CAS.
    private val value = new AtomicReference[Option[T]](None)
    // `Out` actors of readers that are currently waiting for the value.
    private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]

    /**
     * Writer side: binds the variable on the first `Set` and forwards that
     * message to every reader queued in `blockedReaders`. A second `Set`
     * raises a DataFlowVariableException inside this actor.
     */
    private class In[V <: Any](dataFlow: DataFlowVariable[V]) extends Actor {
      self.timeout = timeoutMs
      def receive = {
        case s @ Set(v) =>
          if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[V]))) {
            // Drain the queue, waking up each waiting reader with the same message.
            while (dataFlow.blockedReaders.peek ne null)
              dataFlow.blockedReaders.poll ! s
          } else throw new DataFlowVariableException(
            "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
        case Exit => self.stop
      }
    }

    /**
     * Reader side: answers `Get` immediately when the variable is bound,
     * otherwise remembers the sender's future and completes it when `Set`
     * arrives.
     */
    private class Out[V <: Any](dataFlow: DataFlowVariable[V]) extends Actor {
      self.timeout = timeoutMs
      private var readerFuture: Option[CompletableFuture[Any]] = None
      def receive = {
        case Get => dataFlow.value.get match {
          case Some(value) => self reply value
          case None        => readerFuture = self.senderFuture
        }
        // No type test on `v`: the element type is erased at runtime, so a
        // typed pattern here would be unchecked anyway.
        case Set(v) => readerFuture foreach (_ completeWithResult v)
        case Exit   => self.stop
      }
    }

    private[this] val in = actorOf(new In(this)).start

    /**
     * Sets the value of this variable (if unset) with the value of the supplied variable.
     *
     * The supplied variable is read first (this may block until it is bound
     * or its read times out); only then is this variable checked, so the
     * check reflects the state at the moment the assignment is submitted.
     * The assignment itself is sent to the internal writer actor.
     *
     * @throws DataFlowVariableException if this variable already has a value,
     *                                   or if reading `ref` times out
     */
    def <<(ref: DataFlowVariable[T]): Unit = {
      val incoming = ref()
      if (this.value.get.isEmpty) in ! Set(incoming)
      else throw new DataFlowVariableException(
        "Attempt to change data flow variable (from [" + this.value.get + "] to [" + incoming + "])")
    }

    /**
     * JavaAPI.
     * Sets the value of this variable (if unset) with the value of the supplied variable.
     */
    def set(ref: DataFlowVariable[T]): Unit = { this << ref }

    /**
     * Sets the value of this variable (if unset).
     *
     * The assignment is sent to the internal writer actor.
     *
     * @throws DataFlowVariableException if this variable already has a value
     */
    def <<(value: T): Unit = {
      if (this.value.get.isEmpty) in ! Set(value)
      else throw new DataFlowVariableException(
        "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])")
    }

    /**
     * JavaAPI.
     * Sets the value of this variable (if unset).
     */
    def set(value: T): Unit = { this << value }

    /**
     * Retrieves the value of variable, throws a DataFlowVariableException if it times out.
     */
    def get(): T = this()

    /**
     * Retrieves the value of variable, throws a DataFlowVariableException if it times out.
     *
     * If the variable is already bound the value is returned directly.
     * Otherwise a temporary `Out` actor is started, registered in
     * `blockedReaders` and asked for the value with a blocking `!!`.
     */
    def apply(): T = {
      value.get getOrElse {
        val out = actorOf(new Out(this)).start

        val result = try {
          blockedReaders offer out
          (out !! Get).as[T]
        } catch {
          case e: Exception =>
            EventHandler.error(e, this, e.getMessage)
            // Unregister before stopping, so the writer does not later pick
            // this reader from the queue and try to deliver the value to it.
            blockedReaders.remove(out)
            out ! Exit
            throw e
        }

        // Reached on success and on timeout: this reader is no longer
        // waiting, so it must not linger in the queue (it would otherwise
        // stay there forever when the value was bound before the `offer`).
        blockedReaders.remove(out)
        // NOTE(review): `out` is only stopped on the exception path; on
        // success/timeout the actor is left running, as in the original code.
        // Confirm whether it should receive `Exit` here as well.

        result.getOrElse(throw new DataFlowVariableException(
          "Timed out (after " + timeoutMs + " milliseconds) while waiting for result"))
      }
    }

    /** Stops the internal writer actor by sending it `Exit`. */
    def shutdown: Unit = in ! Exit
  }
}
|