diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala
index ba5125717b..61c70ba10b 100644
--- a/akka-actors/src/main/scala/actor/Actor.scala
+++ b/akka-actors/src/main/scala/actor/Actor.scala
@@ -17,11 +17,10 @@ import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
-import se.scalablesolutions.akka.util.Logging
-
import org.codehaus.aspectwerkz.proxy.Uuid
import org.multiverse.api.ThreadLocalTransaction._
+import se.scalablesolutions.akka.util.{HashCode, Logging}
/**
* Mix in this trait to give an actor TransactionRequired semantics.
@@ -215,6 +214,7 @@ trait Actor extends TransactionManagement {
implicit protected val self: Actor = this
// FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait
+ // Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = Uuid.newUuid.toString
def uuid = _uuid
@@ -959,5 +959,17 @@ trait Actor extends TransactionManagement {
} else message
} else message
+ override def hashCode(): Int = {
+ var result = HashCode.SEED
+ result = HashCode.hash(result, _uuid)
+ result
+ }
+
+ override def equals(that: Any): Boolean = {
+ that != null &&
+ that.isInstanceOf[Actor] &&
+ that.asInstanceOf[Actor]._uuid == _uuid
+ }
+
override def toString(): String = "Actor[" + id + ":" + uuid + "]"
}
diff --git a/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala b/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala
index da5f6cce1d..297c1f7087 100644
--- a/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala
+++ b/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala
@@ -126,14 +126,13 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B
val iterator = invocations.iterator
while (iterator.hasNext) {
val invocation = iterator.next
+ if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (concurrentMode) {
val invoker = messageHandlers.get(invocation.receiver)
- if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null")
- result.put(invocation, invoker)
+ result.put(invocation, invoker)
} else if (!busyInvokers.contains(invocation.receiver)) {
val invoker = messageHandlers.get(invocation.receiver)
- if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]")
if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null")
result.put(invocation, invoker)
busyInvokers.add(invocation.receiver)
diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala
index c790928ae5..befa25e807 100644
--- a/akka-actors/src/main/scala/dispatch/Reactor.scala
+++ b/akka-actors/src/main/scala/dispatch/Reactor.scala
@@ -67,10 +67,12 @@ class MessageInvocation(val receiver: Actor,
}
override def toString(): String = synchronized {
- "MessageInvocation[message = " + message +
- ", receiver = " + receiver +
- ", sender = " + sender +
- ", future = " + future +
- ", tx = " + tx + "]"
+ "MessageInvocation[" +
+ "\n\tmessage = " + message +
+ "\n\treceiver = " + receiver +
+ "\n\tsender = " + sender +
+ "\n\tfuture = " + future +
+ "\n\ttx = " + tx +
+ "\n]"
}
}
diff --git a/akka-actors/src/main/scala/stm/DataFlowVariable.scala b/akka-actors/src/main/scala/stm/DataFlowVariable.scala
index 83ab9d141b..2a2bcf8e0e 100644
--- a/akka-actors/src/main/scala/stm/DataFlowVariable.scala
+++ b/akka-actors/src/main/scala/stm/DataFlowVariable.scala
@@ -4,61 +4,59 @@
package se.scalablesolutions.akka.state
-import scala.actors.Actor
-import scala.actors.OutputChannel
-import scala.actors.Future
-import scala.actors.Actor._
-
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
+import se.scalablesolutions.akka.actor.Actor
+
/**
* Implements Oz-style dataflow (single assignment) variables.
- *
+ *
* @author Jonas Bonér
*/
-object DataFlow {
- def thread(body: => Unit) = {
+object DataFlow {
+ case object Start
+ case object Exit
+
+ def thread(body: => Unit) = {
val thread = new IsolatedEventBasedThread(body).start
- thread ! 'start
+ thread send Start
thread
}
- def thread[MessageType, ReturnType](body: MessageType => ReturnType) =
- new ReactiveEventBasedThread(body).start
-
private class IsolatedEventBasedThread(body: => Unit) extends Actor {
- def act = loop {
- react {
- case 'start => body
- case 'exit => exit()
- }
+ def receive = {
+ case Start => body
+ case Exit => exit
}
}
- private class ReactiveEventBasedThread[MessageType, ReturnType](body: MessageType => ReturnType) extends Actor {
- def act = loop {
- react {
- case 'exit => exit()
- case message => sender ! body(message.asInstanceOf[MessageType])
- }
+ def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
+ new ReactiveEventBasedThread(body).start
+
+ private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
+ extends Actor {
+ def receive = {
+ case Exit => exit
+ case message => reply(body(message.asInstanceOf[A]))
}
}
/**
* @author Jonas Bonér
*/
- sealed class DataFlowVariable[T] {
-
- private sealed abstract class DataFlowVariableMessage
- private case class Set[T](value: T) extends DataFlowVariableMessage
- private case object Get extends DataFlowVariableMessage
-
- private val value = new AtomicReference[Option[T]](None)
- private val blockedReaders = new ConcurrentLinkedQueue[Actor]
+ sealed class DataFlowVariable[T <: Any] {
+ val TIME_OUT = 10000
- private class In[T](dataFlow: DataFlowVariable[T]) extends Actor {
- def act = loop { react {
+ private sealed abstract class DataFlowVariableMessage
+ private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
+ private case object Get extends DataFlowVariableMessage
+
+ private val value = new AtomicReference[Option[T]](None)
+ private val blockedReaders = new ConcurrentLinkedQueue[Actor]
+
+ private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
+ def receive = {
case Set(v) =>
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
val iterator = dataFlow.blockedReaders.iterator
@@ -66,73 +64,75 @@ object DataFlow {
dataFlow.blockedReaders.clear
} else throw new DataFlowVariableException(
"Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
- case 'exit => exit()
- }}
+ case Exit => exit
+ }
}
-
- private class Out[T](dataFlow: DataFlowVariable[T]) extends Actor {
- var reader: Option[OutputChannel[Any]] = None
- def act = loop { react {
+
+ private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
+ var reader: Option[Actor] = None
+ def receive = {
case Get =>
val ref = dataFlow.value.get
- if (ref.isDefined) reply(ref.get) else reader = Some(sender)
+ if (ref.isDefined) reply(ref.get)
+ else reader = Some(sender.getOrElse(throw new IllegalStateException("No reader to DataFlowVariable is in scope")))
case Set(v) => if (reader.isDefined) reader.get ! v
- case 'exit => exit()
- }}
+ case Exit => exit
+ }
}
-
+
private[this] val in = { val in = new In(this); in.start; in }
- def <<(ref: DataFlowVariable[T]) = in ! Set(ref())
+ def <<(ref: DataFlowVariable[T]) = in send Set(ref())
- def <<(value: T) = in ! Set(value)
-
- def apply(): T = {
+ def <<(value: T) = in send Set(value)
+
+ def apply(): T = {
val ref = value.get
if (ref.isDefined) ref.get
else {
val out = { val out = new Out(this); out.start; out }
blockedReaders.offer(out)
- val future: Future[T] = out !! (Get, {case t: T => t})
- val result = future()
- out ! 'exit
- result
+ val result = out !! (Get, TIME_OUT)
+ out send Exit
+ result.getOrElse(throw new DataFlowVariableException(
+ "Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
}
}
-
- def shutdown = in ! 'exit
+
+ def shutdown = in send Exit
}
/**
* @author Jonas Bonér
*/
- class DataFlowStream[T] extends Seq[T] {
+ class DataFlowStream[T <: Any] extends Seq[T] {
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
def <<<(ref: DataFlowVariable[T]) = queue.offer(ref)
-
- def <<<(value: T) = {
+
+ def <<<(value: T) = {
val ref = new DataFlowVariable[T]
ref << value
queue.offer(ref)
- }
-
+ }
+
def apply(): T = {
val ref = queue.take
ref()
}
-
+
def take: DataFlowVariable[T] = queue.take
//==== For Seq ====
-
+
def length: Int = queue.size
def apply(i: Int): T = {
if (i == 0) apply()
- else throw new UnsupportedOperationException("Access by index other than '0' is not supported by DataFlowSream")
- }
-
+ else throw new UnsupportedOperationException(
+ "Access by index other than '0' is not supported by DataFlowStream")
+ }
+
override def elements: Iterator[T] = new Iterator[T] {
private val iter = queue.iterator
def hasNext: Boolean = iter.hasNext
@@ -141,7 +141,7 @@ object DataFlow {
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
}
-
+
/**
* @author Jonas Bonér
*/
@@ -158,8 +158,8 @@ object Test1 extends Application {
// =======================================
// This example is rom Oz wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language)
- /*
- thread
+ /*
+ thread
Z = X+Y % will wait until both X and Y are bound to a value.
{Browse Z} % shows the value of Z.
end
@@ -183,7 +183,7 @@ object Test2 extends Application {
/*
fun {Ints N Max}
if N == Max then nil
- else
+ else
{Delay 1000}
N|{Ints N+1 Max}
end
@@ -224,11 +224,11 @@ object Test2 extends Application {
object Test3 extends Application {
// Using DataFlowStream and foldLeft to calculate sum
-
+
/*
fun {Ints N Max}
if N == Max then nil
- else
+ else
{Delay 1000}
N|{Ints N+1 Max}
end
@@ -248,20 +248,20 @@ object Test3 extends Application {
import DataFlow._
- def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
+ def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
println("Generating int: " + n)
stream <<< n
ints(n + 1, max, stream)
}
- def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
+ def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
println("Calculating: " + s)
out <<< s
sum(in() + s, in, out)
}
def printSum(stream: DataFlowStream[Int]): Unit = {
- println("Result: " + stream())
+ println("Result: " + stream())
printSum(stream)
}
@@ -269,22 +269,22 @@ object Test3 extends Application {
val consumer = new DataFlowStream[Int]
thread { ints(0, 1000, producer) }
- thread {
+ thread {
Thread.sleep(1000)
- println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _))
+ println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _))
}
}
// =======================================
-object Test4 extends Application {
+object Test4 extends Application {
// Using DataFlowStream and recursive function to calculate sum
-
+
/*
fun {Ints N Max}
if N == Max then nil
- else
+ else
{Delay 1000}
N|{Ints N+1 Max}
end
@@ -304,20 +304,20 @@ object Test4 extends Application {
import DataFlow._
- def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
+ def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
println("Generating int: " + n)
stream <<< n
ints(n + 1, max, stream)
}
- def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
+ def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
println("Calculating: " + s)
out <<< s
sum(in() + s, in, out)
}
def printSum(stream: DataFlowStream[Int]): Unit = {
- println("Result: " + stream())
+ println("Result: " + stream())
printSum(stream)
}
@@ -332,6 +332,7 @@ object Test4 extends Application {
// =======================================
object Test5 extends Application {
+ import Actor.Sender.Self
import DataFlow._
// create four 'Int' data flow variables
@@ -339,20 +340,20 @@ object Test5 extends Application {
val main = thread {
println("Thread 'main'")
-
+
x << 1
println("'x' set to: " + x())
-
+
println("Waiting for 'y' to be set...")
-
- if (x() > y()) {
+
+ if (x() > y()) {
z << x
println("'z' set to 'x': " + z())
- } else {
+ } else {
z << y
println("'z' set to 'y': " + z())
}
-
+
// main completed, shut down the data flow variables
x.shutdown
y.shutdown
@@ -365,18 +366,20 @@ object Test5 extends Application {
Thread.sleep(5000)
y << 2
println("'y' set to: " + y())
- }
+ }
val setV = thread {
println("Thread 'setV'")
v << y
- println("'v' set to 'y': " + v())
+ println("'v' set to 'y': " + v())
}
- // shut down the threads
- main ! 'exit
- setY ! 'exit
- setV ! 'exit
+ // shut down the threads
+ main ! Exit
+ setY ! Exit
+ setV ! Exit
//System.gc
-}
\ No newline at end of file
+}
+
+