rework use of ClassLoaders, see #1736
This commit is contained in:
parent
c5fc153a10
commit
ac1ee9ae91
13 changed files with 54 additions and 67 deletions
|
|
@ -332,7 +332,9 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
|
|||
import ActorSystem._
|
||||
|
||||
final val settings: Settings = new Settings(applicationConfig, name)
|
||||
final val threadFactory: MonitorableThreadFactory = new MonitorableThreadFactory(name, settings.Daemonicity)
|
||||
|
||||
final val threadFactory: MonitorableThreadFactory =
|
||||
MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader))
|
||||
|
||||
def logConfiguration(): Unit = log.info(settings.toString)
|
||||
|
||||
|
|
@ -391,10 +393,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
|
|||
classOf[EventStream] -> eventStream,
|
||||
classOf[Scheduler] -> scheduler)
|
||||
|
||||
val loader = Thread.currentThread.getContextClassLoader match {
|
||||
case null ⇒ getClass.getClassLoader
|
||||
case l ⇒ l
|
||||
}
|
||||
val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader
|
||||
|
||||
ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, loader) match {
|
||||
case Left(e) ⇒ throw e
|
||||
|
|
@ -534,9 +533,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
|
|||
|
||||
private def loadExtensions() {
|
||||
import scala.collection.JavaConversions._
|
||||
val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse this.getClass.getClassLoader
|
||||
settings.config.getStringList("akka.extensions") foreach { fqcn ⇒
|
||||
import ReflectiveAccess._
|
||||
getObjectFor[AnyRef](fqcn).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
|
||||
import ReflectiveAccess.{ getObjectFor, createInstance, noParams, noArgs }
|
||||
getObjectFor[AnyRef](fqcn, loader).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
|
||||
case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup());
|
||||
case Right(p: ExtensionId[_]) ⇒ registerExtension(p);
|
||||
case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
|
||||
|
|
|
|||
|
|
@ -321,8 +321,9 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
|||
BoundedMailbox(capacity, duration)
|
||||
}
|
||||
case fqcn ⇒
|
||||
val constructorSignature = Array[Class[_]](classOf[Config])
|
||||
ReflectiveAccess.createInstance[MailboxType](fqcn, constructorSignature, Array[AnyRef](config)) match {
|
||||
val args = Seq(classOf[Config] -> config)
|
||||
val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader
|
||||
ReflectiveAccess.createInstance[MailboxType](fqcn, args, loader) match {
|
||||
case Right(instance) ⇒ instance
|
||||
case Left(exception) ⇒
|
||||
throw new IllegalArgumentException(
|
||||
|
|
|
|||
|
|
@ -159,6 +159,7 @@ object MonitorableThreadFactory {
|
|||
|
||||
case class MonitorableThreadFactory(name: String,
|
||||
daemonic: Boolean,
|
||||
contextClassLoader: Option[ClassLoader],
|
||||
exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing)
|
||||
extends ThreadFactory {
|
||||
protected val counter = new AtomicLong
|
||||
|
|
@ -167,6 +168,7 @@ case class MonitorableThreadFactory(name: String,
|
|||
val t = new Thread(runnable, name + counter.incrementAndGet())
|
||||
t.setUncaughtExceptionHandler(exceptionHandler)
|
||||
t.setDaemon(daemonic)
|
||||
contextClassLoader foreach (t.setContextClassLoader(_))
|
||||
t
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -81,10 +81,10 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
def deserialize(bytes: Array[Byte],
|
||||
serializerId: Int,
|
||||
clazz: Option[Class[_]],
|
||||
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
|
||||
classLoader: ClassLoader): Either[Exception, AnyRef] =
|
||||
try {
|
||||
currentSystem.withValue(system) {
|
||||
Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, classLoader))
|
||||
Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, Some(classLoader)))
|
||||
}
|
||||
} catch { case e: Exception ⇒ Left(e) }
|
||||
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ trait DurableMessageSerialization {
|
|||
def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = owner.system.actorFor(refProtocol.getPath)
|
||||
|
||||
val durableMessage = RemoteMessageProtocol.parseFrom(bytes)
|
||||
val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage)
|
||||
val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage, getClass.getClassLoader)
|
||||
val sender = deserializeActorRef(durableMessage.getSender)
|
||||
|
||||
new Envelope(message, sender)(owner.system)
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ class BSONSerializableMailbox(system: ActorSystem) extends SerializableBSONObjec
|
|||
val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument]
|
||||
system.log.debug("Deserializing a durable message from MongoDB: {}", doc)
|
||||
val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData)
|
||||
val msg = MessageSerializer.deserialize(system, msgData)
|
||||
val msg = MessageSerializer.deserialize(system, msgData, getClass.getClassLoader)
|
||||
val ownerPath = doc.as[String]("ownerPath")
|
||||
val senderPath = doc.as[String]("senderPath")
|
||||
val sender = systemImpl.actorFor(senderPath)
|
||||
|
|
|
|||
|
|
@ -12,11 +12,11 @@ import akka.util.ReflectiveAccess
|
|||
|
||||
object MessageSerializer {
|
||||
|
||||
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
|
||||
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: ClassLoader): AnyRef = {
|
||||
val clazz = if (messageProtocol.hasMessageManifest) {
|
||||
Option(ReflectiveAccess.getClassFor[AnyRef](
|
||||
messageProtocol.getMessageManifest.toStringUtf8,
|
||||
classLoader.getOrElse(ReflectiveAccess.loader)) match {
|
||||
classLoader) match {
|
||||
case Left(e) ⇒ throw e
|
||||
case Right(r) ⇒ r
|
||||
})
|
||||
|
|
|
|||
|
|
@ -82,13 +82,12 @@ class RemoteActorRefProvider(
|
|||
|
||||
_transport = {
|
||||
val fqn = remoteSettings.RemoteTransport
|
||||
// TODO check if this classloader is the right one; hint: this class was loaded by contextClassLoader if that was not null
|
||||
ReflectiveAccess.createInstance[RemoteTransport](
|
||||
fqn,
|
||||
Seq(classOf[RemoteSettings] -> remoteSettings,
|
||||
val args = Seq(
|
||||
classOf[RemoteSettings] -> remoteSettings,
|
||||
classOf[ActorSystemImpl] -> system,
|
||||
classOf[RemoteActorRefProvider] -> this),
|
||||
getClass.getClassLoader) match {
|
||||
classOf[RemoteActorRefProvider] -> this)
|
||||
|
||||
ReflectiveAccess.createInstance[RemoteTransport](fqn, args, getClass.getClassLoader) match {
|
||||
case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem)
|
||||
case Right(remote) ⇒ remote
|
||||
}
|
||||
|
|
@ -164,7 +163,7 @@ class RemoteActorRefProvider(
|
|||
else {
|
||||
val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements
|
||||
useActorOnNode(rpath, props.creator, supervisor)
|
||||
new RemoteActorRef(this, transport, rpath, supervisor, None)
|
||||
new RemoteActorRef(this, transport, rpath, supervisor)
|
||||
}
|
||||
|
||||
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment)
|
||||
|
|
@ -174,12 +173,12 @@ class RemoteActorRefProvider(
|
|||
|
||||
def actorFor(path: ActorPath): InternalActorRef =
|
||||
if (path.address == rootPath.address || path.address == transport.address) actorFor(rootGuardian, path.elements)
|
||||
else new RemoteActorRef(this, transport, path, Nobody, None)
|
||||
else new RemoteActorRef(this, transport, path, Nobody)
|
||||
|
||||
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
|
||||
case ActorPathExtractor(address, elems) ⇒
|
||||
if (address == rootPath.address || address == transport.address) actorFor(rootGuardian, elems)
|
||||
else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody, None)
|
||||
else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody)
|
||||
case _ ⇒ local.actorFor(ref, path)
|
||||
}
|
||||
|
||||
|
|
@ -208,8 +207,7 @@ private[akka] class RemoteActorRef private[akka] (
|
|||
val provider: RemoteActorRefProvider,
|
||||
remote: RemoteTransport,
|
||||
val path: ActorPath,
|
||||
val getParent: InternalActorRef,
|
||||
loader: Option[ClassLoader])
|
||||
val getParent: InternalActorRef)
|
||||
extends InternalActorRef with RemoteRef {
|
||||
|
||||
def getChild(name: Iterator[String]): InternalActorRef = {
|
||||
|
|
@ -217,7 +215,7 @@ private[akka] class RemoteActorRef private[akka] (
|
|||
s.headOption match {
|
||||
case None ⇒ this
|
||||
case Some("..") ⇒ getParent getChild name
|
||||
case _ ⇒ new RemoteActorRef(provider, remote, path / s, Nobody, loader)
|
||||
case _ ⇒ new RemoteActorRef(provider, remote, path / s, Nobody)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -226,9 +224,9 @@ private[akka] class RemoteActorRef private[akka] (
|
|||
|
||||
def isTerminated: Boolean = !running
|
||||
|
||||
def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this, loader)
|
||||
def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this)
|
||||
|
||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader)
|
||||
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this)
|
||||
|
||||
def suspend(): Unit = sendSystemMessage(Suspend())
|
||||
|
||||
|
|
|
|||
|
|
@ -146,5 +146,5 @@ class RemoteConnectionManager(
|
|||
}
|
||||
|
||||
private[remote] def newConnection(remoteAddress: Address, actorPath: ActorPath) =
|
||||
new RemoteActorRef(remote, remote.transport, actorPath, Nobody, None)
|
||||
new RemoteActorRef(remote, remote.transport, actorPath, Nobody)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -217,8 +217,7 @@ abstract class RemoteTransport {
|
|||
|
||||
protected[akka] def send(message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
recipient: RemoteActorRef,
|
||||
loader: Option[ClassLoader]): Unit
|
||||
recipient: RemoteActorRef): Unit
|
||||
|
||||
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
|
||||
system.eventStream.publish(message)
|
||||
|
|
@ -228,7 +227,7 @@ abstract class RemoteTransport {
|
|||
override def toString = address.toString
|
||||
}
|
||||
|
||||
class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, classLoader: Option[ClassLoader]) {
|
||||
class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) {
|
||||
|
||||
def originalReceiver = input.getRecipient.getPath
|
||||
|
||||
|
|
@ -238,7 +237,7 @@ class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, class
|
|||
|
||||
lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver)
|
||||
|
||||
lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, classLoader)
|
||||
lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, getClass.getClassLoader)
|
||||
|
||||
override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender
|
||||
}
|
||||
|
|
|
|||
|
|
@ -100,8 +100,7 @@ abstract class RemoteClient private[akka] (
|
|||
class ActiveRemoteClient private[akka] (
|
||||
netty: NettyRemoteTransport,
|
||||
remoteAddress: Address,
|
||||
localAddress: Address,
|
||||
val loader: Option[ClassLoader] = None)
|
||||
localAddress: Address)
|
||||
extends RemoteClient(netty, remoteAddress) {
|
||||
|
||||
import netty.settings
|
||||
|
|
@ -253,7 +252,7 @@ class ActiveRemoteClientHandler(
|
|||
}
|
||||
|
||||
case arp: AkkaRemoteProtocol if arp.hasMessage ⇒
|
||||
client.netty.receiveMessage(new RemoteMessage(arp.getMessage, client.netty.system, client.loader))
|
||||
client.netty.receiveMessage(new RemoteMessage(arp.getMessage, client.netty.system))
|
||||
|
||||
case other ⇒
|
||||
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.netty, client.remoteAddress)
|
||||
|
|
|
|||
|
|
@ -4,31 +4,24 @@
|
|||
|
||||
package akka.remote.netty
|
||||
|
||||
import java.net.{ UnknownHostException, InetAddress }
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
import scala.collection.mutable.HashMap
|
||||
|
||||
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture }
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
||||
import org.jboss.netty.channel.{ ChannelHandlerContext, ChannelFutureListener, ChannelFuture, Channel }
|
||||
import org.jboss.netty.channel.{ ChannelHandlerContext, Channel }
|
||||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder }
|
||||
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor
|
||||
import org.jboss.netty.util.HashedWheelTimer
|
||||
import akka.actor.{ ActorSystemImpl, ActorRef, simpleName }
|
||||
|
||||
import akka.actor.{ Address, ActorSystemImpl, ActorRef }
|
||||
import akka.dispatch.MonitorableThreadFactory
|
||||
import akka.event.Logging
|
||||
import akka.remote.RemoteProtocol.AkkaRemoteProtocol
|
||||
import akka.remote.{ RemoteTransport, RemoteMarshallingOps, RemoteClientWriteFailed, RemoteClientException, RemoteClientError, RemoteActorRef }
|
||||
import akka.util.Switch
|
||||
import akka.AkkaException
|
||||
import com.typesafe.config.Config
|
||||
import akka.remote.RemoteSettings
|
||||
import akka.actor.Address
|
||||
import java.net.InetSocketAddress
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.remote.{ RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef }
|
||||
|
||||
/**
|
||||
* Provides the implementation of the Netty remote support
|
||||
|
|
@ -38,7 +31,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
|
||||
val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName)
|
||||
|
||||
val threadFactory = new MonitorableThreadFactory("NettyRemoteTransport", settings.Daemonic)
|
||||
val threadFactory = new MonitorableThreadFactory("NettyRemoteTransport", settings.Daemonic, Some(getClass.getClassLoader))
|
||||
val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory)
|
||||
|
||||
val executor = new OrderedMemoryAwareThreadPoolExecutor(
|
||||
|
|
@ -58,7 +51,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
|
||||
override protected def useUntrustedMode = remoteSettings.UntrustedMode
|
||||
|
||||
val server = try new NettyRemoteServer(this, Some(getClass.getClassLoader)) catch {
|
||||
val server = try new NettyRemoteServer(this) catch {
|
||||
case ex ⇒ shutdown(); throw ex
|
||||
}
|
||||
|
||||
|
|
@ -94,8 +87,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
protected[akka] def send(
|
||||
message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
recipient: RemoteActorRef,
|
||||
loader: Option[ClassLoader]): Unit = {
|
||||
recipient: RemoteActorRef): Unit = {
|
||||
|
||||
val recipientAddress = recipient.path.address
|
||||
|
||||
|
|
@ -112,7 +104,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
//Recheck for addition, race between upgrades
|
||||
case Some(client) ⇒ client //If already populated by other writer
|
||||
case None ⇒ //Populate map
|
||||
val client = new ActiveRemoteClient(this, recipientAddress, address, loader)
|
||||
val client = new ActiveRemoteClient(this, recipientAddress, address)
|
||||
client.connect()
|
||||
remoteClients += recipientAddress -> client
|
||||
client
|
||||
|
|
|
|||
|
|
@ -21,9 +21,7 @@ import java.net.InetAddress
|
|||
import akka.actor.ActorSystemImpl
|
||||
import org.jboss.netty.channel.ChannelLocal
|
||||
|
||||
class NettyRemoteServer(
|
||||
val netty: NettyRemoteTransport,
|
||||
val loader: Option[ClassLoader]) {
|
||||
class NettyRemoteServer(val netty: NettyRemoteTransport) {
|
||||
|
||||
import netty.settings
|
||||
|
||||
|
|
@ -40,7 +38,7 @@ class NettyRemoteServer(
|
|||
// group of open channels, used for clean-up
|
||||
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
|
||||
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, loader, netty)
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, netty)
|
||||
bootstrap.setPipelineFactory(pipelineFactory)
|
||||
bootstrap.setOption("backlog", settings.Backlog)
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
|
|
@ -79,7 +77,6 @@ class NettyRemoteServer(
|
|||
class RemoteServerPipelineFactory(
|
||||
val openChannels: ChannelGroup,
|
||||
val executionHandler: ExecutionHandler,
|
||||
val loader: Option[ClassLoader],
|
||||
val netty: NettyRemoteTransport) extends ChannelPipelineFactory {
|
||||
|
||||
import netty.settings
|
||||
|
|
@ -91,7 +88,7 @@ class RemoteServerPipelineFactory(
|
|||
val messageEnc = new RemoteMessageEncoder(netty)
|
||||
|
||||
val authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil
|
||||
val remoteServer = new RemoteServerHandler(openChannels, loader, netty)
|
||||
val remoteServer = new RemoteServerHandler(openChannels, netty)
|
||||
val stages: List[ChannelHandler] = lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: authenticator ::: remoteServer :: Nil
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
|
|
@ -131,7 +128,6 @@ object ChannelLocalSystem extends ChannelLocal[ActorSystemImpl] {
|
|||
@ChannelHandler.Sharable
|
||||
class RemoteServerHandler(
|
||||
val openChannels: ChannelGroup,
|
||||
val applicationLoader: Option[ClassLoader],
|
||||
val netty: NettyRemoteTransport) extends SimpleChannelUpstreamHandler {
|
||||
|
||||
import netty.settings
|
||||
|
|
@ -164,7 +160,7 @@ class RemoteServerHandler(
|
|||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try {
|
||||
event.getMessage match {
|
||||
case remote: AkkaRemoteProtocol if remote.hasMessage ⇒
|
||||
netty.receiveMessage(new RemoteMessage(remote.getMessage, netty.system, applicationLoader))
|
||||
netty.receiveMessage(new RemoteMessage(remote.getMessage, netty.system))
|
||||
|
||||
case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒
|
||||
val instruction = remote.getInstruction
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue