Fixed issue in AMQP by not supervising a consumer handler that is already supervised
This commit is contained in:
parent
689f910f86
commit
093589d132
6 changed files with 96 additions and 110 deletions
|
|
@ -76,34 +76,31 @@ object RemoteServer {
|
|||
}
|
||||
|
||||
val SECURE = {
|
||||
if(config.getBool("akka.remote.ssl.service",false)){
|
||||
|
||||
if (config.getBool("akka.remote.ssl.service",false)) {
|
||||
val properties = List(
|
||||
("key-store-type" ,"keyStoreType"),
|
||||
("key-store" ,"keyStore"),
|
||||
("key-store-pass" ,"keyStorePassword"),
|
||||
("trust-store-type","trustStoreType"),
|
||||
("trust-store" ,"trustStore"),
|
||||
("trust-store-pass","trustStorePassword")
|
||||
).map(x => ("akka.remote.ssl." + x._1,"javax.net.ssl."+x._2))
|
||||
|
||||
//If property is not set, and we have a value from our akka.conf, use that value
|
||||
for{ p <- properties if System.getProperty(p._2) eq null
|
||||
c <- config.getString(p._1)
|
||||
} System.setProperty(p._2,c)
|
||||
|
||||
if(config.getBool("akka.remote.ssl.debug",false))
|
||||
System.setProperty("javax.net.debug","ssl")
|
||||
("key-store-type" , "keyStoreType"),
|
||||
("key-store" , "keyStore"),
|
||||
("key-store-pass" , "keyStorePassword"),
|
||||
("trust-store-type", "trustStoreType"),
|
||||
("trust-store" , "trustStore"),
|
||||
("trust-store-pass", "trustStorePassword")
|
||||
).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2))
|
||||
|
||||
// If property is not set, and we have a value from our akka.conf, use that value
|
||||
for {
|
||||
p <- properties if System.getProperty(p._2) eq null
|
||||
c <- config.getString(p._1)
|
||||
} System.setProperty(p._2, c)
|
||||
|
||||
if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl")
|
||||
true
|
||||
}
|
||||
else
|
||||
false
|
||||
} else false
|
||||
}
|
||||
|
||||
object Address {
|
||||
def apply(hostname: String, port: Int) = new Address(hostname, port)
|
||||
}
|
||||
|
||||
class Address(val hostname: String, val port: Int) {
|
||||
override def hashCode: Int = {
|
||||
var result = HashCode.SEED
|
||||
|
|
@ -120,7 +117,7 @@ object RemoteServer {
|
|||
}
|
||||
|
||||
private class RemoteActorSet {
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
|
||||
}
|
||||
|
||||
|
|
@ -307,17 +304,14 @@ class RemoteServer extends Logging {
|
|||
object RemoteServerSslContext {
|
||||
import javax.net.ssl.SSLContext
|
||||
|
||||
val (client,server) = {
|
||||
val (client, server) = {
|
||||
val protocol = "TLS"
|
||||
//val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509")
|
||||
//val store = KeyStore.getInstance("JKS")
|
||||
|
||||
val s = SSLContext.getInstance(protocol)
|
||||
s.init(null,null,null)
|
||||
|
||||
val c = SSLContext.getInstance(protocol)
|
||||
c.init(null,null,null)
|
||||
|
||||
(c,s)
|
||||
}
|
||||
}
|
||||
|
|
@ -340,20 +334,18 @@ class RemoteServerPipelineFactory(
|
|||
engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible?
|
||||
engine.setUseClientMode(false)
|
||||
|
||||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder))
|
||||
case _ => (join(),join())
|
||||
val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
case _ => (join(), join())
|
||||
}
|
||||
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors)
|
||||
|
||||
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
|
||||
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
@ -380,23 +372,20 @@ class RemoteServerHandler(
|
|||
openChannels.add(ctx.getChannel)
|
||||
}
|
||||
|
||||
override def channelConnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
|
||||
if(RemoteServer.SECURE) {
|
||||
override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
|
||||
if (RemoteServer.SECURE) {
|
||||
val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
|
||||
// Begin handshake.
|
||||
sslHandler.handshake().addListener( new ChannelFutureListener {
|
||||
def operationComplete(future : ChannelFuture) : Unit = {
|
||||
if(future.isSuccess)
|
||||
openChannels.add(future.getChannel)
|
||||
else
|
||||
future.getChannel.close
|
||||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isSuccess) openChannels.add(future.getChannel)
|
||||
else future.getChannel.close
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] &&
|
||||
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
||||
|
|
@ -499,8 +488,8 @@ class RemoteServerHandler(
|
|||
* Does not start the actor.
|
||||
*/
|
||||
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
||||
val name = actorInfo.getTarget
|
||||
val uuid = actorInfo.getUuid
|
||||
val name = actorInfo.getTarget
|
||||
val timeout = actorInfo.getTimeout
|
||||
|
||||
val actorRefOrNull = actors get uuid
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue