From 64566e6912db40711c6818771f7345cae1486d75 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 19 Jun 2012 22:44:01 +0200 Subject: [PATCH] Rewriting large parts of the SSL tests and adding cleanup to the code --- .../scala/akka/remote/RemoteTransport.scala | 8 +- .../remote/netty/NettyRemoteSupport.scala | 10 +- .../akka/remote/netty/NettySSLSupport.scala | 25 ++- .../remote/Ticket1978CommunicationSpec.scala | 151 ++++++------------ 4 files changed, 76 insertions(+), 118 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index aefd34ec74..f6b85dbc0d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -106,8 +106,14 @@ case class RemoteServerShutdown( case class RemoteServerError( @BeanProperty val cause: Throwable, @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { + + cause match { + case s: javax.net.ssl.SSLException ⇒ var e: Throwable = s; while (e.getCause ne null) e = e.getCause; println(Logging.stackTraceFor(e)) + case _ ⇒ + } + override def logLevel: Logging.LogLevel = Logging.ErrorLevel - override def toString: String = "RemoteServerError@" + remote + "] Error[" + cause + "]" + override def toString: String = "RemoteServerError@" + remote + "] Error[" + Logging.stackTraceFor(cause) + "]" } /** diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 5f62bb58c8..9fc64b0a68 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -65,17 +65,15 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * @param withTimeout determines whether an IdleStateHandler shall be included */ def apply(endpoint: ⇒ Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = - new ChannelPipelineFactory { - def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint) - } + new ChannelPipelineFactory { override def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint) } /** * Construct a default protocol stack, excluding the “head” handler (i.e. the one which * actually dispatches the received messages to the local target actors). */ def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] = - (if (settings.EnableSSL) NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient) :: Nil else Nil) ::: - (if (withTimeout) timeout :: Nil else Nil) ::: + (if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) ::: + (if (withTimeout) List(timeout) else Nil) ::: msgFormat ::: authenticator ::: executionHandler :: @@ -116,7 +114,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * protect the TCP port from unauthorized use (don’t rely on it too much, though, * as this is NOT a cryptographic feature). */ - def authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil + def authenticator = if (settings.RequireCookie) List(new RemoteServerAuthenticationHandler(settings.SecureCookie)) else Nil } /** diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala index cca8662b2f..8915af559e 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -60,11 +60,16 @@ private[akka] object NettySSLSupport { def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = try { - val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) - val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) - trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed? - trustManagerFactory.init(trustStore) - val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers + val trustManagers: Array[TrustManager] = { + val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + trustManagerFactory.init({ + val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) + val fin = new FileInputStream(trustStorePath) + try trustStore.load(fin, trustStorePassword.toCharArray) finally fin.close() + trustStore + }) + trustManagerFactory.getTrustManagers + } Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e) @@ -102,9 +107,13 @@ private[akka] object NettySSLSupport { def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = try { val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) - val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) - keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed? - factory.init(keyStore, keyStorePassword.toCharArray) + + factory.init({ + val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) + val fin = new FileInputStream(keyStorePath) + try keyStore.load(fin, keyStorePassword.toCharArray) finally fin.close() + keyStore + }, keyStorePassword.toCharArray) Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e) diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 778214a869..ee82e448bd 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -9,12 +9,12 @@ import com.typesafe.config._ import akka.dispatch.{ Await, Future } import akka.pattern.ask import java.io.File -import akka.event.{ NoLogging, LoggingAdapter } import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController } import netty.{ NettySettings, NettySSLSupport } import javax.net.ssl.SSLException import akka.util.{ Timeout, Duration } import akka.util.duration._ +import akka.event.{ Logging, NoLogging, LoggingAdapter } object Configuration { // set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager" @@ -32,6 +32,7 @@ object Configuration { remote.netty { hostname = localhost + port = %d ssl { enable = on trust-store = "%s" @@ -41,41 +42,41 @@ object Configuration { sha1prng-random-source = "/dev/./urandom" } } - actor.deployment { - /blub.remote = "akka://remote-sys@localhost:12346" - /looker/child.remote = "akka://remote-sys@localhost:12346" - /looker/child/grandchild.remote = "akka://Ticket1978CommunicationSpec@localhost:12345" - } } """ - def getCipherConfig(cipher: String, enabled: String*): (String, Boolean, Config) = try { + case class CipherConfig(runTest: Boolean, config: Config, cipher: String, localPort: Int, remotePort: Int) - if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled") + def getCipherConfig(cipher: String, enabled: String*): CipherConfig = { + val localPort, remotePort = { val s = new java.net.ServerSocket(0); try s.getLocalPort finally s.close() } + try { - val config = ConfigFactory.parseString("akka.remote.netty.port=12345").withFallback(ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher, enabled.mkString(", ")))) - val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty") - val settings = new NettySettings(fullConfig, "placeholder") + //if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled") - val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging) + val config = ConfigFactory.parseString(conf.format(localPort, trustStore, keyStore, cipher, enabled.mkString(", "))) + val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty") + val settings = new NettySettings(fullConfig, "placeholder") - rng.nextInt() // Has to work - settings.SSLRandomNumberGenerator foreach { sRng ⇒ rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) } + val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging) - val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine - val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet - val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet - gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported)) - gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled)) - engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get)) + rng.nextInt() // Has to work + settings.SSLRandomNumberGenerator foreach { sRng ⇒ rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) } - (cipher, true, config) - } catch { - case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) ⇒ (cipher, false, AkkaSpec.testConf) // Cannot match against the message since the message might be localized :S + val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine + val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet + val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet + gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported)) + gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled)) + engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get)) + + CipherConfig(true, config, cipher, localPort, remotePort) + } catch { + case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) ⇒ CipherConfig(false, AkkaSpec.testConf, cipher, localPort, remotePort) // Cannot match against the message since the message might be localized :S + } } } -import Configuration.getCipherConfig +import Configuration.{ CipherConfig, getCipherConfig } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG", "TLS_RSA_WITH_AES_128_CBC_SHA")) @@ -99,101 +100,45 @@ class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(g class Ticket1978DefaultRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "TLS_RSA_WITH_AES_128_CBC_SHA")) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(("NonExistingRNG", false, AkkaSpec.testConf)) +class Ticket1978CrappyRSAWithMD5OnlyHereToMakeSureThingsWorkSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "SSL_RSA_WITH_NULL_MD5")) -abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender { +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(CipherConfig(false, AkkaSpec.testConf, "NonExistingRNG", 12345, 12346)) + +abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) extends AkkaSpec(cipherConfig.config) with ImplicitSender { implicit val timeout: Timeout = Timeout(5 seconds) import RemoteCommunicationSpec._ - val other = ActorSystem("remote-sys", ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config)) + lazy val other: ActorSystem = ActorSystem( + "remote-sys", + ConfigFactory.parseString("akka.remote.netty.port=" + cipherConfig.remotePort).withFallback(system.settings.config)) override def atTermination() { - other.shutdown() - other.awaitTermination() + if (cipherConfig.runTest) { + other.shutdown() + other.awaitTermination() + } } - "SSL Remoting" must { - if (cipherEnabledconfig._2) { - val remote = other.actorOf(Props(new Actor { def receive = { case "ping" ⇒ sender ! (("pong", sender)) } }), "echo") + ("- SSL communication") must { + if (cipherConfig.runTest) { + val ignoreMe = other.actorOf(Props(new Actor { def receive = { case ("ping", x) ⇒ sender ! ((("pong", x), sender)) } }), "echo") + val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address - val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo") - - "support remote look-ups" in { - here ! "ping" - expectMsgPF(timeout.duration) { - case ("pong", s: AnyRef) if s eq testActor ⇒ true - } - } - - "send error message for wrong address" ignore { - within(timeout.duration) { - EventFilter.error(start = "dropping", occurrences = 1).intercept { - system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping" - }(other) - } + "support tell" in { + val here = system.actorFor(otherAddress.toString + "/user/echo") + for (i ← 1 to 100) here ! (("ping", i)) + for (i ← 1 to 100) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) ⇒ true } } "support ask" in { - Await.result(here ? "ping", timeout.duration) match { - case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good - case m ⇒ fail(m + " was not (pong, AskActorRef)") - } + val here = system.actorFor(otherAddress.toString + "/user/echo") + val f = for (i ← 1 to 1000) yield here ? (("ping", i)) mapTo manifest[((String, Int), ActorRef)] + Await.result(Future.sequence(f), timeout.duration).map(_._1._1).toSet must be(Set("pong")) } - "send dead letters on remote if actor does not exist" in { - within(timeout.duration) { - EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { - system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh" - }(other) - } - } - - "create and supervise children on remote node" in { - within(timeout.duration) { - val r = system.actorOf(Props[Echo], "blub") - r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub" - r ! 42 - expectMsg(42) - EventFilter[Exception]("crash", occurrences = 1).intercept { - r ! new Exception("crash") - }(other) - expectMsg("preRestart") - r ! 42 - expectMsg(42) - } - } - - "look-up actors across node boundaries" in { - within(timeout.duration) { - val l = system.actorOf(Props(new Actor { - def receive = { - case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n) - case s: String ⇒ sender ! context.actorFor(s) - } - }), "looker") - l ! (Props[Echo], "child") - val r = expectMsgType[ActorRef] - r ! (Props[Echo], "grandchild") - val remref = expectMsgType[ActorRef] - remref.isInstanceOf[LocalActorRef] must be(true) - val myref = system.actorFor(system / "looker" / "child" / "grandchild") - myref.isInstanceOf[RemoteActorRef] must be(true) - myref ! 43 - expectMsg(43) - lastSender must be theSameInstanceAs remref - r.asInstanceOf[RemoteActorRef].getParent must be(l) - system.actorFor("/user/looker/child") must be theSameInstanceAs r - Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l - Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l - } - } - - "not fail ask across node boundaries" in { - val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)] - Await.result(Future.sequence(f), timeout.duration).map(_._1).toSet must be(Set("pong")) - } } else { "not be run when the cipher is not supported by the platform this test is currently being executed on" ignore {