diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d604f3f441..c3e94188b1 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -97,18 +97,6 @@ object Actor extends Logging { tf.setAccessible(true) val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]] subclassAudits.synchronized {subclassAudits.clear} - - // Clear and reset j.u.l.Level.known (due to Configgy) - log.slf4j.info("Removing Configgy-installed log levels") - import java.util.logging.Level - val lf = classOf[Level].getDeclaredField("known") - lf.setAccessible(true) - val known = lf.get(null).asInstanceOf[java.util.ArrayList[Level]] - known.synchronized { - known.clear - List(Level.OFF,Level.SEVERE,Level.WARNING,Level.INFO,Level.CONFIG, - Level.FINE,Level.FINER,Level.FINEST,Level.ALL) foreach known.add - } } } Runtime.getRuntime.addShutdownHook(new Thread(hook)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 918edfdd00..d1c55e88d2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -1096,14 +1096,14 @@ private[akka] case class RemoteActorRef private[akka] ( start def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType) + ActorRegistry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType, loader) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType) + val future = ActorRegistry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index ff2fbcf3cb..ea816a7c51 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -238,7 +238,8 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => isOneWay: Boolean, actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): Option[CompletableFuture[T]] + actorType: ActorType, + loader: Option[ClassLoader]): Option[CompletableFuture[T]] //TODO: REVISIT: IMPLEMENT OR REMOVE //private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index b68bb0bca6..aca0333582 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -11,17 +11,18 @@ import akka.util._ import com.google.protobuf.{Message, ByteString} object MessageSerializer extends Logging { - private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java - private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON - private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON - private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary - private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf + private def SERIALIZER_JAVA: Serializer.Java = Serializer.Java + private def SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON + private def SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON + private def SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary + private def SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf def setClassLoader(cl: ClassLoader) = { - SERIALIZER_JAVA.classLoader = Some(cl) - SERIALIZER_JAVA_JSON.classLoader = Some(cl) - SERIALIZER_SCALA_JSON.classLoader = Some(cl) - SERIALIZER_SBINARY.classLoader = Some(cl) + val someCl = Some(cl) + SERIALIZER_JAVA.classLoader = someCl + SERIALIZER_JAVA_JSON.classLoader = someCl + SERIALIZER_SCALA_JSON.classLoader = someCl + SERIALIZER_SBINARY.classLoader = someCl } def deserialize(messageProtocol: MessageProtocol): Any = { diff --git a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala index ab63d15b64..c5453a1830 100644 --- a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala @@ -63,6 +63,11 @@ case class RemoteClientShutdown( */ class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message) +/** + * Returned when a remote exception cannot be instantiated or parsed + */ +case class UnparsableException private[akka] (originalClassName: String, originalMessage: String) extends AkkaException(originalMessage) + /** * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. * @@ -83,8 +88,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem isOneWay: Boolean, actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], - actorType: AkkaActorType): Option[CompletableFuture[T]] = - clientFor(remoteAddress, None).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) + actorType: AkkaActorType, + loader: Option[ClassLoader]): Option[CompletableFuture[T]] = + clientFor(remoteAddress, loader).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) private[akka] def clientFor( address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { //TODO: REVIST: synchronized here seems bottlenecky @@ -376,11 +382,14 @@ class RemoteClientHandler( log.slf4j.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) log.slf4j.debug("Trying to map back to future: {}",replyUuid) val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] + if (reply.hasMessage) { if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") val message = MessageSerializer.deserialize(reply.getMessage) future.completeWithResult(message) } else { + val exception = parseException(reply, client.loader) + if (reply.hasSupervisorUuid()) { val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( @@ -388,10 +397,10 @@ class RemoteClientHandler( val supervisedActor = supervisors.get(supervisorUuid) if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) + else supervisedActor.supervisor.get ! Exit(supervisedActor, exception) } - future.completeWithException(parseException(reply, client.loader)) + future.completeWithException(exception) } case other => @@ -460,11 +469,18 @@ class RemoteClientHandler( private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = { val exception = reply.getException val classname = exception.getClassname - val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) - else Class.forName(classname) - exceptionClass - .getConstructor(Array[Class[_]](classOf[String]): _*) - .newInstance(exception.getMessage).asInstanceOf[Throwable] + try { + val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) + else Class.forName(classname) + exceptionClass + .getConstructor(Array[Class[_]](classOf[String]): _*) + .newInstance(exception.getMessage).asInstanceOf[Throwable] + } catch { + case problem => + log.debug("Couldn't parse exception returned from RemoteServer",problem) + log.warn("Couldn't create instance of {} with message {}, returning UnparsableException",classname, exception.getMessage) + UnparsableException(classname, exception.getMessage) + } } } @@ -1119,10 +1135,10 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid val id = actorInfo.getId val sessionActorRefOrNull = findSessionActor(id, channel) - if (sessionActorRefOrNull ne null) + if (sessionActorRefOrNull ne null) { + log.debug("found session actor with id {} for channel {}",id, channel) sessionActorRefOrNull - else - { + } else { // we dont have it in the session either, see if we have a factory for it val actorFactoryOrNull = findActorFactory(id) if (actorFactoryOrNull ne null) { diff --git a/akka-remote/src/main/scala/akka/serialization/Serializer.scala b/akka-remote/src/main/scala/akka/serialization/Serializer.scala index e30e615322..20c91c8559 100644 --- a/akka-remote/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/Serializer.scala @@ -18,7 +18,7 @@ import sjson.json.{Serializer => SJSONSerializer} * @author Jonas Bonér */ @serializable trait Serializer { - var classLoader: Option[ClassLoader] = None + @volatile var classLoader: Option[ClassLoader] = None def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) def toBinary(obj: AnyRef): Array[Byte] diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala index 80ba9e0ef8..49c31b1ae7 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteSessionActorSpec.scala @@ -54,17 +54,16 @@ class ServerInitiatedRemoteSessionActorSpec extends AkkaRemoteTest { "A remote session Actor" should { "create a new session actor per connection" in { + remote.registerPerSession("untyped-session-actor-service", actorOf[RemoteStatefullSessionActorSpec]) val session1 = remote.actorFor("untyped-session-actor-service", 5000L, host, port) val default1 = session1 !! GetUser() - default1.as[String].get must equal ("anonymous") + default1.as[String] must equal (Some("anonymous")) session1 ! Login("session[1]") val result1 = session1 !! GetUser() - result1.as[String].get must equal ("session[1]") - - session1.stop + result1.as[String] must equal (Some("session[1]")) remote.shutdownClientModule @@ -72,9 +71,7 @@ class ServerInitiatedRemoteSessionActorSpec extends AkkaRemoteTest { // since this is a new session, the server should reset the state val default2 = session2 !! GetUser() - default2.as[String].get must equal ("anonymous") - - session2.stop() + default2.as[String] must equal (Some("anonymous")) } /*"stop the actor when the client disconnects" in { diff --git a/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala b/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala index dfe39a6684..3c9aeb97f1 100644 --- a/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala +++ b/akka-remote/src/test/scala/serialization/ProtobufActorMessageSerializationSpec.scala @@ -41,9 +41,10 @@ class ProtobufActorMessageSerializationSpec extends AkkaRemoteTest { "A ProtobufMessage" should { "SendReplyAsync" in { + remote.register("RemoteActorSpecActorBidirectional",actorOf[RemoteActorSpecActorBidirectional]) val actor = remote.actorFor("RemoteActorSpecActorBidirectional", 5000L, host, port) val result = actor !! ProtobufPOJO.newBuilder.setId(11).setStatus(true).setName("Coltrane").build - result.as[Long].get must be (12) + result.as[Long] must equal (Some(12)) } } } diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala index f9ae176721..e900724d9c 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala @@ -21,7 +21,7 @@ trait AkkaBaseProject extends BasicScalaProject { // is resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action. // for development version resolve to .ivy2/local - // val akkaModuleConfig = ModuleConfiguration("akka", AkkaRepo) + // val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo) val aspectwerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", AkkaRepo) val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo) diff --git a/akka-stm/src/main/scala/akka/transactor/Transactor.scala b/akka-stm/src/main/scala/akka/transactor/Transactor.scala index 72ba139d85..98871e42a4 100644 --- a/akka-stm/src/main/scala/akka/transactor/Transactor.scala +++ b/akka-stm/src/main/scala/akka/transactor/Transactor.scala @@ -175,5 +175,8 @@ trait Transactor extends Actor { /** * Default catch-all for the different Receive methods. */ - def doNothing: Receive = { case _ => } + def doNothing: Receive = new Receive { + def apply(any: Any) = {} + def isDefinedAt(any: Any) = false + } } diff --git a/akka-stm/src/test/scala/transactor/TransactorSpec.scala b/akka-stm/src/test/scala/transactor/TransactorSpec.scala index c329672486..f10a0e1d2c 100644 --- a/akka-stm/src/test/scala/transactor/TransactorSpec.scala +++ b/akka-stm/src/test/scala/transactor/TransactorSpec.scala @@ -59,8 +59,22 @@ object TransactorIncrement { } } +object SimpleTransactor { + case class Set(ref: Ref[Int], value: Int, latch: CountDownLatch) + + class Setter extends Transactor { + def atomically = { + case Set(ref, value, latch) => { + ref.set(value) + latch.countDown + } + } + } +} + class TransactorSpec extends WordSpec with MustMatchers { import TransactorIncrement._ + import SimpleTransactor._ val numCounters = 5 val timeout = 5 seconds @@ -97,4 +111,17 @@ class TransactorSpec extends WordSpec with MustMatchers { failer.stop } } + + "Transactor" should { + "be usable without overriding normally" in { + val transactor = Actor.actorOf(new Setter).start + val ref = Ref(0) + val latch = new CountDownLatch(1) + transactor ! Set(ref, 5, latch) + latch.await(timeout.length, timeout.unit) + val value = atomic { ref.get } + value must be === 5 + transactor.stop + } + } } diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 1672cdc3a8..5adabb3e54 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -841,12 +841,13 @@ private[akka] abstract class ActorAspect { val isOneWay = TypedActor.isOneWay(methodRtti) val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues) - //TODO: REVISIT: MAKE REGISTRY COME FROM ACTORREF + val future = ActorRegistry.remote.send[AnyRef]( message, None, None, remoteAddress.get, timeout, isOneWay, actorRef, Some((interfaceClass.getName, methodRtti.getMethod.getName)), - ActorType.TypedActor) + ActorType.TypedActor, + None) //TODO: REVISIT: Use another classloader? if (isOneWay) null // for void methods else if (TypedActor.returnsFuture_?(methodRtti)) future.get diff --git a/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.jar b/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.jar new file mode 100644 index 0000000000..8a6a0a0e5b Binary files /dev/null and b/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.jar differ diff --git a/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.pom b/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.pom new file mode 100644 index 0000000000..9238472de5 --- /dev/null +++ b/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.pom @@ -0,0 +1,83 @@ + + + 4.0.0 + net.lag + configgy + jar + 2.0.2-nologgy + Configgy + Configgy logging removed + http://github.com/derekjw/configgy + + + Apache 2 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + org.scala-lang + scala-library + 2.8.1 + compile + + + + + scalatoolsorg + scala-tools.org + http://scala-tools.org/repo-releases/ + + + atlassian + atlassian + https://m2proxy.atlassian.com/repository/public/ + + + lagnet + lag.net + http://www.lag.net/repo/ + + + testingscalatoolsorg + testing.scala-tools.org + http://scala-tools.org/repo-releases/testing/ + + + oauthnet + oauth.net + http://oauth.googlecode.com/svn/code/maven/ + + + downloadjavanet + download.java.net + http://download.java.net/maven/2/ + + + oldtwittercom + old.twitter.com + http://www.lag.net/nest/ + + + twittercom + twitter.com + http://maven.twttr.com/ + + + powermockapi + powermock-api + http://powermock.googlecode.com/svn/repo/ + + + ibiblio + ibiblio + http://mirrors.ibiblio.org/pub/mirrors/maven2/ + + + ScalaToolsMaven2Repository + Scala-Tools Maven2 Repository + http://scala-tools.org/repo-releases/ + + + \ No newline at end of file diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 9356f5504e..886e9cd407 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -139,7 +139,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile" //ApacheV2 - lazy val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile" //ApacheV2 + lazy val configgy = "net.lag" % "configgy" % "2.0.2-nologgy" % "compile" //ApacheV2 lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2 lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2 @@ -538,4 +538,4 @@ trait McPom { self: DefaultProject => rewrite(rule)(node.theSeq)(0) } -} \ No newline at end of file +}