diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index c87d1460f9..0d6b4fee77 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -109,18 +109,6 @@ object Actor extends Logging { tf.setAccessible(true) val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]] subclassAudits.synchronized {subclassAudits.clear} - - // Clear and reset j.u.l.Level.known (due to Configgy) - log.slf4j.info("Removing Configgy-installed log levels") - import java.util.logging.Level - val lf = classOf[Level].getDeclaredField("known") - lf.setAccessible(true) - val known = lf.get(null).asInstanceOf[java.util.ArrayList[Level]] - known.synchronized { - known.clear - List(Level.OFF,Level.SEVERE,Level.WARNING,Level.INFO,Level.CONFIG, - Level.FINE,Level.FINER,Level.FINEST,Level.ALL) foreach known.add - } } } Runtime.getRuntime.addShutdownHook(new Thread(hook)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index a0663b3244..4fc1df6f03 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -1238,7 +1238,7 @@ private[akka] case class RemoteActorRef private[akka] ( def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = RemoteClientModule.send[Any]( - message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType) + message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType, loader) def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -1246,7 +1246,7 @@ private[akka] case class RemoteActorRef private[akka] ( senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { val future = RemoteClientModule.send[T]( - message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType) + message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType, loader) if (future.isDefined) future.get else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index fa179b078a..01ae02ef39 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -64,9 +64,19 @@ object Config extends Logging { "\n\tdue to: " + e.toString) } Configgy.config + } else if (getClass.getClassLoader.getResource(confName) ne null) { + try { + Configgy.configureFromResource(confName, getClass.getClassLoader) + log.slf4j.info("Config [{}] loaded from the application classpath.",confName) + } catch { + case e: ParseException => throw new ConfigurationException( + "Can't load '" + confName + "' config file from application classpath," + + "\n\tdue to: " + e.toString) + } + Configgy.config } else if (HOME.isDefined) { try { - val configFile = HOME.getOrElse(throwNoAkkaHomeException) + "/config/" + confName + val configFile = HOME.get + "/config/" + confName Configgy.configure(configFile) log.slf4j.info( "AKKA_HOME is defined as [{}], config loaded from [{}].", @@ -79,16 +89,6 @@ object Config extends Logging { "\n\tdue to: " + e.toString) } Configgy.config - } else if (getClass.getClassLoader.getResource(confName) ne null) { - try { - Configgy.configureFromResource(confName, getClass.getClassLoader) - log.slf4j.info("Config [{}] loaded from the application classpath.",confName) - } catch { - case e: ParseException => throw new ConfigurationException( - "Can't load '" + confName + "' config file from application classpath," + - "\n\tdue to: " + e.toString) - } - Configgy.config } else { log.slf4j.warn( "\nCan't load '" + confName + "'." + diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 94661b9f1e..a22aa918cd 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -92,9 +92,10 @@ object ReflectiveAccess extends Logging { isOneWay: Boolean, actorRef: ActorRef, typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): Option[CompletableFuture[T]] = { + actorType: ActorType, + loader: Option[ClassLoader] = None): Option[CompletableFuture[T]] = { ensureEnabled - clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T]( + clientFor(remoteAddress.getHostName, remoteAddress.getPort, loader).send[T]( message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType) } } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index b68bb0bca6..aca0333582 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -11,17 +11,18 @@ import akka.util._ import com.google.protobuf.{Message, ByteString} object MessageSerializer extends Logging { - private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java - private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON - private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON - private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary - private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf + private def SERIALIZER_JAVA: Serializer.Java = Serializer.Java + private def SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON + private def SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON + private def SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary + private def SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf def setClassLoader(cl: ClassLoader) = { - SERIALIZER_JAVA.classLoader = Some(cl) - SERIALIZER_JAVA_JSON.classLoader = Some(cl) - SERIALIZER_SCALA_JSON.classLoader = Some(cl) - SERIALIZER_SBINARY.classLoader = Some(cl) + val someCl = Some(cl) + SERIALIZER_JAVA.classLoader = someCl + SERIALIZER_JAVA_JSON.classLoader = someCl + SERIALIZER_SCALA_JSON.classLoader = someCl + SERIALIZER_SBINARY.classLoader = someCl } def deserialize(messageProtocol: MessageProtocol): Any = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala index 2b137d8788..45500b561f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteClient.scala @@ -56,6 +56,8 @@ case class RemoteClientShutdown( */ class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message) +case class UnparsableException private[akka] (originalClassName: String, originalMessage: String) extends AkkaException(originalMessage) + /** * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. * @@ -414,32 +416,33 @@ class RemoteClientHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { try { - val result = event.getMessage - if (result.isInstanceOf[RemoteMessageProtocol]) { - val reply = result.asInstanceOf[RemoteMessageProtocol] - val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow) - log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) - val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]] - if (reply.hasMessage) { - if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") - val message = MessageSerializer.deserialize(reply.getMessage) - future.completeWithResult(message) - } else { - if (reply.hasSupervisorUuid()) { - val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) - if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( - "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") - val supervisedActor = supervisors.get(supervisorUuid) - if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( - "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) + event.getMessage match { + case reply: RemoteMessageProtocol => + val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) + log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply) + val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] + + if (reply.hasMessage) { + if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") + val message = MessageSerializer.deserialize(reply.getMessage) + future.completeWithResult(message) + } else { + val exception = parseException(reply, client.loader) + if (reply.hasSupervisorUuid()) { + val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow) + if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException( + "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found") + val supervisedActor = supervisors.get(supervisorUuid) + if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( + "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") + else supervisedActor.supervisor.get ! Exit(supervisedActor, exception) + } + + future.completeWithException(exception) } - val exception = parseException(reply, client.loader) - future.completeWithException(exception) - } - futures remove replyUuid - } else { - val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) + + case message => + val exception = new RemoteClientException("Unknown message received in remote client handler: " + message, client) client.notifyListeners(RemoteClientError(exception, client)) throw exception } @@ -506,10 +509,17 @@ class RemoteClientHandler( private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = { val exception = reply.getException val classname = exception.getClassname - val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) - else Class.forName(classname) - exceptionClass - .getConstructor(Array[Class[_]](classOf[String]): _*) - .newInstance(exception.getMessage).asInstanceOf[Throwable] + try { + val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) + else Class.forName(classname) + exceptionClass + .getConstructor(Array[Class[_]](classOf[String]): _*) + .newInstance(exception.getMessage).asInstanceOf[Throwable] + } catch { + case problem => + log.debug("Couldn't parse exception returned from RemoteServer",problem) + log.warn("Couldn't create instance of {} with message {}, returning UnparsableException",classname, exception.getMessage) + UnparsableException(classname, exception.getMessage) + } } } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index b93b472f51..d64e0552ab 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -295,7 +295,10 @@ object RemoteActorSerialization { } val actorInfo = actorInfoBuilder.build val messageBuilder = RemoteMessageProtocol.newBuilder - .setUuid(uuidProtocol) + .setUuid({ + val messageUuid = newUuid + UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build + }) .setActorInfo(actorInfo) .setOneWay(isOneWay) diff --git a/akka-remote/src/main/scala/akka/serialization/Serializer.scala b/akka-remote/src/main/scala/akka/serialization/Serializer.scala index e30e615322..20c91c8559 100644 --- a/akka-remote/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/Serializer.scala @@ -18,7 +18,7 @@ import sjson.json.{Serializer => SJSONSerializer} * @author Jonas Bonér */ @serializable trait Serializer { - var classLoader: Option[ClassLoader] = None + @volatile var classLoader: Option[ClassLoader] = None def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) def toBinary(obj: AnyRef): Array[Byte] diff --git a/akka-stm/src/main/scala/akka/transactor/Transactor.scala b/akka-stm/src/main/scala/akka/transactor/Transactor.scala index 72ba139d85..98871e42a4 100644 --- a/akka-stm/src/main/scala/akka/transactor/Transactor.scala +++ b/akka-stm/src/main/scala/akka/transactor/Transactor.scala @@ -175,5 +175,8 @@ trait Transactor extends Actor { /** * Default catch-all for the different Receive methods. */ - def doNothing: Receive = { case _ => } + def doNothing: Receive = new Receive { + def apply(any: Any) = {} + def isDefinedAt(any: Any) = false + } } diff --git a/akka-stm/src/test/scala/transactor/TransactorSpec.scala b/akka-stm/src/test/scala/transactor/TransactorSpec.scala index c329672486..f10a0e1d2c 100644 --- a/akka-stm/src/test/scala/transactor/TransactorSpec.scala +++ b/akka-stm/src/test/scala/transactor/TransactorSpec.scala @@ -59,8 +59,22 @@ object TransactorIncrement { } } +object SimpleTransactor { + case class Set(ref: Ref[Int], value: Int, latch: CountDownLatch) + + class Setter extends Transactor { + def atomically = { + case Set(ref, value, latch) => { + ref.set(value) + latch.countDown + } + } + } +} + class TransactorSpec extends WordSpec with MustMatchers { import TransactorIncrement._ + import SimpleTransactor._ val numCounters = 5 val timeout = 5 seconds @@ -97,4 +111,17 @@ class TransactorSpec extends WordSpec with MustMatchers { failer.stop } } + + "Transactor" should { + "be usable without overriding normally" in { + val transactor = Actor.actorOf(new Setter).start + val ref = Ref(0) + val latch = new CountDownLatch(1) + transactor ! Set(ref, 5, latch) + latch.await(timeout.length, timeout.unit) + val value = atomic { ref.get } + value must be === 5 + transactor.stop + } + } } diff --git a/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.jar b/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.jar new file mode 100644 index 0000000000..8a6a0a0e5b Binary files /dev/null and b/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.jar differ diff --git a/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.pom b/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.pom new file mode 100644 index 0000000000..9238472de5 --- /dev/null +++ b/embedded-repo/net/lag/configgy/2.0.2-nologgy/configgy-2.0.2-nologgy.pom @@ -0,0 +1,83 @@ + + + 4.0.0 + net.lag + configgy + jar + 2.0.2-nologgy + Configgy + Configgy logging removed + http://github.com/derekjw/configgy + + + Apache 2 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + org.scala-lang + scala-library + 2.8.1 + compile + + + + + scalatoolsorg + scala-tools.org + http://scala-tools.org/repo-releases/ + + + atlassian + atlassian + https://m2proxy.atlassian.com/repository/public/ + + + lagnet + lag.net + http://www.lag.net/repo/ + + + testingscalatoolsorg + testing.scala-tools.org + http://scala-tools.org/repo-releases/testing/ + + + oauthnet + oauth.net + http://oauth.googlecode.com/svn/code/maven/ + + + downloadjavanet + download.java.net + http://download.java.net/maven/2/ + + + oldtwittercom + old.twitter.com + http://www.lag.net/nest/ + + + twittercom + twitter.com + http://maven.twttr.com/ + + + powermockapi + powermock-api + http://powermock.googlecode.com/svn/repo/ + + + ibiblio + ibiblio + http://mirrors.ibiblio.org/pub/mirrors/maven2/ + + + ScalaToolsMaven2Repository + Scala-Tools Maven2 Repository + http://scala-tools.org/repo-releases/ + + + \ No newline at end of file diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 9356f5504e..886e9cd407 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -139,7 +139,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile" //ApacheV2 - lazy val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile" //ApacheV2 + lazy val configgy = "net.lag" % "configgy" % "2.0.2-nologgy" % "compile" //ApacheV2 lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2 lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2 @@ -538,4 +538,4 @@ trait McPom { self: DefaultProject => rewrite(rule)(node.theSeq)(0) } -} \ No newline at end of file +}