Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Martin Krasser 2010-06-10 19:41:58 +02:00
commit c227c77dd5
8 changed files with 226 additions and 33 deletions

View file

@ -119,7 +119,7 @@ trait ActorRef extends TransactionManagement {
* <p/>
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
* <p/>
* This field is used for logging, AspectRegistry.actorsFor, identifier for remote
* This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote
* actor in RemoteServer etc.But also as the identifier for persistence, which means
* that you can use a custom name to be able to retrieve the "correct" persisted state
* upon restart, remote restart etc.
@ -208,8 +208,8 @@ trait ActorRef extends TransactionManagement {
protected[akka] var _sender: Option[ActorRef] = None
protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s}
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf}
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s }
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf }
/**
* The reference sender Actor of the last received message.
@ -243,6 +243,11 @@ trait ActorRef extends TransactionManagement {
*/
def uuid = _uuid
/**
* Tests if the actor is able to handle the message passed in as arguments.
*/
def isDefinedAt(message: Any): Boolean = actor.base.isDefinedAt(message)
/**
* Only for internal use. UUID is effectively final.
*/
@ -891,7 +896,6 @@ sealed class LocalActorRef private[akka](
}
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
sender = senderOption
joinTransaction(message)
if (remoteAddress.isDefined) {
@ -924,7 +928,6 @@ sealed class LocalActorRef private[akka](
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
sender = senderOption
joinTransaction(message)
if (remoteAddress.isDefined) {
@ -974,9 +977,9 @@ sealed class LocalActorRef private[akka](
Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle)
return
}
sender = messageHandle.sender
senderFuture = messageHandle.senderFuture
try {
sender = messageHandle.sender
senderFuture = messageHandle.senderFuture
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
} catch {
@ -990,9 +993,7 @@ sealed class LocalActorRef private[akka](
val message = messageHandle.message //serializeMessage(messageHandle.message)
setTransactionSet(messageHandle.transactionSet)
try {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException(
"No handler matching message [" + message + "] in " + toString)
actor.base(message)
} catch {
case e =>
_isBeingRestarted = true
@ -1021,20 +1022,16 @@ sealed class LocalActorRef private[akka](
}
setTransactionSet(txSet)
def proceed = {
if (actor.base.isDefinedAt(message)) actor.base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException(
toString + " could not process message [" + message + "]" +
"\n\tsince no matching 'case' clause in its 'receive' method could be found")
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
try {
if (isTransactor) {
atomic {
proceed
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} else proceed
} else {
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
} catch {
case e: IllegalStateException => {}
case e =>

View file

@ -16,7 +16,7 @@ class ConfigurationException(message: String) extends RuntimeException(message)
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Config extends Logging {
val VERSION = "0.10-SNAPSHOT"
val VERSION = "0.10"
// Set Multiverse options for max speed
System.setProperty("org.multiverse.MuliverseConstants.sanityChecks", "false")

View file

@ -38,8 +38,11 @@ object RemoteRequestProtocolIdFactory {
def nextId: Long = id.getAndIncrement + nodeId
}
/**
* Life-cycle events for RemoteClient.
*/
sealed trait RemoteClientLifeCycleEvent
case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent
case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
@ -186,7 +189,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause))
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause, hostname, port))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
}
isRunning = true
@ -222,7 +225,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
}
} else {
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port))
throw exception
}
@ -311,12 +314,12 @@ class RemoteClientHandler(val name: String,
futures.remove(reply.getId)
} else {
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port))
throw exception
}
} catch {
case e: Exception =>
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e, client.hostname, client.port))
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
@ -331,7 +334,7 @@ class RemoteClientHandler(val name: String,
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.listeners.toArray.foreach(l =>
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
@ -351,7 +354,7 @@ class RemoteClientHandler(val name: String,
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause))
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause, client.hostname, client.port))
log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close
}

View file

