diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 9b9c4bb206..0d94c7a7dc 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -76,7 +76,8 @@ object AMQP { def newConsumer(connection: ActorRef, consumerParameters: ConsumerParameters): ActorRef = { val consumer: ActorRef = actorOf(new ConsumerActor(consumerParameters)) - consumer.startLink(consumerParameters.deliveryHandler) + val handler = consumerParameters.deliveryHandler + if (handler.supervisor.isEmpty) consumer.startLink(handler) connection.startLink(consumer) consumer ! Start consumer diff --git a/akka-amqp/src/main/scala/ConsumerActor.scala b/akka-amqp/src/main/scala/ConsumerActor.scala index 4bb46ceeec..90c8d7deec 100644 --- a/akka-amqp/src/main/scala/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/ConsumerActor.scala @@ -35,9 +35,8 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) queueName match { case Some(name) => log.debug("Declaring new queue [%s] for %s", name, toString) - if (queuePassive) { - ch.queueDeclarePassive(name) - } else { + if (queuePassive) ch.queueDeclarePassive(name) + else { ch.queueDeclare( name, queueDurable, queueExclusive, queueAutoDelete, JavaConversions.asMap(configurationArguments)) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 47ff788f1c..955af0c762 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -1264,8 +1264,8 @@ class LocalActorRef private[akka]( * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( -// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef { uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader]) + // uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef { extends ActorRef { _uuid = uuuid timeout = _timeout diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 724a71a642..e17c7f57ef 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -48,44 +48,48 @@ case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty v case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +class RemoteClientException private[akka](message: String) extends RuntimeException(message) + /** + * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. + * * @author Jonas Bonér */ object RemoteClient extends Logging { - val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) + val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) private val remoteClients = new HashMap[String, RemoteClient] - private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] + private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] // FIXME: simplify overloaded methods when we have Scala 2.8 - def actorFor(className: String, hostname: String, port: Int): ActorRef = - actorFor(className, className, 5000L, hostname, port, None) + def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None) - def actorFor(className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(className, className, 5000L, hostname, port, Some(loader)) + def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, Some(loader)) - def actorFor(uuid: String, className: String, hostname: String, port: Int): ActorRef = - actorFor(uuid, className, 5000L, hostname, port, None) + def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(serviceId, className, 5000L, hostname, port, None) - def actorFor(uuid: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(uuid, className, 5000L, hostname, port, Some(loader)) + def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(serviceId, className, 5000L, hostname, port, Some(loader)) - def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef = - actorFor(className, className, timeout, hostname, port, None) + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None) - def actorFor(className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = - actorFor(className, className, timeout, hostname, port, Some(loader)) + def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = + actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader)) - def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = - RemoteActorRef(uuid, className, hostname, port, timeout, None) + def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + RemoteActorRef(serviceId, className, hostname, port, timeout, None) - private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = - RemoteActorRef(uuid, className, hostname, port, timeout, Some(loader)) + private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef = + RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader)) - private[akka] def actorFor(uuid: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = - RemoteActorRef(uuid, className, hostname, port, timeout, loader) + private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = + RemoteActorRef(serviceId, className, hostname, port, timeout, loader) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port), None) @@ -158,9 +162,11 @@ object RemoteClient extends Logging { } /** + * RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer. + * * @author Jonas Bonér */ -class RemoteClient private[akka] (val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging { +class RemoteClient private[akka] (val hostname: String, val port: Int, val loader: Option[ClassLoader] = None) extends Logging { val name = "RemoteClient@" + hostname + "::" + port @volatile private[remote] var isRunning = false @@ -226,7 +232,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O } } } else { - val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") + val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port)) throw exception } @@ -245,13 +251,14 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O /** * @author Jonas Bonér */ -class RemoteClientPipelineFactory(name: String, - futures: ConcurrentMap[Long, CompletableFuture[_]], - supervisors: ConcurrentMap[String, ActorRef], - bootstrap: ClientBootstrap, - remoteAddress: SocketAddress, - timer: HashedWheelTimer, - client: RemoteClient) extends ChannelPipelineFactory { +class RemoteClientPipelineFactory( + name: String, + futures: ConcurrentMap[Long, CompletableFuture[_]], + supervisors: ConcurrentMap[String, ActorRef], + bootstrap: ClientBootstrap, + remoteAddress: SocketAddress, + timer: HashedWheelTimer, + client: RemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) @@ -272,9 +279,7 @@ class RemoteClientPipelineFactory(name: String, } val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) - val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient) - new StaticChannelPipeline(stages: _*) } } @@ -283,13 +288,14 @@ class RemoteClientPipelineFactory(name: String, * @author Jonas Bonér */ @ChannelHandler.Sharable -class RemoteClientHandler(val name: String, - val futures: ConcurrentMap[Long, CompletableFuture[_]], - val supervisors: ConcurrentMap[String, ActorRef], - val bootstrap: ClientBootstrap, - val remoteAddress: SocketAddress, - val timer: HashedWheelTimer, - val client: RemoteClient) +class RemoteClientHandler( + val name: String, + val futures: ConcurrentMap[Long, CompletableFuture[_]], + val supervisors: ConcurrentMap[String, ActorRef], + val bootstrap: ClientBootstrap, + val remoteAddress: SocketAddress, + val timer: HashedWheelTimer, + val client: RemoteClient) extends SimpleChannelUpstreamHandler with Logging { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { @@ -318,13 +324,13 @@ class RemoteClientHandler(val name: String, val supervisedActor = supervisors.get(supervisorUuid) if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed") - else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply)) + else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader)) } - future.completeWithException(null, parseException(reply)) + future.completeWithException(null, parseException(reply, client.loader)) } futures.remove(reply.getId) } else { - val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) + val exception = new RemoteClientException("Unknown message received in remote client handler: " + result) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port)) throw exception } @@ -358,24 +364,20 @@ class RemoteClientHandler(val name: String, log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) } - if(RemoteServer.SECURE){ - val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - sslHandler.handshake().addListener( new ChannelFutureListener { - def operationComplete(future : ChannelFuture) : Unit = { - if(future.isSuccess) - connect - //else - //FIXME: What is the correct action here? - } - }) - } else { - connect - } + if (RemoteServer.SECURE) { + val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) + sslHandler.handshake.addListener(new ChannelFutureListener { + def operationComplete(future: ChannelFuture): Unit = { + if (future.isSuccess) connect + else throw new RemoteClientException("Could not establish SSL handshake") + } + }) + } else connect } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => - l.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port)) + client.listeners.toArray.foreach(listener => + listener.asInstanceOf[ActorRef] ! RemoteClientDisconnected(client.hostname, client.port)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } @@ -385,9 +387,11 @@ class RemoteClientHandler(val name: String, event.getChannel.close } - private def parseException(reply: RemoteReplyProtocol): Throwable = { + private def parseException(reply: RemoteReplyProtocol, loader: Option[ClassLoader]): Throwable = { val exception = reply.getException - val exceptionClass = Class.forName(exception.getClassname) + val classname = exception.getClassname + val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname) + else Class.forName(classname) exceptionClass .getConstructor(Array[Class[_]](classOf[String]): _*) .newInstance(exception.getMessage).asInstanceOf[Throwable] diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 3dd9a204ea..89c0a6e437 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -76,34 +76,31 @@ object RemoteServer { } val SECURE = { - if(config.getBool("akka.remote.ssl.service",false)){ - + if (config.getBool("akka.remote.ssl.service",false)) { val properties = List( - ("key-store-type" ,"keyStoreType"), - ("key-store" ,"keyStore"), - ("key-store-pass" ,"keyStorePassword"), - ("trust-store-type","trustStoreType"), - ("trust-store" ,"trustStore"), - ("trust-store-pass","trustStorePassword") - ).map(x => ("akka.remote.ssl." + x._1,"javax.net.ssl."+x._2)) - - //If property is not set, and we have a value from our akka.conf, use that value - for{ p <- properties if System.getProperty(p._2) eq null - c <- config.getString(p._1) - } System.setProperty(p._2,c) - - if(config.getBool("akka.remote.ssl.debug",false)) - System.setProperty("javax.net.debug","ssl") + ("key-store-type" , "keyStoreType"), + ("key-store" , "keyStore"), + ("key-store-pass" , "keyStorePassword"), + ("trust-store-type", "trustStoreType"), + ("trust-store" , "trustStore"), + ("trust-store-pass", "trustStorePassword") + ).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2)) + // If property is not set, and we have a value from our akka.conf, use that value + for { + p <- properties if System.getProperty(p._2) eq null + c <- config.getString(p._1) + } System.setProperty(p._2, c) + + if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl") true - } - else - false + } else false } object Address { def apply(hostname: String, port: Int) = new Address(hostname, port) } + class Address(val hostname: String, val port: Int) { override def hashCode: Int = { var result = HashCode.SEED @@ -120,7 +117,7 @@ object RemoteServer { } private class RemoteActorSet { - private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] } @@ -307,17 +304,14 @@ class RemoteServer extends Logging { object RemoteServerSslContext { import javax.net.ssl.SSLContext - val (client,server) = { + val (client, server) = { val protocol = "TLS" //val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509") //val store = KeyStore.getInstance("JKS") - val s = SSLContext.getInstance(protocol) s.init(null,null,null) - val c = SSLContext.getInstance(protocol) c.init(null,null,null) - (c,s) } } @@ -340,20 +334,18 @@ class RemoteServerPipelineFactory( engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible? engine.setUseClientMode(false) - val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder)) - case _ => (join(),join()) + val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) + case _ => (join(), join()) } val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors) - val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer) - new StaticChannelPipeline(stages: _*) } } @@ -380,23 +372,20 @@ class RemoteServerHandler( openChannels.add(ctx.getChannel) } - override def channelConnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) { - if(RemoteServer.SECURE) { + override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) { + if (RemoteServer.SECURE) { val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler]) // Begin handshake. sslHandler.handshake().addListener( new ChannelFutureListener { - def operationComplete(future : ChannelFuture) : Unit = { - if(future.isSuccess) - openChannels.add(future.getChannel) - else - future.getChannel.close + def operationComplete(future: ChannelFuture): Unit = { + if (future.isSuccess) openChannels.add(future.getChannel) + else future.getChannel.close } }) } } - override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { @@ -499,8 +488,8 @@ class RemoteServerHandler( * Does not start the actor. */ private def createActor(actorInfo: ActorInfoProtocol): ActorRef = { - val name = actorInfo.getTarget val uuid = actorInfo.getUuid + val name = actorInfo.getTarget val timeout = actorInfo.getTimeout val actorRefOrNull = actors get uuid diff --git a/config/akka.conf b/config/akka.conf index 71670837d3..84b9bfbbcf 100644 --- a/config/akka.conf +++ b/config/akka.conf @@ -3,10 +3,3 @@ include "akka-reference.conf" # In this file you can override any option defined in the 'akka-reference.conf' file. # Copy in all or parts of the 'akka-reference.conf' file and modify as you please. -# -# -# -# cluster = ["localhost:6379", "localhost:6380", "localhost:6381"] -# -# -#