From 2af6adcfa0bb2972f34d668b05ad8c5159b2d198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 8 Dec 2009 16:17:22 +0100 Subject: [PATCH] fixed actor bug related to hashcode --- akka-actors/src/main/scala/actor/Actor.scala | 16 +- .../EventBasedThreadPoolDispatcher.scala | 5 +- .../src/main/scala/dispatch/Reactor.scala | 12 +- .../src/main/scala/stm/DataFlowVariable.scala | 189 +++++++++--------- 4 files changed, 119 insertions(+), 103 deletions(-) diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index ba5125717b..61c70ba10b 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -17,11 +17,10 @@ import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.Helpers.ReadWriteLock -import se.scalablesolutions.akka.util.Logging - import org.codehaus.aspectwerkz.proxy.Uuid import org.multiverse.api.ThreadLocalTransaction._ +import se.scalablesolutions.akka.util.{HashCode, Logging} /** * Mix in this trait to give an actor TransactionRequired semantics. @@ -215,6 +214,7 @@ trait Actor extends TransactionManagement { implicit protected val self: Actor = this // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait + // Only mutable for RemoteServer in order to maintain identity across nodes private[akka] var _uuid = Uuid.newUuid.toString def uuid = _uuid @@ -959,5 +959,17 @@ trait Actor extends TransactionManagement { } else message } else message + override def hashCode(): Int = { + var result = HashCode.SEED + result = HashCode.hash(result, _uuid) + result + } + + override def equals(that: Any): Boolean = { + that != null && + that.isInstanceOf[Actor] && + that.asInstanceOf[Actor]._uuid == _uuid + } + override def toString(): String = "Actor[" + id + ":" + uuid + "]" } diff --git a/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala b/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala index da5f6cce1d..297c1f7087 100644 --- a/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala +++ b/akka-actors/src/main/scala/dispatch/EventBasedThreadPoolDispatcher.scala @@ -126,14 +126,13 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B val iterator = invocations.iterator while (iterator.hasNext) { val invocation = iterator.next + if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") if (concurrentMode) { val invoker = messageHandlers.get(invocation.receiver) - if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null") - result.put(invocation, invoker) + result.put(invocation, invoker) } else if (!busyInvokers.contains(invocation.receiver)) { val invoker = messageHandlers.get(invocation.receiver) - if (invocation == null) throw new IllegalStateException("Message invocation is null [" + invocation + "]") if (invoker == null) throw new IllegalStateException("Message invoker for invocation [" + invocation + "] is null") result.put(invocation, invoker) busyInvokers.add(invocation.receiver) diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala index c790928ae5..befa25e807 100644 --- a/akka-actors/src/main/scala/dispatch/Reactor.scala +++ b/akka-actors/src/main/scala/dispatch/Reactor.scala @@ -67,10 +67,12 @@ class MessageInvocation(val receiver: Actor, } override def toString(): String = synchronized { - "MessageInvocation[message = " + message + - ", receiver = " + receiver + - ", sender = " + sender + - ", future = " + future + - ", tx = " + tx + "]" + "MessageInvocation[" + + "\n\tmessage = " + message + + "\n\treceiver = " + receiver + + "\n\tsender = " + sender + + "\n\tfuture = " + future + + "\n\ttx = " + tx + + "\n]" } } diff --git a/akka-actors/src/main/scala/stm/DataFlowVariable.scala b/akka-actors/src/main/scala/stm/DataFlowVariable.scala index 83ab9d141b..2a2bcf8e0e 100644 --- a/akka-actors/src/main/scala/stm/DataFlowVariable.scala +++ b/akka-actors/src/main/scala/stm/DataFlowVariable.scala @@ -4,61 +4,59 @@ package se.scalablesolutions.akka.state -import scala.actors.Actor -import scala.actors.OutputChannel -import scala.actors.Future -import scala.actors.Actor._ - import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} +import se.scalablesolutions.akka.actor.Actor + /** * Implements Oz-style dataflow (single assignment) variables. - * + * * @author Jonas Bonér */ -object DataFlow { - def thread(body: => Unit) = { +object DataFlow { + case object Start + case object Exit + + def thread(body: => Unit) = { val thread = new IsolatedEventBasedThread(body).start - thread ! 'start + thread send Start thread } - def thread[MessageType, ReturnType](body: MessageType => ReturnType) = - new ReactiveEventBasedThread(body).start - private class IsolatedEventBasedThread(body: => Unit) extends Actor { - def act = loop { - react { - case 'start => body - case 'exit => exit() - } + def receive = { + case Start => body + case Exit => exit } } - private class ReactiveEventBasedThread[MessageType, ReturnType](body: MessageType => ReturnType) extends Actor { - def act = loop { - react { - case 'exit => exit() - case message => sender ! body(message.asInstanceOf[MessageType]) - } + def thread[A <: AnyRef, R <: AnyRef](body: A => R) = + new ReactiveEventBasedThread(body).start + + private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T) + extends Actor { + def receive = { + case Exit => exit + case message => reply(body(message.asInstanceOf[A])) } } /** * @author Jonas Bonér */ - sealed class DataFlowVariable[T] { - - private sealed abstract class DataFlowVariableMessage - private case class Set[T](value: T) extends DataFlowVariableMessage - private case object Get extends DataFlowVariableMessage - - private val value = new AtomicReference[Option[T]](None) - private val blockedReaders = new ConcurrentLinkedQueue[Actor] + sealed class DataFlowVariable[T <: Any] { + val TIME_OUT = 10000 - private class In[T](dataFlow: DataFlowVariable[T]) extends Actor { - def act = loop { react { + private sealed abstract class DataFlowVariableMessage + private case class Set[T <: Any](value: T) extends DataFlowVariableMessage + private case object Get extends DataFlowVariableMessage + + private val value = new AtomicReference[Option[T]](None) + private val blockedReaders = new ConcurrentLinkedQueue[Actor] + + private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { + def receive = { case Set(v) => if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { val iterator = dataFlow.blockedReaders.iterator @@ -66,73 +64,75 @@ object DataFlow { dataFlow.blockedReaders.clear } else throw new DataFlowVariableException( "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])") - case 'exit => exit() - }} + case Exit => exit + } } - - private class Out[T](dataFlow: DataFlowVariable[T]) extends Actor { - var reader: Option[OutputChannel[Any]] = None - def act = loop { react { + + private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { + var reader: Option[Actor] = None + def receive = { case Get => val ref = dataFlow.value.get - if (ref.isDefined) reply(ref.get) else reader = Some(sender) + if (ref.isDefined) reply(ref.get) + else reader = Some(sender.getOrElse(throw new IllegalStateException("No reader to DataFlowVariable is in scope"))) case Set(v) => if (reader.isDefined) reader.get ! v - case 'exit => exit() - }} + case Exit => exit + } } - + private[this] val in = { val in = new In(this); in.start; in } - def <<(ref: DataFlowVariable[T]) = in ! Set(ref()) + def <<(ref: DataFlowVariable[T]) = in send Set(ref()) - def <<(value: T) = in ! Set(value) - - def apply(): T = { + def <<(value: T) = in send Set(value) + + def apply(): T = { val ref = value.get if (ref.isDefined) ref.get else { val out = { val out = new Out(this); out.start; out } blockedReaders.offer(out) - val future: Future[T] = out !! (Get, {case t: T => t}) - val result = future() - out ! 'exit - result + val result = out !! (Get, TIME_OUT) + out send Exit + result.getOrElse(throw new DataFlowVariableException( + "Timed out (after " + TIME_OUT + " milliseconds) while waiting for result")) } } - - def shutdown = in ! 'exit + + def shutdown = in send Exit } /** * @author Jonas Bonér */ - class DataFlowStream[T] extends Seq[T] { + class DataFlowStream[T <: Any] extends Seq[T] { private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]] def <<<(ref: DataFlowVariable[T]) = queue.offer(ref) - - def <<<(value: T) = { + + def <<<(value: T) = { val ref = new DataFlowVariable[T] ref << value queue.offer(ref) - } - + } + def apply(): T = { val ref = queue.take ref() } - + def take: DataFlowVariable[T] = queue.take //==== For Seq ==== - + def length: Int = queue.size def apply(i: Int): T = { if (i == 0) apply() - else throw new UnsupportedOperationException("Access by index other than '0' is not supported by DataFlowSream") - } - + else throw new UnsupportedOperationException( + "Access by index other than '0' is not supported by DataFlowStream") + } + override def elements: Iterator[T] = new Iterator[T] { private val iter = queue.iterator def hasNext: Boolean = iter.hasNext @@ -141,7 +141,7 @@ object DataFlow { override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]] } - + /** * @author Jonas Bonér */ @@ -158,8 +158,8 @@ object Test1 extends Application { // ======================================= // This example is rom Oz wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language) - /* - thread + /* + thread Z = X+Y % will wait until both X and Y are bound to a value. {Browse Z} % shows the value of Z. end @@ -183,7 +183,7 @@ object Test2 extends Application { /* fun {Ints N Max} if N == Max then nil - else + else {Delay 1000} N|{Ints N+1 Max} end @@ -224,11 +224,11 @@ object Test2 extends Application { object Test3 extends Application { // Using DataFlowStream and foldLeft to calculate sum - + /* fun {Ints N Max} if N == Max then nil - else + else {Delay 1000} N|{Ints N+1 Max} end @@ -248,20 +248,20 @@ object Test3 extends Application { import DataFlow._ - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { + def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { println("Generating int: " + n) stream <<< n ints(n + 1, max, stream) } - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { + def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { println("Calculating: " + s) out <<< s sum(in() + s, in, out) } def printSum(stream: DataFlowStream[Int]): Unit = { - println("Result: " + stream()) + println("Result: " + stream()) printSum(stream) } @@ -269,22 +269,22 @@ object Test3 extends Application { val consumer = new DataFlowStream[Int] thread { ints(0, 1000, producer) } - thread { + thread { Thread.sleep(1000) - println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _)) + println("Sum: " + producer.map(x => x * x).foldLeft(0)(_ + _)) } } // ======================================= -object Test4 extends Application { +object Test4 extends Application { // Using DataFlowStream and recursive function to calculate sum - + /* fun {Ints N Max} if N == Max then nil - else + else {Delay 1000} N|{Ints N+1 Max} end @@ -304,20 +304,20 @@ object Test4 extends Application { import DataFlow._ - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { + def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { println("Generating int: " + n) stream <<< n ints(n + 1, max, stream) } - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { + def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { println("Calculating: " + s) out <<< s sum(in() + s, in, out) } def printSum(stream: DataFlowStream[Int]): Unit = { - println("Result: " + stream()) + println("Result: " + stream()) printSum(stream) } @@ -332,6 +332,7 @@ object Test4 extends Application { // ======================================= object Test5 extends Application { + import Actor.Sender.Self import DataFlow._ // create four 'Int' data flow variables @@ -339,20 +340,20 @@ object Test5 extends Application { val main = thread { println("Thread 'main'") - + x << 1 println("'x' set to: " + x()) - + println("Waiting for 'y' to be set...") - - if (x() > y()) { + + if (x() > y()) { z << x println("'z' set to 'x': " + z()) - } else { + } else { z << y println("'z' set to 'y': " + z()) } - + // main completed, shut down the data flow variables x.shutdown y.shutdown @@ -365,18 +366,20 @@ object Test5 extends Application { Thread.sleep(5000) y << 2 println("'y' set to: " + y()) - } + } val setV = thread { println("Thread 'setV'") v << y - println("'v' set to 'y': " + v()) + println("'v' set to 'y': " + v()) } - // shut down the threads - main ! 'exit - setY ! 'exit - setV ! 'exit + // shut down the threads + main ! Exit + setY ! Exit + setV ! Exit //System.gc -} \ No newline at end of file +} + +