diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 59ab5c06c2..5f08c7b900 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -119,7 +119,7 @@ trait ActorRef extends TransactionManagement { *
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'. * - * This field is used for logging, AspectRegistry.actorsFor, identifier for remote + * This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote * actor in RemoteServer etc.But also as the identifier for persistence, which means * that you can use a custom name to be able to retrieve the "correct" persisted state * upon restart, remote restart etc. @@ -208,8 +208,8 @@ trait ActorRef extends TransactionManagement { protected[akka] var _sender: Option[ActorRef] = None protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None - protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s} - protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf} + protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s } + protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf } /** * The reference sender Actor of the last received message. @@ -243,6 +243,11 @@ trait ActorRef extends TransactionManagement { */ def uuid = _uuid + /** + * Tests if the actor is able to handle the message passed in as arguments. + */ + def isDefinedAt(message: Any): Boolean = actor.base.isDefinedAt(message) + /** * Only for internal use. UUID is effectively final. */ @@ -891,7 +896,6 @@ sealed class LocalActorRef private[akka]( } protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - sender = senderOption joinTransaction(message) if (remoteAddress.isDefined) { @@ -924,7 +928,6 @@ sealed class LocalActorRef private[akka]( timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - sender = senderOption joinTransaction(message) if (remoteAddress.isDefined) { @@ -974,9 +977,9 @@ sealed class LocalActorRef private[akka]( Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) return } + sender = messageHandle.sender + senderFuture = messageHandle.senderFuture try { - sender = messageHandle.sender - senderFuture = messageHandle.senderFuture if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) else dispatch(messageHandle) } catch { @@ -990,9 +993,7 @@ sealed class LocalActorRef private[akka]( val message = messageHandle.message //serializeMessage(messageHandle.message) setTransactionSet(messageHandle.transactionSet) try { - if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException( - "No handler matching message [" + message + "] in " + toString) + actor.base(message) } catch { case e => _isBeingRestarted = true @@ -1021,20 +1022,16 @@ sealed class LocalActorRef private[akka]( } setTransactionSet(txSet) - def proceed = { - if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function - else throw new IllegalArgumentException( - toString + " could not process message [" + message + "]" + - "\n\tsince no matching 'case' clause in its 'receive' method could be found") - setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit - } - try { if (isTransactor) { atomic { - proceed + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit } - } else proceed + } else { + actor.base(message) + setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit + } } catch { case e: IllegalStateException => {} case e => diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala index ba1e8cb056..68842ad1e3 100644 --- a/akka-core/src/main/scala/config/Config.scala +++ b/akka-core/src/main/scala/config/Config.scala @@ -16,7 +16,7 @@ class ConfigurationException(message: String) extends RuntimeException(message) * @author Jonas Bonér */ object Config extends Logging { - val VERSION = "0.10-SNAPSHOT" + val VERSION = "0.10" // Set Multiverse options for max speed System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false") diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index cfb8a9a5ea..da0f9be72b 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -38,8 +38,11 @@ object RemoteRequestProtocolIdFactory { def nextId: Long = id.getAndIncrement + nodeId } +/** + * Life-cycle events for RemoteClient. + */ sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent +case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent @@ -186,7 +189,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause, hostname, port)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } isRunning = true @@ -222,7 +225,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O } } else { val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") - listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception)) + listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port)) throw exception } @@ -311,12 +314,12 @@ class RemoteClientHandler(val name: String, futures.remove(reply.getId) } else { val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port)) throw exception } } catch { case e: Exception => - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e, client.hostname, client.port)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -331,7 +334,7 @@ class RemoteClientHandler(val name: String, client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { client.listeners.toArray.foreach(l => - l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause)) + l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -351,7 +354,7 @@ class RemoteClientHandler(val name: String, } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause, client.hostname, client.port)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } diff --git a/akka-core/src/test/scala/Bench.scala b/akka-core/src/test/scala/Bench.scala new file mode 100644 index 0000000000..454fc2b7e9 --- /dev/null +++ b/akka-core/src/test/scala/Bench.scala @@ -0,0 +1,119 @@ +/* The Computer Language Benchmarks Game + http://shootout.alioth.debian.org/ + contributed by Julien Gaugaz + inspired by the version contributed by Yura Taras and modified by Isaac Gouy +*/ +package se.scalablesolutions.akka.actor + +import se.scalablesolutions.akka.actor.Actor._ + +object Chameneos { + + sealed trait ChameneosEvent + case class Meet(from: ActorRef, colour: Colour) extends ChameneosEvent + case class Change(colour: Colour) extends ChameneosEvent + case class MeetingCount(count: Int) extends ChameneosEvent + case object Exit extends ChameneosEvent + + abstract class Colour + case object RED extends Colour + case object YELLOW extends Colour + case object BLUE extends Colour + case object FADED extends Colour + + val colours = Array[Colour](BLUE, RED, YELLOW) + + var start = 0L + var end = 0L + + class Chameneo(var mall: ActorRef, var colour: Colour, cid: Int) extends Actor { + var meetings = 0 + self.start + mall ! Meet(self, colour) + + def receive = { + case Meet(from, otherColour) => + colour = complement(otherColour) + meetings = meetings +1 + from ! Change(colour) + mall ! Meet(self, colour) + + case Change(newColour) => + colour = newColour + meetings = meetings +1 + mall ! Meet(self, colour) + + case Exit => + colour = FADED + self.sender.get ! MeetingCount(meetings) + } + + def complement(otherColour: Colour): Colour = colour match { + case RED => otherColour match { + case RED => RED + case YELLOW => BLUE + case BLUE => YELLOW + case FADED => FADED + } + case YELLOW => otherColour match { + case RED => BLUE + case YELLOW => YELLOW + case BLUE => RED + case FADED => FADED + } + case BLUE => otherColour match { + case RED => YELLOW + case YELLOW => RED + case BLUE => BLUE + case FADED => FADED + } + case FADED => FADED + } + + override def toString = cid + "(" + colour + ")" + } + + class Mall(var n: Int, numChameneos: Int) extends Actor { + var waitingChameneo: Option[ActorRef] = None + var sumMeetings = 0 + var numFaded = 0 + + override def init = { + for (i <- 0 until numChameneos) actorOf(new Chameneo(self, colours(i % 3), i)) + } + + def receive = { + case MeetingCount(i) => + numFaded += 1 + sumMeetings += i + if (numFaded == numChameneos) { + Chameneos.end = System.currentTimeMillis + self.stop + } + + case msg @ Meet(a, c) => + if (n > 0) { + waitingChameneo match { + case Some(chameneo) => + n -= 1 + chameneo ! msg + waitingChameneo = None + case None => waitingChameneo = self.sender + } + } else { + waitingChameneo.foreach(_ ! Exit) + self.sender.get ! Exit + } + } + } + + def run { +// System.setProperty("akka.config", "akka.conf") + Chameneos.start = System.currentTimeMillis + actorOf(new Mall(1000000, 4)).start + Thread.sleep(10000) + println("Elapsed: " + (end - start)) + } + + def main(args : Array[String]): Unit = run +} diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala new file mode 100644 index 0000000000..22d294c735 --- /dev/null +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala @@ -0,0 +1,74 @@ +package se.scalablesolutions.akka.persistence.redis + +import sbinary._ +import sbinary.Operations._ +import sbinary.DefaultProtocol._ + +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.config.OneForOneStrategy +import Actor._ +import se.scalablesolutions.akka.persistence.common.PersistentVector +import se.scalablesolutions.akka.stm.Transaction.Global._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.util.Logging + +import java.util.{Calendar, Date} + +object Serial { + implicit object DateFormat extends Format[Date] { + def reads(in : Input) = new Date(read[Long](in)) + def writes(out: Output, value: Date) = write[Long](out, value.getTime) + } + case class Name(id: Int, name: String, address: String, dateOfBirth: Date, dateDied: Option[Date]) + implicit val NameFormat: Format[Name] = asProduct5(Name)(Name.unapply(_).get) +} + +case class GETFOO(s: String) +case class SETFOO(s: String) + +object SampleStorage { + class RedisSampleStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val EVENT_MAP = "akka.sample.map" + + private var eventMap = atomic { RedisStorage.getMap(EVENT_MAP) } + + import sbinary._ + import DefaultProtocol._ + import Operations._ + import Serial._ + import java.util.Calendar + + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + def receive = { + case SETFOO(str) => + atomic { + eventMap += (str.getBytes, toByteArray[Name](n)) + } + self.reply(str) + + case GETFOO(str) => + val ev = atomic { + eventMap.keySet.size + } + println("************* " + ev) + self.reply(ev) + } + } +} + +import Serial._ +import SampleStorage._ + +object Runner { + def run { + val proc = actorOf[RedisSampleStorage] + proc.start + val i: Option[String] = proc !! SETFOO("debasish") + println("i = " + i) + val ev: Option[Int] = proc !! GETFOO("debasish") + println(ev) + } +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 04ba26798d..2ac6bf42f1 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -15,7 +15,7 @@