@ -0,0 +1,119 @@
/* The Computer Language Benchmarks Game
http://shootout.alioth.debian.org/
contributed by Julien Gaugaz
inspired by the version contributed by Yura Taras and modified by Isaac Gouy
*/
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.actor.Actor._
object Chameneos {
sealed trait ChameneosEvent
case class Meet(from: ActorRef, colour: Colour) extends ChameneosEvent
case class Change(colour: Colour) extends ChameneosEvent
case class MeetingCount(count: Int) extends ChameneosEvent
case object Exit extends ChameneosEvent
abstract class Colour
case object RED extends Colour
case object YELLOW extends Colour
case object BLUE extends Colour
case object FADED extends Colour
val colours = Array[Colour](BLUE, RED, YELLOW)
var start = 0L
var end = 0L
class Chameneo(var mall: ActorRef, var colour: Colour, cid: Int) extends Actor {
var meetings = 0
self.start
mall ! Meet(self, colour)
def receive = {
case Meet(from, otherColour) =>
colour = complement(otherColour)
meetings = meetings +1
from ! Change(colour)
mall ! Meet(self, colour)
case Change(newColour) =>
colour = newColour
meetings = meetings +1
mall ! Meet(self, colour)
case Exit =>
colour = FADED
self.sender.get ! MeetingCount(meetings)
}
def complement(otherColour: Colour): Colour = colour match {
case RED => otherColour match {
case RED => RED
case YELLOW => BLUE
case BLUE => YELLOW
case FADED => FADED
}
case YELLOW => otherColour match {
case RED => BLUE
case YELLOW => YELLOW
case BLUE => RED
case FADED => FADED
}
case BLUE => otherColour match {
case RED => YELLOW
case YELLOW => RED
case BLUE => BLUE
case FADED => FADED
}
case FADED => FADED
}
override def toString = cid + "(" + colour + ")"
}
class Mall(var n: Int, numChameneos: Int) extends Actor {
var waitingChameneo: Option[ActorRef] = None
var sumMeetings = 0
var numFaded = 0
override def init = {
for (i <- 0 until numChameneos) actorOf(new Chameneo(self, colours(i % 3), i))
}
def receive = {
case MeetingCount(i) =>
numFaded += 1
sumMeetings += i
if (numFaded == numChameneos) {
Chameneos.end = System.currentTimeMillis
self.stop
}
case msg @ Meet(a, c) =>
if (n > 0) {
waitingChameneo match {
case Some(chameneo) =>
n -= 1
chameneo ! msg
waitingChameneo = None
case None => waitingChameneo = self.sender
}
} else {
waitingChameneo.foreach(_ ! Exit)
self.sender.get ! Exit
}
}
}
def run {
// System.setProperty("akka.config", "akka.conf")
Chameneos.start = System.currentTimeMillis
actorOf(new Mall(1000000, 4)).start
Thread.sleep(10000)
println("Elapsed: " + (end - start))
}
def main(args : Array[String]): Unit = run
}

View file

@ -0,0 +1,74 @@
package se.scalablesolutions.akka.persistence.redis
import sbinary._
import sbinary.Operations._
import sbinary.DefaultProtocol._
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.OneForOneStrategy
import Actor._
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import java.util.{Calendar, Date}
object Serial {
implicit object DateFormat extends Format[Date] {
def reads(in : Input) = new Date(read[Long](in))
def writes(out: Output, value: Date) = write[Long](out, value.getTime)
}
case class Name(id: Int, name: String, address: String, dateOfBirth: Date, dateDied: Option[Date])
implicit val NameFormat: Format[Name] = asProduct5(Name)(Name.unapply(_).get)
}
case class GETFOO(s: String)
case class SETFOO(s: String)
object SampleStorage {
class RedisSampleStorage extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
val EVENT_MAP = "akka.sample.map"
private var eventMap = atomic { RedisStorage.getMap(EVENT_MAP) }
import sbinary._
import DefaultProtocol._
import Operations._
import Serial._
import java.util.Calendar
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
def receive = {
case SETFOO(str) =>
atomic {
eventMap += (str.getBytes, toByteArray[Name](n))
}
self.reply(str)
case GETFOO(str) =>
val ev = atomic {
eventMap.keySet.size
}
println("************* " + ev)
self.reply(ev)
}
}
}
import Serial._
import SampleStorage._
object Runner {
def run {
val proc = actorOf[RedisSampleStorage]
proc.start
val i: Option[String] = proc !! SETFOO("debasish")
println("i = " + i)
val ev: Option[Int] = proc !! GETFOO("debasish")
println(ev)
}
}

View file

@ -15,7 +15,7 @@
</log>
<akka>
version = "0.10-SNAPSHOT"
version = "0.10"
# FQN (Fully Qualified Name) to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor

View file

@ -1,6 +1,6 @@
project.organization=se.scalablesolutions.akka
project.name=akka
project.version=0.10-SNAPSHOT
project.version=0.10
scala.version=2.8.0.RC3
sbt.version=0.7.4
def.scala.version=2.7.7

View file

@ -46,7 +46,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
// Therefore, if repositories are defined, this must happen as def, not as val.
// -------------------------------------------------------------------------------------------------------------------
val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here!
val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString
val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
def guiceyFruitRepo = "GuiceyFruit Repo" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", guiceyFruitRepo)
@ -365,8 +365,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
def removeDupEntries(paths: PathFinder) =
Path.lazyPathFinder {
val mapped = paths.get map { p => (p.relativePath, p) }
(Map() ++ mapped).values.toList
}
(Map() ++ mapped).values.toList
}
def allArtifacts = {
Path.fromFile(buildScalaInstance.libraryJar) +++