Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-12-21 17:41:31 +01:00
commit 417b628217
13 changed files with 187 additions and 71 deletions

View file

@ -109,18 +109,6 @@ object Actor extends Logging {
tf.setAccessible(true)
val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]]
subclassAudits.synchronized {subclassAudits.clear}
// Clear and reset j.u.l.Level.known (due to Configgy)
log.slf4j.info("Removing Configgy-installed log levels")
import java.util.logging.Level
val lf = classOf[Level].getDeclaredField("known")
lf.setAccessible(true)
val known = lf.get(null).asInstanceOf[java.util.ArrayList[Level]]
known.synchronized {
known.clear
List(Level.OFF,Level.SEVERE,Level.WARNING,Level.INFO,Level.CONFIG,
Level.FINE,Level.FINER,Level.FINEST,Level.ALL) foreach known.add
}
}
}
Runtime.getRuntime.addShutdownHook(new Thread(hook))

View file

@ -1238,7 +1238,7 @@ private[akka] case class RemoteActorRef private[akka] (
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
RemoteClientModule.send[Any](
message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType)
message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType, loader)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@ -1246,7 +1246,7 @@ private[akka] case class RemoteActorRef private[akka] (
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = RemoteClientModule.send[T](
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType)
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType, loader)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}

View file

@ -64,9 +64,19 @@ object Config extends Logging {
"\n\tdue to: " + e.toString)
}
Configgy.config
} else if (getClass.getClassLoader.getResource(confName) ne null) {
try {
Configgy.configureFromResource(confName, getClass.getClassLoader)
log.slf4j.info("Config [{}] loaded from the application classpath.",confName)
} catch {
case e: ParseException => throw new ConfigurationException(
"Can't load '" + confName + "' config file from application classpath," +
"\n\tdue to: " + e.toString)
}
Configgy.config
} else if (HOME.isDefined) {
try {
val configFile = HOME.getOrElse(throwNoAkkaHomeException) + "/config/" + confName
val configFile = HOME.get + "/config/" + confName
Configgy.configure(configFile)
log.slf4j.info(
"AKKA_HOME is defined as [{}], config loaded from [{}].",
@ -79,16 +89,6 @@ object Config extends Logging {
"\n\tdue to: " + e.toString)
}
Configgy.config
} else if (getClass.getClassLoader.getResource(confName) ne null) {
try {
Configgy.configureFromResource(confName, getClass.getClassLoader)
log.slf4j.info("Config [{}] loaded from the application classpath.",confName)
} catch {
case e: ParseException => throw new ConfigurationException(
"Can't load '" + confName + "' config file from application classpath," +
"\n\tdue to: " + e.toString)
}
Configgy.config
} else {
log.slf4j.warn(
"\nCan't load '" + confName + "'." +

View file

@ -92,9 +92,10 @@ object ReflectiveAccess extends Logging {
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]] = {
actorType: ActorType,
loader: Option[ClassLoader] = None): Option[CompletableFuture[T]] = {
ensureEnabled
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
clientFor(remoteAddress.getHostName, remoteAddress.getPort, loader).send[T](
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
}
}

View file

@ -11,17 +11,18 @@ import akka.util._
import com.google.protobuf.{Message, ByteString}
object MessageSerializer extends Logging {
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
private def SERIALIZER_JAVA: Serializer.Java = Serializer.Java
private def SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
private def SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
private def SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private def SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(cl: ClassLoader) = {
SERIALIZER_JAVA.classLoader = Some(cl)
SERIALIZER_JAVA_JSON.classLoader = Some(cl)
SERIALIZER_SCALA_JSON.classLoader = Some(cl)
SERIALIZER_SBINARY.classLoader = Some(cl)
val someCl = Some(cl)
SERIALIZER_JAVA.classLoader = someCl
SERIALIZER_JAVA_JSON.classLoader = someCl
SERIALIZER_SCALA_JSON.classLoader = someCl
SERIALIZER_SBINARY.classLoader = someCl
}
def deserialize(messageProtocol: MessageProtocol): Any = {

View file

@ -56,6 +56,8 @@ case class RemoteClientShutdown(
*/
class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
case class UnparsableException private[akka] (originalClassName: String, originalMessage: String) extends AkkaException(originalMessage)
/**
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
*
@ -414,32 +416,33 @@ class RemoteClientHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try {
val result = event.getMessage
if (result.isInstanceOf[RemoteMessageProtocol]) {
val reply = result.asInstanceOf[RemoteMessageProtocol]
val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow)
log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]]
if (reply.hasMessage) {
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message)
} else {
if (reply.hasSupervisorUuid()) {
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
"Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
event.getMessage match {
case reply: RemoteMessageProtocol =>
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
if (reply.hasMessage) {
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message)
} else {
val exception = parseException(reply, client.loader)
if (reply.hasSupervisorUuid()) {
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
"Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, exception)
}
future.completeWithException(exception)
}
val exception = parseException(reply, client.loader)
future.completeWithException(exception)
}
futures remove replyUuid
} else {
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
case message =>
val exception = new RemoteClientException("Unknown message received in remote client handler: " + message, client)
client.notifyListeners(RemoteClientError(exception, client))
throw exception
}
@ -506,10 +509,17 @@ class RemoteClientHandler(
private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = {
val exception = reply.getException
val classname = exception.getClassname
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
try {
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem =>
log.debug("Couldn't parse exception returned from RemoteServer",problem)
log.warn("Couldn't create instance of {} with message {}, returning UnparsableException",classname, exception.getMessage)
UnparsableException(classname, exception.getMessage)
}
}
}

View file

@ -295,7 +295,10 @@ object RemoteActorSerialization {
}
val actorInfo = actorInfoBuilder.build
val messageBuilder = RemoteMessageProtocol.newBuilder
.setUuid(uuidProtocol)
.setUuid({
val messageUuid = newUuid
UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build
})
.setActorInfo(actorInfo)
.setOneWay(isOneWay)

View file

@ -18,7 +18,7 @@ import sjson.json.{Serializer => SJSONSerializer}
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable trait Serializer {
var classLoader: Option[ClassLoader] = None
@volatile var classLoader: Option[ClassLoader] = None
def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass))
def toBinary(obj: AnyRef): Array[Byte]

View file

@ -175,5 +175,8 @@ trait Transactor extends Actor {
/**
* Default catch-all for the different Receive methods.
*/
def doNothing: Receive = { case _ => }
def doNothing: Receive = new Receive {
def apply(any: Any) = {}
def isDefinedAt(any: Any) = false
}
}

View file

@ -59,8 +59,22 @@ object TransactorIncrement {
}
}
object SimpleTransactor {
case class Set(ref: Ref[Int], value: Int, latch: CountDownLatch)
class Setter extends Transactor {
def atomically = {
case Set(ref, value, latch) => {
ref.set(value)
latch.countDown
}
}
}
}
class TransactorSpec extends WordSpec with MustMatchers {
import TransactorIncrement._
import SimpleTransactor._
val numCounters = 5
val timeout = 5 seconds
@ -97,4 +111,17 @@ class TransactorSpec extends WordSpec with MustMatchers {
failer.stop
}
}
"Transactor" should {
"be usable without overriding normally" in {
val transactor = Actor.actorOf(new Setter).start
val ref = Ref(0)
val latch = new CountDownLatch(1)
transactor ! Set(ref, 5, latch)
latch.await(timeout.length, timeout.unit)
val value = atomic { ref.get }
value must be === 5
transactor.stop
}
}
}

View file

@ -0,0 +1,83 @@
<?xml version='1.0' encoding='UTF-8'?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<packaging>jar</packaging>
<version>2.0.2-nologgy</version>
<name>Configgy</name>
<description>Configgy logging removed</description>
<url>http://github.com/derekjw/configgy</url>
<licenses>
<license>
<name>Apache 2</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>scalatoolsorg</id>
<name>scala-tools.org</name>
<url>http://scala-tools.org/repo-releases/</url>
</repository>
<repository>
<id>atlassian</id>
<name>atlassian</name>
<url>https://m2proxy.atlassian.com/repository/public/</url>
</repository>
<repository>
<id>lagnet</id>
<name>lag.net</name>
<url>http://www.lag.net/repo/</url>
</repository>
<repository>
<id>testingscalatoolsorg</id>
<name>testing.scala-tools.org</name>
<url>http://scala-tools.org/repo-releases/testing/</url>
</repository>
<repository>
<id>oauthnet</id>
<name>oauth.net</name>
<url>http://oauth.googlecode.com/svn/code/maven/</url>
</repository>
<repository>
<id>downloadjavanet</id>
<name>download.java.net</name>
<url>http://download.java.net/maven/2/</url>
</repository>
<repository>
<id>oldtwittercom</id>
<name>old.twitter.com</name>
<url>http://www.lag.net/nest/</url>
</repository>
<repository>
<id>twittercom</id>
<name>twitter.com</name>
<url>http://maven.twttr.com/</url>
</repository>
<repository>
<id>powermockapi</id>
<name>powermock-api</name>
<url>http://powermock.googlecode.com/svn/repo/</url>
</repository>
<repository>
<id>ibiblio</id>
<name>ibiblio</name>
<url>http://mirrors.ibiblio.org/pub/mirrors/maven2/</url>
</repository>
<repository>
<id>ScalaToolsMaven2Repository</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases/</url>
</repository>
</repositories>
</project>

View file

@ -139,7 +139,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile" //ApacheV2
lazy val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile" //ApacheV2
lazy val configgy = "net.lag" % "configgy" % "2.0.2-nologgy" % "compile" //ApacheV2
lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
@ -538,4 +538,4 @@ trait McPom { self: DefaultProject =>
rewrite(rule)(node.theSeq)(0)
}
}
}