Merged with release_1_0_RC1 plus fixed some tests
This commit is contained in:
commit
dc15562ce1
15 changed files with 169 additions and 51 deletions
|
|
@ -97,18 +97,6 @@ object Actor extends Logging {
|
|||
tf.setAccessible(true)
|
||||
val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]]
|
||||
subclassAudits.synchronized {subclassAudits.clear}
|
||||
|
||||
// Clear and reset j.u.l.Level.known (due to Configgy)
|
||||
log.slf4j.info("Removing Configgy-installed log levels")
|
||||
import java.util.logging.Level
|
||||
val lf = classOf[Level].getDeclaredField("known")
|
||||
lf.setAccessible(true)
|
||||
val known = lf.get(null).asInstanceOf[java.util.ArrayList[Level]]
|
||||
known.synchronized {
|
||||
known.clear
|
||||
List(Level.OFF,Level.SEVERE,Level.WARNING,Level.INFO,Level.CONFIG,
|
||||
Level.FINE,Level.FINER,Level.FINEST,Level.ALL) foreach known.add
|
||||
}
|
||||
}
|
||||
}
|
||||
Runtime.getRuntime.addShutdownHook(new Thread(hook))
|
||||
|
|
|
|||
|
|
@ -1096,14 +1096,14 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
start
|
||||
|
||||
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
|
||||
ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType)
|
||||
ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType, loader)
|
||||
|
||||
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
|
||||
message: Any,
|
||||
timeout: Long,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
|
||||
val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType)
|
||||
val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType, loader)
|
||||
if (future.isDefined) future.get
|
||||
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -238,7 +238,8 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
|
|||
isOneWay: Boolean,
|
||||
actorRef: ActorRef,
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
actorType: ActorType): Option[CompletableFuture[T]]
|
||||
actorType: ActorType,
|
||||
loader: Option[ClassLoader]): Option[CompletableFuture[T]]
|
||||
|
||||
//TODO: REVISIT: IMPLEMENT OR REMOVE
|
||||
//private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef
|
||||
|
|
|
|||
|
|
@ -11,17 +11,18 @@ import akka.util._
|
|||
import com.google.protobuf.{Message, ByteString}
|
||||
|
||||
object MessageSerializer extends Logging {
|
||||
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
|
||||
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
|
||||
private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
|
||||
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
|
||||
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
|
||||
private def SERIALIZER_JAVA: Serializer.Java = Serializer.Java
|
||||
private def SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
|
||||
private def SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
|
||||
private def SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
|
||||
private def SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
|
||||
|
||||
def setClassLoader(cl: ClassLoader) = {
|
||||
SERIALIZER_JAVA.classLoader = Some(cl)
|
||||
SERIALIZER_JAVA_JSON.classLoader = Some(cl)
|
||||
SERIALIZER_SCALA_JSON.classLoader = Some(cl)
|
||||
SERIALIZER_SBINARY.classLoader = Some(cl)
|
||||
val someCl = Some(cl)
|
||||
SERIALIZER_JAVA.classLoader = someCl
|
||||
SERIALIZER_JAVA_JSON.classLoader = someCl
|
||||
SERIALIZER_SCALA_JSON.classLoader = someCl
|
||||
SERIALIZER_SBINARY.classLoader = someCl
|
||||
}
|
||||
|
||||
def deserialize(messageProtocol: MessageProtocol): Any = {
|
||||
|
|
|
|||
|
|
@ -63,6 +63,11 @@ case class RemoteClientShutdown(
|
|||
*/
|
||||
class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* Returned when a remote exception cannot be instantiated or parsed
|
||||
*/
|
||||
case class UnparsableException private[akka] (originalClassName: String, originalMessage: String) extends AkkaException(originalMessage)
|
||||
|
||||
/**
|
||||
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
|
||||
*
|
||||
|
|
@ -83,8 +88,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
|
|||
isOneWay: Boolean,
|
||||
actorRef: ActorRef,
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
actorType: AkkaActorType): Option[CompletableFuture[T]] =
|
||||
clientFor(remoteAddress, None).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
|
||||
actorType: AkkaActorType,
|
||||
loader: Option[ClassLoader]): Option[CompletableFuture[T]] =
|
||||
clientFor(remoteAddress, loader).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
|
||||
|
||||
private[akka] def clientFor(
|
||||
address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { //TODO: REVIST: synchronized here seems bottlenecky
|
||||
|
|
@ -376,11 +382,14 @@ class RemoteClientHandler(
|
|||
log.slf4j.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
|
||||
log.slf4j.debug("Trying to map back to future: {}",replyUuid)
|
||||
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
|
||||
|
||||
if (reply.hasMessage) {
|
||||
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
|
||||
val message = MessageSerializer.deserialize(reply.getMessage)
|
||||
future.completeWithResult(message)
|
||||
} else {
|
||||
val exception = parseException(reply, client.loader)
|
||||
|
||||
if (reply.hasSupervisorUuid()) {
|
||||
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
|
||||
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
|
||||
|
|
@ -388,10 +397,10 @@ class RemoteClientHandler(
|
|||
val supervisedActor = supervisors.get(supervisorUuid)
|
||||
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
|
||||
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
|
||||
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
|
||||
else supervisedActor.supervisor.get ! Exit(supervisedActor, exception)
|
||||
}
|
||||
|
||||
future.completeWithException(parseException(reply, client.loader))
|
||||
future.completeWithException(exception)
|
||||
}
|
||||
|
||||
case other =>
|
||||
|
|
@ -460,11 +469,18 @@ class RemoteClientHandler(
|
|||
private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = {
|
||||
val exception = reply.getException
|
||||
val classname = exception.getClassname
|
||||
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
|
||||
else Class.forName(classname)
|
||||
exceptionClass
|
||||
.getConstructor(Array[Class[_]](classOf[String]): _*)
|
||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
try {
|
||||
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
|
||||
else Class.forName(classname)
|
||||
exceptionClass
|
||||
.getConstructor(Array[Class[_]](classOf[String]): _*)
|
||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
} catch {
|
||||
case problem =>
|
||||
log.debug("Couldn't parse exception returned from RemoteServer",problem)
|
||||
log.warn("Couldn't create instance of {} with message {}, returning UnparsableException",classname, exception.getMessage)
|
||||
UnparsableException(classname, exception.getMessage)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1119,10 +1135,10 @@ class RemoteServerHandler(
|
|||
val uuid = actorInfo.getUuid
|
||||
val id = actorInfo.getId
|
||||
val sessionActorRefOrNull = findSessionActor(id, channel)
|
||||
if (sessionActorRefOrNull ne null)
|
||||
if (sessionActorRefOrNull ne null) {
|
||||
log.debug("found session actor with id {} for channel {}",id, channel)
|
||||
sessionActorRefOrNull
|
||||
else
|
||||
{
|
||||
} else {
|
||||
// we dont have it in the session either, see if we have a factory for it
|
||||
val actorFactoryOrNull = findActorFactory(id)
|
||||
if (actorFactoryOrNull ne null) {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import sjson.json.{Serializer => SJSONSerializer}
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@serializable trait Serializer {
|
||||
var classLoader: Option[ClassLoader] = None
|
||||
@volatile var classLoader: Option[ClassLoader] = None
|
||||
def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass))
|
||||
|
||||
def toBinary(obj: AnyRef): Array[Byte]
|
||||
|
|
|
|||
|
|
@ -54,17 +54,16 @@ class ServerInitiatedRemoteSessionActorSpec extends AkkaRemoteTest {
|
|||
|
||||
"A remote session Actor" should {
|
||||
"create a new session actor per connection" in {
|
||||
remote.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec])
|
||||
|
||||
val session1 = remote.actorFor("untyped-session-actor-service", 5000L, host, port)
|
||||
|
||||
val default1 = session1 !! GetUser()
|
||||
default1.as[String].get must equal ("anonymous")
|
||||
default1.as[String] must equal (Some("anonymous"))
|
||||
|
||||
session1 ! Login("session[1]")
|
||||
val result1 = session1 !! GetUser()
|
||||
result1.as[String].get must equal ("session[1]")
|
||||
|
||||
session1.stop
|
||||
result1.as[String] must equal (Some("session[1]"))
|
||||
|
||||
remote.shutdownClientModule
|
||||
|
||||
|
|
@ -72,9 +71,7 @@ class ServerInitiatedRemoteSessionActorSpec extends AkkaRemoteTest {
|
|||
|
||||
// since this is a new session, the server should reset the state
|
||||
val default2 = session2 !! GetUser()
|
||||
default2.as[String].get must equal ("anonymous")
|
||||
|
||||
session2.stop()
|
||||
default2.as[String] must equal (Some("anonymous"))
|
||||
}
|
||||
|
||||
/*"stop the actor when the client disconnects" in {
|
||||
|
|
|
|||
|
|
@ -41,9 +41,10 @@ class ProtobufActorMessageSerializationSpec extends AkkaRemoteTest {
|
|||
|
||||
"A ProtobufMessage" should {
|
||||
"SendReplyAsync" in {
|
||||
remote.register("RemoteActorSpecActorBidirectional",actorOf[RemoteActorSpecActorBidirectional])
|
||||
val actor = remote.actorFor("RemoteActorSpecActorBidirectional", 5000L, host, port)
|
||||
val result = actor !! ProtobufPOJO.newBuilder.setId(11).setStatus(true).setName("Coltrane").build
|
||||
result.as[Long].get must be (12)
|
||||
result.as[Long] must equal (Some(12))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ trait AkkaBaseProject extends BasicScalaProject {
|
|||
// is resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
|
||||
|
||||
// for development version resolve to .ivy2/local
|
||||
// val akkaModuleConfig = ModuleConfiguration("akka", AkkaRepo)
|
||||
// val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo)
|
||||
|
||||
val aspectwerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", AkkaRepo)
|
||||
val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo)
|
||||
|
|
|
|||
|
|
@ -175,5 +175,8 @@ trait Transactor extends Actor {
|
|||
/**
|
||||
* Default catch-all for the different Receive methods.
|
||||
*/
|
||||
def doNothing: Receive = { case _ => }
|
||||
def doNothing: Receive = new Receive {
|
||||
def apply(any: Any) = {}
|
||||
def isDefinedAt(any: Any) = false
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,8 +59,22 @@ object TransactorIncrement {
|
|||
}
|
||||
}
|
||||
|
||||
object SimpleTransactor {
|
||||
case class Set(ref: Ref[Int], value: Int, latch: CountDownLatch)
|
||||
|
||||
class Setter extends Transactor {
|
||||
def atomically = {
|
||||
case Set(ref, value, latch) => {
|
||||
ref.set(value)
|
||||
latch.countDown
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TransactorSpec extends WordSpec with MustMatchers {
|
||||
import TransactorIncrement._
|
||||
import SimpleTransactor._
|
||||
|
||||
val numCounters = 5
|
||||
val timeout = 5 seconds
|
||||
|
|
@ -97,4 +111,17 @@ class TransactorSpec extends WordSpec with MustMatchers {
|
|||
failer.stop
|
||||
}
|
||||
}
|
||||
|
||||
"Transactor" should {
|
||||
"be usable without overriding normally" in {
|
||||
val transactor = Actor.actorOf(new Setter).start
|
||||
val ref = Ref(0)
|
||||
val latch = new CountDownLatch(1)
|
||||
transactor ! Set(ref, 5, latch)
|
||||
latch.await(timeout.length, timeout.unit)
|
||||
val value = atomic { ref.get }
|
||||
value must be === 5
|
||||
transactor.stop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -841,12 +841,13 @@ private[akka] abstract class ActorAspect {
|
|||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||
|
||||
val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues)
|
||||
//TODO: REVISIT: MAKE REGISTRY COME FROM ACTORREF
|
||||
|
||||
val future = ActorRegistry.remote.send[AnyRef](
|
||||
message, None, None, remoteAddress.get,
|
||||
timeout, isOneWay, actorRef,
|
||||
Some((interfaceClass.getName, methodRtti.getMethod.getName)),
|
||||
ActorType.TypedActor)
|
||||
ActorType.TypedActor,
|
||||
None) //TODO: REVISIT: Use another classloader?
|
||||
|
||||
if (isOneWay) null // for void methods
|
||||
else if (TypedActor.returnsFuture_?(methodRtti)) future.get
|
||||
|
|
|
|||
Binary file not shown.
|
|
@ -0,0 +1,83 @@
|
|||
<?xml version='1.0' encoding='UTF-8'?>
|
||||
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>net.lag</groupId>
|
||||
<artifactId>configgy</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>2.0.2-nologgy</version>
|
||||
<name>Configgy</name>
|
||||
<description>Configgy logging removed</description>
|
||||
<url>http://github.com/derekjw/configgy</url>
|
||||
<licenses>
|
||||
<license>
|
||||
<name>Apache 2</name>
|
||||
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
|
||||
<distribution>repo</distribution>
|
||||
</license>
|
||||
</licenses>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
<version>2.8.1</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>scalatoolsorg</id>
|
||||
<name>scala-tools.org</name>
|
||||
<url>http://scala-tools.org/repo-releases/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>atlassian</id>
|
||||
<name>atlassian</name>
|
||||
<url>https://m2proxy.atlassian.com/repository/public/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>lagnet</id>
|
||||
<name>lag.net</name>
|
||||
<url>http://www.lag.net/repo/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>testingscalatoolsorg</id>
|
||||
<name>testing.scala-tools.org</name>
|
||||
<url>http://scala-tools.org/repo-releases/testing/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>oauthnet</id>
|
||||
<name>oauth.net</name>
|
||||
<url>http://oauth.googlecode.com/svn/code/maven/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>downloadjavanet</id>
|
||||
<name>download.java.net</name>
|
||||
<url>http://download.java.net/maven/2/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>oldtwittercom</id>
|
||||
<name>old.twitter.com</name>
|
||||
<url>http://www.lag.net/nest/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>twittercom</id>
|
||||
<name>twitter.com</name>
|
||||
<url>http://maven.twttr.com/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>powermockapi</id>
|
||||
<name>powermock-api</name>
|
||||
<url>http://powermock.googlecode.com/svn/repo/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>ibiblio</id>
|
||||
<name>ibiblio</name>
|
||||
<url>http://mirrors.ibiblio.org/pub/mirrors/maven2/</url>
|
||||
</repository>
|
||||
<repository>
|
||||
<id>ScalaToolsMaven2Repository</id>
|
||||
<name>Scala-Tools Maven2 Repository</name>
|
||||
<url>http://scala-tools.org/repo-releases/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
</project>
|
||||
|
|
@ -139,7 +139,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile" //ApacheV2
|
||||
|
||||
lazy val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile" //ApacheV2
|
||||
lazy val configgy = "net.lag" % "configgy" % "2.0.2-nologgy" % "compile" //ApacheV2
|
||||
|
||||
lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
|
||||
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
|
||||
|
|
@ -538,4 +538,4 @@ trait McPom { self: DefaultProject =>
|
|||
|
||||
rewrite(rule)(node.theSeq)(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue