From 5b8ce4cc6b1adeba9d3a6d6ae52e6d70fe14b96d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 19 Jun 2012 12:17:41 +0200 Subject: [PATCH 01/13] Turning on the ssl test again --- .../test/scala/akka/remote/Ticket1978CommunicationSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 2ff63b20a4..94142b8f66 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -51,7 +51,7 @@ object Configuration { def getCipherConfig(cipher: String, enabled: String*): (String, Boolean, Config) = try { - if (true) throw new IllegalArgumentException("Test not enabled") + if (false) throw new IllegalArgumentException("Test not enabled") val config = ConfigFactory.parseString("akka.remote.netty.port=12345").withFallback(ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher, enabled.mkString(", ")))) val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty") From 64566e6912db40711c6818771f7345cae1486d75 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 19 Jun 2012 22:44:01 +0200 Subject: [PATCH 02/13] Rewriting large parts of the SSL tests and adding cleanup to the code --- .../scala/akka/remote/RemoteTransport.scala | 8 +- .../remote/netty/NettyRemoteSupport.scala | 10 +- .../akka/remote/netty/NettySSLSupport.scala | 25 ++- .../remote/Ticket1978CommunicationSpec.scala | 151 ++++++------------ 4 files changed, 76 insertions(+), 118 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index aefd34ec74..f6b85dbc0d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -106,8 +106,14 @@ case class RemoteServerShutdown( case class RemoteServerError( @BeanProperty val cause: Throwable, @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { + + cause match { + case s: javax.net.ssl.SSLException ⇒ var e: Throwable = s; while (e.getCause ne null) e = e.getCause; println(Logging.stackTraceFor(e)) + case _ ⇒ + } + override def logLevel: Logging.LogLevel = Logging.ErrorLevel - override def toString: String = "RemoteServerError@" + remote + "] Error[" + cause + "]" + override def toString: String = "RemoteServerError@" + remote + "] Error[" + Logging.stackTraceFor(cause) + "]" } /** diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 5f62bb58c8..9fc64b0a68 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -65,17 +65,15 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * @param withTimeout determines whether an IdleStateHandler shall be included */ def apply(endpoint: ⇒ Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = - new ChannelPipelineFactory { - def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint) - } + new ChannelPipelineFactory { override def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint) } /** * Construct a default protocol stack, excluding the “head” handler (i.e. the one which * actually dispatches the received messages to the local target actors). */ def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] = - (if (settings.EnableSSL) NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient) :: Nil else Nil) ::: - (if (withTimeout) timeout :: Nil else Nil) ::: + (if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) ::: + (if (withTimeout) List(timeout) else Nil) ::: msgFormat ::: authenticator ::: executionHandler :: @@ -116,7 +114,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * protect the TCP port from unauthorized use (don’t rely on it too much, though, * as this is NOT a cryptographic feature). */ - def authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil + def authenticator = if (settings.RequireCookie) List(new RemoteServerAuthenticationHandler(settings.SecureCookie)) else Nil } /** diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala index cca8662b2f..8915af559e 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -60,11 +60,16 @@ private[akka] object NettySSLSupport { def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = try { - val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) - val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) - trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed? - trustManagerFactory.init(trustStore) - val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers + val trustManagers: Array[TrustManager] = { + val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + trustManagerFactory.init({ + val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) + val fin = new FileInputStream(trustStorePath) + try trustStore.load(fin, trustStorePassword.toCharArray) finally fin.close() + trustStore + }) + trustManagerFactory.getTrustManagers + } Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e) @@ -102,9 +107,13 @@ private[akka] object NettySSLSupport { def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = try { val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) - val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) - keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed? - factory.init(keyStore, keyStorePassword.toCharArray) + + factory.init({ + val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) + val fin = new FileInputStream(keyStorePath) + try keyStore.load(fin, keyStorePassword.toCharArray) finally fin.close() + keyStore + }, keyStorePassword.toCharArray) Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e) diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 778214a869..ee82e448bd 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -9,12 +9,12 @@ import com.typesafe.config._ import akka.dispatch.{ Await, Future } import akka.pattern.ask import java.io.File -import akka.event.{ NoLogging, LoggingAdapter } import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController } import netty.{ NettySettings, NettySSLSupport } import javax.net.ssl.SSLException import akka.util.{ Timeout, Duration } import akka.util.duration._ +import akka.event.{ Logging, NoLogging, LoggingAdapter } object Configuration { // set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager" @@ -32,6 +32,7 @@ object Configuration { remote.netty { hostname = localhost + port = %d ssl { enable = on trust-store = "%s" @@ -41,41 +42,41 @@ object Configuration { sha1prng-random-source = "/dev/./urandom" } } - actor.deployment { - /blub.remote = "akka://remote-sys@localhost:12346" - /looker/child.remote = "akka://remote-sys@localhost:12346" - /looker/child/grandchild.remote = "akka://Ticket1978CommunicationSpec@localhost:12345" - } } """ - def getCipherConfig(cipher: String, enabled: String*): (String, Boolean, Config) = try { + case class CipherConfig(runTest: Boolean, config: Config, cipher: String, localPort: Int, remotePort: Int) - if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled") + def getCipherConfig(cipher: String, enabled: String*): CipherConfig = { + val localPort, remotePort = { val s = new java.net.ServerSocket(0); try s.getLocalPort finally s.close() } + try { - val config = ConfigFactory.parseString("akka.remote.netty.port=12345").withFallback(ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher, enabled.mkString(", ")))) - val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty") - val settings = new NettySettings(fullConfig, "placeholder") + //if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled") - val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging) + val config = ConfigFactory.parseString(conf.format(localPort, trustStore, keyStore, cipher, enabled.mkString(", "))) + val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty") + val settings = new NettySettings(fullConfig, "placeholder") - rng.nextInt() // Has to work - settings.SSLRandomNumberGenerator foreach { sRng ⇒ rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) } + val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging) - val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine - val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet - val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet - gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported)) - gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled)) - engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get)) + rng.nextInt() // Has to work + settings.SSLRandomNumberGenerator foreach { sRng ⇒ rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) } - (cipher, true, config) - } catch { - case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) ⇒ (cipher, false, AkkaSpec.testConf) // Cannot match against the message since the message might be localized :S + val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine + val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet + val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet + gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported)) + gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled)) + engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get)) + + CipherConfig(true, config, cipher, localPort, remotePort) + } catch { + case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) ⇒ CipherConfig(false, AkkaSpec.testConf, cipher, localPort, remotePort) // Cannot match against the message since the message might be localized :S + } } } -import Configuration.getCipherConfig +import Configuration.{ CipherConfig, getCipherConfig } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG", "TLS_RSA_WITH_AES_128_CBC_SHA")) @@ -99,101 +100,45 @@ class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(g class Ticket1978DefaultRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "TLS_RSA_WITH_AES_128_CBC_SHA")) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(("NonExistingRNG", false, AkkaSpec.testConf)) +class Ticket1978CrappyRSAWithMD5OnlyHereToMakeSureThingsWorkSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "SSL_RSA_WITH_NULL_MD5")) -abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender { +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(CipherConfig(false, AkkaSpec.testConf, "NonExistingRNG", 12345, 12346)) + +abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) extends AkkaSpec(cipherConfig.config) with ImplicitSender { implicit val timeout: Timeout = Timeout(5 seconds) import RemoteCommunicationSpec._ - val other = ActorSystem("remote-sys", ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config)) + lazy val other: ActorSystem = ActorSystem( + "remote-sys", + ConfigFactory.parseString("akka.remote.netty.port=" + cipherConfig.remotePort).withFallback(system.settings.config)) override def atTermination() { - other.shutdown() - other.awaitTermination() + if (cipherConfig.runTest) { + other.shutdown() + other.awaitTermination() + } } - "SSL Remoting" must { - if (cipherEnabledconfig._2) { - val remote = other.actorOf(Props(new Actor { def receive = { case "ping" ⇒ sender ! (("pong", sender)) } }), "echo") + ("- SSL communication") must { + if (cipherConfig.runTest) { + val ignoreMe = other.actorOf(Props(new Actor { def receive = { case ("ping", x) ⇒ sender ! ((("pong", x), sender)) } }), "echo") + val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address - val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo") - - "support remote look-ups" in { - here ! "ping" - expectMsgPF(timeout.duration) { - case ("pong", s: AnyRef) if s eq testActor ⇒ true - } - } - - "send error message for wrong address" ignore { - within(timeout.duration) { - EventFilter.error(start = "dropping", occurrences = 1).intercept { - system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping" - }(other) - } + "support tell" in { + val here = system.actorFor(otherAddress.toString + "/user/echo") + for (i ← 1 to 100) here ! (("ping", i)) + for (i ← 1 to 100) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } } "support ask" in { - Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good - case m ⇒ fail(m + " was not (pong, AskActorRef)") - } + val here = system.actorFor(otherAddress.toString + "/user/echo") + val f = for (i ← 1 to 1000) yield here ? (("ping", i)) mapTo manifest[((String, Int), ActorRef)] + Await.result(Future.sequence(f), timeout.duration).map(_._1._1).toSet must be(Set("pong")) } - "send dead letters on remote if actor does not exist" in { - within(timeout.duration) { - EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { - system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh" - }(other) - } - } - - "create and supervise children on remote node" in { - within(timeout.duration) { - val r = system.actorOf(Props[Echo], "blub") - r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub" - r ! 42 - expectMsg(42) - EventFilter[Exception]("crash", occurrences = 1).intercept { - r ! new Exception("crash") - }(other) - expectMsg("preRestart") - r ! 42 - expectMsg(42) - } - } - - "look-up actors across node boundaries" in { - within(timeout.duration) { - val l = system.actorOf(Props(new Actor { - def receive = { - case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n) - case s: String ⇒ sender ! context.actorFor(s) - } - }), "looker") - l ! (Props[Echo], "child") - val r = expectMsgType[ActorRef] - r ! (Props[Echo], "grandchild") - val remref = expectMsgType[ActorRef] - remref.isInstanceOf[LocalActorRef] must be(true) - val myref = system.actorFor(system / "looker" / "child" / "grandchild") - myref.isInstanceOf[RemoteActorRef] must be(true) - myref ! 43 - expectMsg(43) - lastSender must be theSameInstanceAs remref - r.asInstanceOf[RemoteActorRef].getParent must be(l) - system.actorFor("/user/looker/child") must be theSameInstanceAs r - Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l - Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l - } - } - - "not fail ask across node boundaries" in { - val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)] - Await.result(Future.sequence(f), timeout.duration).map(_._1).toSet must be(Set("pong")) - } } else { "not be run when the cipher is not supported by the platform this test is currently being executed on" ignore { From 8a7c8a2cedff5f1cbfd404f4b853b3fec18a1229 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 00:10:06 +0200 Subject: [PATCH 03/13] Restructuring some of the SSL code to avoid the risk of races --- .../akka/remote/netty/NettyRemoteSupport.scala | 12 +++--------- .../akka/remote/netty/NettySSLSupport.scala | 17 ++++++++++------- .../akka/security/provider/AkkaProvider.scala | 6 +++--- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 9fc64b0a68..a596f783d7 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -24,7 +24,7 @@ import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefPr import akka.util.NonFatal import akka.actor.{ ExtendedActorSystem, Address, ActorRef } -object ChannelAddress extends ChannelLocal[Option[Address]] { +private[akka] object ChannelAddress extends ChannelLocal[Option[Address]] { override def initialValue(ch: Channel): Option[Address] = None } @@ -54,9 +54,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * in implementations of ChannelPipelineFactory. */ def apply(handlers: Seq[ChannelHandler]): DefaultChannelPipeline = - handlers.foldLeft(new DefaultChannelPipeline) { - (pipe, handler) ⇒ pipe.addLast(Logging.simpleName(handler.getClass), handler); pipe - } + (new DefaultChannelPipeline /: handlers) { (p, h) ⇒ p.addLast(Logging.simpleName(h.getClass), h); p } /** * Constructs the NettyRemoteTransport default pipeline with the give “head” handler, which @@ -73,11 +71,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider */ def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] = (if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) ::: - (if (withTimeout) List(timeout) else Nil) ::: - msgFormat ::: - authenticator ::: - executionHandler :: - Nil + (if (withTimeout) List(timeout) else Nil) ::: msgFormat ::: authenticator ::: executionHandler :: Nil /** * Construct an IdleStateHandler which uses [[akka.remote.netty.NettyRemoteTransport]].timer. diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala index 8915af559e..0ab188425c 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -60,6 +60,7 @@ private[akka] object NettySSLSupport { def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = try { + val rng = initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log) val trustManagers: Array[TrustManager] = { val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) trustManagerFactory.init({ @@ -70,7 +71,7 @@ private[akka] object NettySSLSupport { }) trustManagerFactory.getTrustManagers } - Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } + Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, rng); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e) case e: IOException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e) @@ -87,10 +88,12 @@ private[akka] object NettySSLSupport { }) match { case Some(context) ⇒ log.debug("Using client SSL context to create SSLEngine ...") - val sslEngine = context.createSSLEngine - sslEngine.setUseClientMode(true) - sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray.map(_.toString)) - new SslHandler(sslEngine) + new SslHandler({ + val sslEngine = context.createSSLEngine + sslEngine.setUseClientMode(true) + sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray.map(_.toString)) + sslEngine + }) case None ⇒ throw new GeneralSecurityException( """Failed to initialize client SSL because SSL context could not be found." + @@ -106,15 +109,15 @@ private[akka] object NettySSLSupport { def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = try { + val rng = initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log) val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) - factory.init({ val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) val fin = new FileInputStream(keyStorePath) try keyStore.load(fin, keyStorePassword.toCharArray) finally fin.close() keyStore }, keyStorePassword.toCharArray) - Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } + Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, rng); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e) case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e) diff --git a/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala b/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala index f44aeae584..1ed93557a6 100644 --- a/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala +++ b/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala @@ -12,9 +12,9 @@ final class AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that i AccessController.doPrivileged(new PrivilegedAction[AkkaProvider] { def run = { //SecureRandom - put("SecureRandom.AES128CounterRNGFast", "akka.security.provider.AES128CounterRNGFast") - put("SecureRandom.AES128CounterRNGSecure", "akka.security.provider.AES128CounterRNGSecure") - put("SecureRandom.AES256CounterRNGSecure", "akka.security.provider.AES256CounterRNGSecure") + put("SecureRandom.AES128CounterRNGFast", classOf[AES128CounterRNGFast].getName) + put("SecureRandom.AES128CounterRNGSecure", classOf[AES128CounterRNGSecure].getName) + put("SecureRandom.AES256CounterRNGSecure", classOf[AES256CounterRNGSecure].getName) //Implementation type: software or hardware put("SecureRandom.AES128CounterRNGFast ImplementedIn", "Software") From dbe72a6bf3e1464049e9f71fed21abe5a70eceb4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 00:47:11 +0200 Subject: [PATCH 04/13] Making AkkaProvider a Scala Object --- .../main/scala/akka/remote/netty/NettySSLSupport.scala | 9 ++++----- .../src/main/scala/akka/remote/netty/Settings.scala | 3 ++- .../main/scala/akka/security/provider/AkkaProvider.scala | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala index 0ab188425c..9a16a1e5f5 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -18,8 +18,7 @@ import akka.security.provider.AkkaProvider */ private[akka] object NettySSLSupport { - val akka = new AkkaProvider - Security.addProvider(akka) + Security addProvider AkkaProvider /** * Construct a SSLHandler which can be inserted into a Netty server/client pipeline @@ -38,7 +37,7 @@ private[akka] object NettySSLSupport { val rng = rngName match { case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure")) ⇒ log.debug("SSL random number generator set to: {}", r) - SecureRandom.getInstance(r, akka) + SecureRandom.getInstance(r, AkkaProvider) case Some("SHA1PRNG") ⇒ log.debug("SSL random number generator set to: SHA1PRNG") // This needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking @@ -91,7 +90,7 @@ private[akka] object NettySSLSupport { new SslHandler({ val sslEngine = context.createSSLEngine sslEngine.setUseClientMode(true) - sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray.map(_.toString)) + sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray) sslEngine }) case None ⇒ @@ -133,7 +132,7 @@ private[akka] object NettySSLSupport { log.debug("Using server SSL context to create SSLEngine ...") val sslEngine = context.createSSLEngine sslEngine.setUseClientMode(false) - sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray.map(_.toString)) + sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray) new SslHandler(sslEngine) case None ⇒ throw new GeneralSecurityException( """Failed to initialize server SSL because SSL context could not be found. diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index d168c67eca..ada257f674 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -8,6 +8,7 @@ import akka.util.Duration import java.util.concurrent.TimeUnit._ import java.net.InetAddress import akka.ConfigurationException +import scala.collection.JavaConverters.iterableAsScalaIterableConverter private[akka] class NettySettings(config: Config, val systemName: String) { @@ -106,7 +107,7 @@ private[akka] class NettySettings(config: Config, val systemName: String) { case password ⇒ Some(password) } - val SSLEnabledAlgorithms = getStringList("ssl.enabled-algorithms").toArray.toSet + val SSLEnabledAlgorithms = iterableAsScalaIterableConverter(getStringList("ssl.enabled-algorithms")).asScala.toSet[String] val SSLProtocol = getString("ssl.protocol") match { case "" ⇒ None diff --git a/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala b/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala index 1ed93557a6..0b85231348 100644 --- a/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala +++ b/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala @@ -3,13 +3,13 @@ */ package akka.security.provider -import java.security.{ PrivilegedAction, AccessController, Provider } +import java.security.{ PrivilegedAction, AccessController, Provider, Security } /** * A provider that for AES128CounterRNGFast, a cryptographically secure random number generator through SecureRandom */ -final class AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") { - AccessController.doPrivileged(new PrivilegedAction[AkkaProvider] { +object AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") { + AccessController.doPrivileged(new PrivilegedAction[this.type] { def run = { //SecureRandom put("SecureRandom.AES128CounterRNGFast", classOf[AES128CounterRNGFast].getName) From 95419ba82f76539152882fc25e0187f44d049f10 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 01:56:15 +0200 Subject: [PATCH 05/13] Somehow the code for the fast and the secure was flipped --- .../akka/security/provider/AES128CounterRNGFast.scala | 6 +++--- .../akka/security/provider/AES128CounterRNGSecure.scala | 6 +++--- .../src/test/scala/akka/remote/Ticket1978ConfigSpec.scala | 7 +------ 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGFast.scala b/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGFast.scala index c355f5a548..1c58c4f1d0 100644 --- a/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGFast.scala +++ b/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGFast.scala @@ -3,14 +3,14 @@ */ package akka.security.provider -import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } +import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator } import java.security.SecureRandom /** * Internal API */ class AES128CounterRNGFast extends java.security.SecureRandomSpi { - private val rng = new AESCounterRNG(new SecureRandomSeedGenerator()) + private val rng = new AESCounterRNG() /** * This is managed internally only @@ -31,6 +31,6 @@ class AES128CounterRNGFast extends java.security.SecureRandomSpi { * @param numBytes the number of seed bytes to generate. * @return the seed bytes. */ - override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = (new SecureRandom).generateSeed(numBytes) + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes) } diff --git a/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGSecure.scala b/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGSecure.scala index 846476cc2d..60beecded7 100644 --- a/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGSecure.scala +++ b/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGSecure.scala @@ -3,13 +3,13 @@ */ package akka.security.provider -import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator } +import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } /** * Internal API */ class AES128CounterRNGSecure extends java.security.SecureRandomSpi { - private val rng = new AESCounterRNG() + private val rng = new AESCounterRNG(new SecureRandomSeedGenerator()) /** * This is managed internally only @@ -30,6 +30,6 @@ class AES128CounterRNGSecure extends java.security.SecureRandomSpi { * @param numBytes the number of seed bytes to generate. * @return the seed bytes. */ - override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes) + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = (new SecureRandomSeedGenerator()).generateSeed(numBytes) } diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala index 4c39b94087..0a39d20a9a 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala @@ -15,12 +15,7 @@ akka { actor.provider = "akka.remote.RemoteActorRefProvider" remote.netty { hostname = localhost - port = 12345 - } - actor.deployment { - /blub.remote = "akka://remote-sys@localhost:12346" - /looker/child.remote = "akka://remote-sys@localhost:12346" - /looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345" + port = 0 } } """) with ImplicitSender with DefaultTimeout { From b062539ae3280cb62c8f4ca9d079475b63555d36 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 10:58:30 +0200 Subject: [PATCH 06/13] Adding the setting of securerandom.source and support for NativePRNG --- .../scala/akka/remote/netty/NettySSLSupport.scala | 15 +++++++++------ .../akka/remote/Ticket1978CommunicationSpec.scala | 4 ++-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala index 9a16a1e5f5..9323fb8143 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -9,8 +9,8 @@ import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSL import akka.remote.RemoteTransportException import akka.event.LoggingAdapter import java.io.{ IOException, FileNotFoundException, FileInputStream } -import java.security.{ SecureRandom, GeneralSecurityException, KeyStore, Security } import akka.security.provider.AkkaProvider +import java.security._ /** * Used for adding SSL support to Netty pipeline @@ -32,17 +32,20 @@ private[akka] object NettySSLSupport { * Using /dev/./urandom is only necessary when using SHA1PRNG on Linux * Use 'new SecureRandom()' instead of 'SecureRandom.getInstance("SHA1PRNG")' to avoid having problems */ - sourceOfRandomness foreach { path ⇒ System.setProperty("java.security.egd", path) } + sourceOfRandomness foreach { path ⇒ + System.setProperty("java.security.egd", path) + System.setProperty("securerandom.source", path) + } val rng = rngName match { case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure")) ⇒ log.debug("SSL random number generator set to: {}", r) SecureRandom.getInstance(r, AkkaProvider) - case Some("SHA1PRNG") ⇒ - log.debug("SSL random number generator set to: SHA1PRNG") - // This needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking + case Some(s @ ("SHA1PRNG" | "NativePRNG")) ⇒ + log.debug("SSL random number generator set to: " + s) + // SHA1PRNG needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking // However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD). - SecureRandom.getInstance("SHA1PRNG") + SecureRandom.getInstance(s) case Some(unknown) ⇒ log.debug("Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown) new SecureRandom diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 6344d0c435..c31784f313 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -128,8 +128,8 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten "support tell" in { val here = system.actorFor(otherAddress.toString + "/user/echo") - for (i ← 1 to 100) here ! (("ping", i)) - for (i ← 1 to 100) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } + for (i ← 1 to 1000) here ! (("ping", i)) + for (i ← 1 to 1000) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } } "support ask" in { From 28ee78bfd5b8529a1f2fc9c752dc4d951c91e9f4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 14:29:41 +0200 Subject: [PATCH 07/13] Adding support for being able to _not_ use ExecutionHandler by setting the poolsize to 0 --- akka-remote/src/main/resources/reference.conf | 3 ++- .../akka/remote/netty/NettyRemoteSupport.scala | 18 ++++++++++-------- .../scala/akka/remote/netty/Settings.scala | 2 +- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 9cd7b767be..22b0ce3226 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -122,7 +122,8 @@ akka { # (I) Length in akka.time-unit how long core threads will be kept alive if idling execution-pool-keepalive = 60s - # (I) Size of the core pool of the remote execution unit + # (I) Size in number of threads of the core pool of the remote execution unit, + # set to 0 to disable the execution pool execution-pool-size = 4 # (I) Maximum channel size, 0 for off diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index a596f783d7..da67ea4f06 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -71,7 +71,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider */ def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] = (if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) ::: - (if (withTimeout) List(timeout) else Nil) ::: msgFormat ::: authenticator ::: executionHandler :: Nil + (if (withTimeout) List(timeout) else Nil) ::: msgFormat ::: authenticator ::: executionHandler /** * Construct an IdleStateHandler which uses [[akka.remote.netty.NettyRemoteTransport]].timer. @@ -95,13 +95,15 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * happen on a netty thread (that could be bad if re-sending over the network for * remote-deployed actors). */ - val executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor( - settings.ExecutionPoolSize, - settings.MaxChannelMemorySize, - settings.MaxTotalMemorySize, - settings.ExecutionPoolKeepalive.length, - settings.ExecutionPoolKeepalive.unit, - system.threadFactory)) + val executionHandler = if (settings.ExecutionPoolSize != 0) + List(new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor( + settings.ExecutionPoolSize, + settings.MaxChannelMemorySize, + settings.MaxTotalMemorySize, + settings.ExecutionPoolKeepalive.length, + settings.ExecutionPoolKeepalive.unit, + system.threadFactory))) + else Nil /** * Construct and authentication handler which uses the SecureCookie to somewhat diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index ada257f674..9babf6005c 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -73,7 +73,7 @@ private[akka] class NettySettings(config: Config, val systemName: String) { val ExecutionPoolKeepalive: Duration = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS) val ExecutionPoolSize: Int = getInt("execution-pool-size") match { - case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 1") + case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 0") case sz ⇒ sz } From 8fcffcab00eff254b0483d7b68e6a6e70d690c42 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 14:43:11 +0200 Subject: [PATCH 08/13] Doing a roundtrip before issuing the wave of messages --- .../akka/remote/Ticket1978CommunicationSpec.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index c31784f313..04ec0b88ec 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -121,20 +121,26 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten } } - ("- SSL communication") must { + ("-") must { if (cipherConfig.runTest) { val ignoreMe = other.actorOf(Props(new Actor { def receive = { case ("ping", x) ⇒ sender ! ((("pong", x), sender)) } }), "echo") val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address "support tell" in { val here = system.actorFor(otherAddress.toString + "/user/echo") - for (i ← 1 to 1000) here ! (("ping", i)) - for (i ← 1 to 1000) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } + + Await.result(here ? (("ping", -1)) mapTo manifest[((String, Int), ActorRef)], timeout.duration)._1 must be(("pong", -1)) + + for (i ← 1 to 10000) here ! (("ping", i)) + for (i ← 1 to 10000) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } } "support ask" in { val here = system.actorFor(otherAddress.toString + "/user/echo") - val f = for (i ← 1 to 1000) yield here ? (("ping", i)) mapTo manifest[((String, Int), ActorRef)] + + Await.result(here ? (("ping", -1)) mapTo manifest[((String, Int), ActorRef)], timeout.duration)._1 must be(("pong", -1)) + + val f = for (i ← 1 to 10000) yield here ? (("ping", i)) mapTo manifest[((String, Int), ActorRef)] Await.result(Future.sequence(f), timeout.duration).map(_._1._1).toSet must be(Set("pong")) } From 6be3acacec1470e9835a107d70e3727553a3163a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 15:01:14 +0200 Subject: [PATCH 09/13] I think we have a winner --- .../main/scala/akka/remote/netty/Client.scala | 23 +++++++++++-------- .../remote/Ticket1978CommunicationSpec.scala | 4 ---- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 0917086d4d..96484d164f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -18,6 +18,7 @@ import akka.AkkaException import akka.event.Logging import akka.actor.{ DeadLetter, Address, ActorRef } import akka.util.{ NonFatal, Switch } +import org.jboss.netty.handler.ssl.SslHandler /** * This is the abstract baseclass for netty remote clients, currently there's only an @@ -115,15 +116,19 @@ private[akka] class ActiveRemoteClient private[akka] ( */ def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = { - def sendSecureCookie(connection: ChannelFuture) { - val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) - if (settings.SecureCookie.nonEmpty) handshake.setCookie(settings.SecureCookie.get) - handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(localAddress.system) - .setHostname(localAddress.host.get) - .setPort(localAddress.port.get) - .build) - connection.getChannel.write(netty.createControlEnvelope(handshake.build)) + // Returns whether the handshake was written to the channel or not + def sendSecureCookie(connection: ChannelFuture): Boolean = { + if (!settings.EnableSSL || connection.getChannel.getPipeline.get[SslHandler](classOf[SslHandler]).handshake().awaitUninterruptibly().isSuccess) { + val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) + if (settings.SecureCookie.nonEmpty) handshake.setCookie(settings.SecureCookie.get) + handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder + .setSystem(localAddress.system) + .setHostname(localAddress.host.get) + .setPort(localAddress.port.get) + .build) + connection.getChannel.write(netty.createControlEnvelope(handshake.build)) + true + } else false } def attemptReconnect(): Boolean = { diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 04ec0b88ec..19c8c7432f 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -129,8 +129,6 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten "support tell" in { val here = system.actorFor(otherAddress.toString + "/user/echo") - Await.result(here ? (("ping", -1)) mapTo manifest[((String, Int), ActorRef)], timeout.duration)._1 must be(("pong", -1)) - for (i ← 1 to 10000) here ! (("ping", i)) for (i ← 1 to 10000) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } } @@ -138,8 +136,6 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten "support ask" in { val here = system.actorFor(otherAddress.toString + "/user/echo") - Await.result(here ? (("ping", -1)) mapTo manifest[((String, Int), ActorRef)], timeout.duration)._1 must be(("pong", -1)) - val f = for (i ← 1 to 10000) yield here ? (("ping", i)) mapTo manifest[((String, Int), ActorRef)] Await.result(Future.sequence(f), timeout.duration).map(_._1._1).toSet must be(Set("pong")) } From cb4831d52e32da8ad9afa85851b62af87a8a6995 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 15:19:24 +0200 Subject: [PATCH 10/13] Rearchitecting the reconnection semantics since we need to handle the ssl handshake there as well --- .../main/scala/akka/remote/netty/Client.scala | 36 +++++++++---------- .../remote/Ticket1978CommunicationSpec.scala | 8 ++--- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 96484d164f..35c0674d23 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -118,7 +118,15 @@ private[akka] class ActiveRemoteClient private[akka] ( // Returns whether the handshake was written to the channel or not def sendSecureCookie(connection: ChannelFuture): Boolean = { - if (!settings.EnableSSL || connection.getChannel.getPipeline.get[SslHandler](classOf[SslHandler]).handshake().awaitUninterruptibly().isSuccess) { + val future = + if (!connection.isSuccess || !settings.EnableSSL) connection + else connection.getChannel.getPipeline.get[SslHandler](classOf[SslHandler]).handshake().awaitUninterruptibly() + + if (!future.isSuccess) { + notifyListeners(RemoteClientError(future.getCause, netty, remoteAddress)) + false + } else { + ChannelAddress.set(connection.getChannel, Some(remoteAddress)) val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (settings.SecureCookie.nonEmpty) handshake.setCookie(settings.SecureCookie.get) handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder @@ -128,7 +136,7 @@ private[akka] class ActiveRemoteClient private[akka] ( .build) connection.getChannel.write(netty.createControlEnvelope(handshake.build)) true - } else false + } } def attemptReconnect(): Boolean = { @@ -136,14 +144,7 @@ private[akka] class ActiveRemoteClient private[akka] ( log.debug("Remote client reconnecting to [{}|{}]", remoteAddress, remoteIP) connection = bootstrap.connect(new InetSocketAddress(remoteIP, remoteAddress.port.get)) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. - - if (!connection.isSuccess) { - notifyListeners(RemoteClientError(connection.getCause, netty, remoteAddress)) - false - } else { - sendSecureCookie(connection) - true - } + sendSecureCookie(connection) } runSwitch switchOn { @@ -168,24 +169,19 @@ private[akka] class ActiveRemoteClient private[akka] ( openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. - if (!connection.isSuccess) { - notifyListeners(RemoteClientError(connection.getCause, netty, remoteAddress)) - false - } else { - ChannelAddress.set(connection.getChannel, Some(remoteAddress)) - sendSecureCookie(connection) + if (sendSecureCookie(connection)) { notifyListeners(RemoteClientStarted(netty, remoteAddress)) true + } else { + connection.getChannel.close() + openChannels.remove(connection.getChannel) + false } } match { case true ⇒ true case false if reconnectIfAlreadyConnected ⇒ - connection.getChannel.close() - openChannels.remove(connection.getChannel) - log.debug("Remote client reconnecting to [{}]", remoteAddress) attemptReconnect() - case false ⇒ false } } diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 19c8c7432f..79196f321f 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -106,7 +106,7 @@ class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(Cip abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) extends AkkaSpec(cipherConfig.config) with ImplicitSender { - implicit val timeout: Timeout = Timeout(5 seconds) + implicit val timeout: Timeout = Timeout(10 seconds) import RemoteCommunicationSpec._ @@ -129,14 +129,14 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten "support tell" in { val here = system.actorFor(otherAddress.toString + "/user/echo") - for (i ← 1 to 10000) here ! (("ping", i)) - for (i ← 1 to 10000) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } + for (i ← 1 to 1000) here ! (("ping", i)) + for (i ← 1 to 1000) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } } "support ask" in { val here = system.actorFor(otherAddress.toString + "/user/echo") - val f = for (i ← 1 to 10000) yield here ? (("ping", i)) mapTo manifest[((String, Int), ActorRef)] + val f = for (i ← 1 to 1000) yield here ? (("ping", i)) mapTo manifest[((String, Int), ActorRef)] Await.result(Future.sequence(f), timeout.duration).map(_._1._1).toSet must be(Set("pong")) } From ebcdb5d01709bf4a75ab4eccbc72d80340959a57 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 15:45:50 +0200 Subject: [PATCH 11/13] Removing SSL debug residue --- .../src/main/scala/akka/remote/RemoteTransport.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index f6b85dbc0d..ecd59c40e0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -107,13 +107,8 @@ case class RemoteServerError( @BeanProperty val cause: Throwable, @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { - cause match { - case s: javax.net.ssl.SSLException ⇒ var e: Throwable = s; while (e.getCause ne null) e = e.getCause; println(Logging.stackTraceFor(e)) - case _ ⇒ - } - override def logLevel: Logging.LogLevel = Logging.ErrorLevel - override def toString: String = "RemoteServerError@" + remote + "] Error[" + Logging.stackTraceFor(cause) + "]" + override def toString: String = "RemoteServerError@" + remote + "] Error[" + cause + "]" } /** From 3f86abaac88c5f6800f76032f53393fcb85de538 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 15:50:22 +0200 Subject: [PATCH 12/13] Review reformatting --- .../main/scala/akka/remote/netty/NettyRemoteSupport.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index da67ea4f06..7dbce1b5af 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -71,7 +71,10 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider */ def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] = (if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) ::: - (if (withTimeout) List(timeout) else Nil) ::: msgFormat ::: authenticator ::: executionHandler + (if (withTimeout) List(timeout) else Nil) ::: + msgFormat ::: + authenticator ::: + executionHandler /** * Construct an IdleStateHandler which uses [[akka.remote.netty.NettyRemoteTransport]].timer. From f2a857046d79b68af6b4c0c77475557a4ca7cc99 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 20 Jun 2012 15:59:52 +0200 Subject: [PATCH 13/13] Using softer language and warning about using 0 threads for the execution handler --- akka-remote/src/main/resources/reference.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 22b0ce3226..0ba9bb3b06 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -122,8 +122,8 @@ akka { # (I) Length in akka.time-unit how long core threads will be kept alive if idling execution-pool-keepalive = 60s - # (I) Size in number of threads of the core pool of the remote execution unit, - # set to 0 to disable the execution pool + # (I) Size in number of threads of the core pool of the remote execution unit. + # A value of 0 will turn this off, which is can lead to deadlocks under some configurations! execution-pool-size = 4 # (I) Maximum channel size, 0 for off