Merge pull request #554 from akka/wip-ssl-unbroken-√

Wip ssl unbroken √
This commit is contained in:
viktorklang 2012-06-20 07:03:29 -07:00
commit 7700b04bfb
11 changed files with 152 additions and 196 deletions

View file

@ -122,7 +122,8 @@ akka {
# (I) Length in akka.time-unit how long core threads will be kept alive if idling
execution-pool-keepalive = 60s
# (I) Size of the core pool of the remote execution unit
# (I) Size in number of threads of the core pool of the remote execution unit.
# A value of 0 will turn this off, which is can lead to deadlocks under some configurations!
execution-pool-size = 4
# (I) Maximum channel size, 0 for off

View file

@ -106,6 +106,7 @@ case class RemoteServerShutdown(
case class RemoteServerError(
@BeanProperty val cause: Throwable,
@transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent {
override def logLevel: Logging.LogLevel = Logging.ErrorLevel
override def toString: String = "RemoteServerError@" + remote + "] Error[" + cause + "]"
}

View file

@ -18,6 +18,7 @@ import akka.AkkaException
import akka.event.Logging
import akka.actor.{ DeadLetter, Address, ActorRef }
import akka.util.{ NonFatal, Switch }
import org.jboss.netty.handler.ssl.SslHandler
/**
* This is the abstract baseclass for netty remote clients, currently there's only an
@ -115,15 +116,27 @@ private[akka] class ActiveRemoteClient private[akka] (
*/
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = {
def sendSecureCookie(connection: ChannelFuture) {
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (settings.SecureCookie.nonEmpty) handshake.setCookie(settings.SecureCookie.get)
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setSystem(localAddress.system)
.setHostname(localAddress.host.get)
.setPort(localAddress.port.get)
.build)
connection.getChannel.write(netty.createControlEnvelope(handshake.build))
// Returns whether the handshake was written to the channel or not
def sendSecureCookie(connection: ChannelFuture): Boolean = {
val future =
if (!connection.isSuccess || !settings.EnableSSL) connection
else connection.getChannel.getPipeline.get[SslHandler](classOf[SslHandler]).handshake().awaitUninterruptibly()
if (!future.isSuccess) {
notifyListeners(RemoteClientError(future.getCause, netty, remoteAddress))
false
} else {
ChannelAddress.set(connection.getChannel, Some(remoteAddress))
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (settings.SecureCookie.nonEmpty) handshake.setCookie(settings.SecureCookie.get)
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setSystem(localAddress.system)
.setHostname(localAddress.host.get)
.setPort(localAddress.port.get)
.build)
connection.getChannel.write(netty.createControlEnvelope(handshake.build))
true
}
}
def attemptReconnect(): Boolean = {
@ -131,14 +144,7 @@ private[akka] class ActiveRemoteClient private[akka] (
log.debug("Remote client reconnecting to [{}|{}]", remoteAddress, remoteIP)
connection = bootstrap.connect(new InetSocketAddress(remoteIP, remoteAddress.port.get))
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, netty, remoteAddress))
false
} else {
sendSecureCookie(connection)
true
}
sendSecureCookie(connection)
}
runSwitch switchOn {
@ -163,24 +169,19 @@ private[akka] class ActiveRemoteClient private[akka] (
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, netty, remoteAddress))
false
} else {
ChannelAddress.set(connection.getChannel, Some(remoteAddress))
sendSecureCookie(connection)
if (sendSecureCookie(connection)) {
notifyListeners(RemoteClientStarted(netty, remoteAddress))
true
} else {
connection.getChannel.close()
openChannels.remove(connection.getChannel)
false
}
} match {
case true true
case false if reconnectIfAlreadyConnected
connection.getChannel.close()
openChannels.remove(connection.getChannel)
log.debug("Remote client reconnecting to [{}]", remoteAddress)
attemptReconnect()
case false false
}
}

View file

@ -24,7 +24,7 @@ import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefPr
import akka.util.NonFatal
import akka.actor.{ ExtendedActorSystem, Address, ActorRef }
object ChannelAddress extends ChannelLocal[Option[Address]] {
private[akka] object ChannelAddress extends ChannelLocal[Option[Address]] {
override def initialValue(ch: Channel): Option[Address] = None
}
@ -54,9 +54,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
* in implementations of ChannelPipelineFactory.
*/
def apply(handlers: Seq[ChannelHandler]): DefaultChannelPipeline =
handlers.foldLeft(new DefaultChannelPipeline) {
(pipe, handler) pipe.addLast(Logging.simpleName(handler.getClass), handler); pipe
}
(new DefaultChannelPipeline /: handlers) { (p, h) p.addLast(Logging.simpleName(h.getClass), h); p }
/**
* Constructs the NettyRemoteTransport default pipeline with the give head handler, which
@ -65,21 +63,18 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
* @param withTimeout determines whether an IdleStateHandler shall be included
*/
def apply(endpoint: Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
new ChannelPipelineFactory {
def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint)
}
new ChannelPipelineFactory { override def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint) }
/**
* Construct a default protocol stack, excluding the head handler (i.e. the one which
* actually dispatches the received messages to the local target actors).
*/
def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] =
(if (settings.EnableSSL) NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient) :: Nil else Nil) :::
(if (withTimeout) timeout :: Nil else Nil) :::
msgFormat :::
authenticator :::
executionHandler ::
Nil
(if (settings.EnableSSL) List(NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient)) else Nil) :::
(if (withTimeout) List(timeout) else Nil) :::
msgFormat :::
authenticator :::
executionHandler
/**
* Construct an IdleStateHandler which uses [[akka.remote.netty.NettyRemoteTransport]].timer.
@ -103,20 +98,22 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
* happen on a netty thread (that could be bad if re-sending over the network for
* remote-deployed actors).
*/
val executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(
settings.ExecutionPoolSize,
settings.MaxChannelMemorySize,
settings.MaxTotalMemorySize,
settings.ExecutionPoolKeepalive.length,
settings.ExecutionPoolKeepalive.unit,
system.threadFactory))
val executionHandler = if (settings.ExecutionPoolSize != 0)
List(new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(
settings.ExecutionPoolSize,
settings.MaxChannelMemorySize,
settings.MaxTotalMemorySize,
settings.ExecutionPoolKeepalive.length,
settings.ExecutionPoolKeepalive.unit,
system.threadFactory)))
else Nil
/**
* Construct and authentication handler which uses the SecureCookie to somewhat
* protect the TCP port from unauthorized use (dont rely on it too much, though,
* as this is NOT a cryptographic feature).
*/
def authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil
def authenticator = if (settings.RequireCookie) List(new RemoteServerAuthenticationHandler(settings.SecureCookie)) else Nil
}
/**

View file

@ -9,8 +9,8 @@ import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSL
import akka.remote.RemoteTransportException
import akka.event.LoggingAdapter
import java.io.{ IOException, FileNotFoundException, FileInputStream }
import java.security.{ SecureRandom, GeneralSecurityException, KeyStore, Security }
import akka.security.provider.AkkaProvider
import java.security._
/**
* Used for adding SSL support to Netty pipeline
@ -18,8 +18,7 @@ import akka.security.provider.AkkaProvider
*/
private[akka] object NettySSLSupport {
val akka = new AkkaProvider
Security.addProvider(akka)
Security addProvider AkkaProvider
/**
* Construct a SSLHandler which can be inserted into a Netty server/client pipeline
@ -33,17 +32,20 @@ private[akka] object NettySSLSupport {
* Using /dev/./urandom is only necessary when using SHA1PRNG on Linux
* <quote>Use 'new SecureRandom()' instead of 'SecureRandom.getInstance("SHA1PRNG")'</quote> to avoid having problems
*/
sourceOfRandomness foreach { path System.setProperty("java.security.egd", path) }
sourceOfRandomness foreach { path
System.setProperty("java.security.egd", path)
System.setProperty("securerandom.source", path)
}
val rng = rngName match {
case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure"))
log.debug("SSL random number generator set to: {}", r)
SecureRandom.getInstance(r, akka)
case Some("SHA1PRNG")
log.debug("SSL random number generator set to: SHA1PRNG")
// This needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
SecureRandom.getInstance(r, AkkaProvider)
case Some(s @ ("SHA1PRNG" | "NativePRNG"))
log.debug("SSL random number generator set to: " + s)
// SHA1PRNG needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
// However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD).
SecureRandom.getInstance("SHA1PRNG")
SecureRandom.getInstance(s)
case Some(unknown)
log.debug("Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown)
new SecureRandom
@ -60,12 +62,18 @@ private[akka] object NettySSLSupport {
def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] =
try {
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
trustManagerFactory.init(trustStore)
val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers
Option(SSLContext.getInstance(protocol)) map { ctx ctx.init(null, trustManagers, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
val rng = initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)
val trustManagers: Array[TrustManager] = {
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
trustManagerFactory.init({
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
val fin = new FileInputStream(trustStorePath)
try trustStore.load(fin, trustStorePassword.toCharArray) finally fin.close()
trustStore
})
trustManagerFactory.getTrustManagers
}
Option(SSLContext.getInstance(protocol)) map { ctx ctx.init(null, trustManagers, rng); ctx }
} catch {
case e: FileNotFoundException throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e)
case e: IOException throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e)
@ -82,10 +90,12 @@ private[akka] object NettySSLSupport {
}) match {
case Some(context)
log.debug("Using client SSL context to create SSLEngine ...")
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(true)
sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray.map(_.toString))
new SslHandler(sslEngine)
new SslHandler({
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(true)
sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray)
sslEngine
})
case None
throw new GeneralSecurityException(
"""Failed to initialize client SSL because SSL context could not be found." +
@ -101,11 +111,15 @@ private[akka] object NettySSLSupport {
def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] =
try {
val rng = initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
factory.init(keyStore, keyStorePassword.toCharArray)
Option(SSLContext.getInstance(protocol)) map { ctx ctx.init(factory.getKeyManagers, null, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
factory.init({
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
val fin = new FileInputStream(keyStorePath)
try keyStore.load(fin, keyStorePassword.toCharArray) finally fin.close()
keyStore
}, keyStorePassword.toCharArray)
Option(SSLContext.getInstance(protocol)) map { ctx ctx.init(factory.getKeyManagers, null, rng); ctx }
} catch {
case e: FileNotFoundException throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
case e: IOException throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
@ -121,7 +135,7 @@ private[akka] object NettySSLSupport {
log.debug("Using server SSL context to create SSLEngine ...")
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(false)
sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray.map(_.toString))
sslEngine.setEnabledCipherSuites(settings.SSLEnabledAlgorithms.toArray)
new SslHandler(sslEngine)
case None throw new GeneralSecurityException(
"""Failed to initialize server SSL because SSL context could not be found.

View file

@ -8,6 +8,7 @@ import akka.util.Duration
import java.util.concurrent.TimeUnit._
import java.net.InetAddress
import akka.ConfigurationException
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
private[akka] class NettySettings(config: Config, val systemName: String) {
@ -72,7 +73,7 @@ private[akka] class NettySettings(config: Config, val systemName: String) {
val ExecutionPoolKeepalive: Duration = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS)
val ExecutionPoolSize: Int = getInt("execution-pool-size") match {
case sz if sz < 1 throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 1")
case sz if sz < 0 throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 0")
case sz sz
}
@ -106,7 +107,7 @@ private[akka] class NettySettings(config: Config, val systemName: String) {
case password Some(password)
}
val SSLEnabledAlgorithms = getStringList("ssl.enabled-algorithms").toArray.toSet
val SSLEnabledAlgorithms = iterableAsScalaIterableConverter(getStringList("ssl.enabled-algorithms")).asScala.toSet[String]
val SSLProtocol = getString("ssl.protocol") match {
case "" None

View file

@ -3,14 +3,14 @@
*/
package akka.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator }
import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator }
import java.security.SecureRandom
/**
* Internal API
*/
class AES128CounterRNGFast extends java.security.SecureRandomSpi {
private val rng = new AESCounterRNG(new SecureRandomSeedGenerator())
private val rng = new AESCounterRNG()
/**
* This is managed internally only
@ -31,6 +31,6 @@ class AES128CounterRNGFast extends java.security.SecureRandomSpi {
* @param numBytes the number of seed bytes to generate.
* @return the seed bytes.
*/
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = (new SecureRandom).generateSeed(numBytes)
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes)
}

View file

@ -3,13 +3,13 @@
*/
package akka.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator }
import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator }
/**
* Internal API
*/
class AES128CounterRNGSecure extends java.security.SecureRandomSpi {
private val rng = new AESCounterRNG()
private val rng = new AESCounterRNG(new SecureRandomSeedGenerator())
/**
* This is managed internally only
@ -30,6 +30,6 @@ class AES128CounterRNGSecure extends java.security.SecureRandomSpi {
* @param numBytes the number of seed bytes to generate.
* @return the seed bytes.
*/
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes)
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = (new SecureRandomSeedGenerator()).generateSeed(numBytes)
}

View file

@ -3,18 +3,18 @@
*/
package akka.security.provider
import java.security.{ PrivilegedAction, AccessController, Provider }
import java.security.{ PrivilegedAction, AccessController, Provider, Security }
/**
* A provider that for AES128CounterRNGFast, a cryptographically secure random number generator through SecureRandom
*/
final class AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") {
AccessController.doPrivileged(new PrivilegedAction[AkkaProvider] {
object AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") {
AccessController.doPrivileged(new PrivilegedAction[this.type] {
def run = {
//SecureRandom
put("SecureRandom.AES128CounterRNGFast", "akka.security.provider.AES128CounterRNGFast")
put("SecureRandom.AES128CounterRNGSecure", "akka.security.provider.AES128CounterRNGSecure")
put("SecureRandom.AES256CounterRNGSecure", "akka.security.provider.AES256CounterRNGSecure")
put("SecureRandom.AES128CounterRNGFast", classOf[AES128CounterRNGFast].getName)
put("SecureRandom.AES128CounterRNGSecure", classOf[AES128CounterRNGSecure].getName)
put("SecureRandom.AES256CounterRNGSecure", classOf[AES256CounterRNGSecure].getName)
//Implementation type: software or hardware
put("SecureRandom.AES128CounterRNGFast ImplementedIn", "Software")

View file

@ -9,12 +9,12 @@ import com.typesafe.config._
import akka.dispatch.{ Await, Future }
import akka.pattern.ask
import java.io.File
import akka.event.{ NoLogging, LoggingAdapter }
import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController }
import netty.{ NettySettings, NettySSLSupport }
import javax.net.ssl.SSLException
import akka.util.{ Timeout, Duration }
import akka.util.duration._
import akka.event.{ Logging, NoLogging, LoggingAdapter }
object Configuration {
// set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager"
@ -32,6 +32,7 @@ object Configuration {
remote.netty {
hostname = localhost
port = %d
ssl {
enable = on
trust-store = "%s"
@ -41,41 +42,40 @@ object Configuration {
sha1prng-random-source = "/dev/./urandom"
}
}
actor.deployment {
/blub.remote = "akka://remote-sys@localhost:12346"
/looker/child.remote = "akka://remote-sys@localhost:12346"
/looker/child/grandchild.remote = "akka://Ticket1978CommunicationSpec@localhost:12345"
}
}
"""
def getCipherConfig(cipher: String, enabled: String*): (String, Boolean, Config) = try {
case class CipherConfig(runTest: Boolean, config: Config, cipher: String, localPort: Int, remotePort: Int)
if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled")
def getCipherConfig(cipher: String, enabled: String*): CipherConfig = {
val localPort, remotePort = { val s = new java.net.ServerSocket(0); try s.getLocalPort finally s.close() }
try {
//if (true) throw new IllegalArgumentException("Ticket1978*Spec isn't enabled")
val config = ConfigFactory.parseString("akka.remote.netty.port=12345").withFallback(ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher, enabled.mkString(", "))))
val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty")
val settings = new NettySettings(fullConfig, "placeholder")
val config = ConfigFactory.parseString(conf.format(localPort, trustStore, keyStore, cipher, enabled.mkString(", ")))
val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty")
val settings = new NettySettings(fullConfig, "placeholder")
val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging)
val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging)
rng.nextInt() // Has to work
settings.SSLRandomNumberGenerator foreach { sRng rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) }
rng.nextInt() // Has to work
settings.SSLRandomNumberGenerator foreach { sRng rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) }
val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine
val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet
val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet
gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported))
gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled))
engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get))
val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine
val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet
val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet
gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported))
gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled))
engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException("Protocol not supported: " + settings.SSLProtocol.get))
(cipher, true, config)
} catch {
case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) (cipher, false, AkkaSpec.testConf) // Cannot match against the message since the message might be localized :S
CipherConfig(true, config, cipher, localPort, remotePort)
} catch {
case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) CipherConfig(false, AkkaSpec.testConf, cipher, localPort, remotePort) // Cannot match against the message since the message might be localized :S
}
}
}
import Configuration.getCipherConfig
import Configuration.{ CipherConfig, getCipherConfig }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG", "TLS_RSA_WITH_AES_128_CBC_SHA"))
@ -99,101 +99,47 @@ class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(g
class Ticket1978DefaultRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "TLS_RSA_WITH_AES_128_CBC_SHA"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(("NonExistingRNG", false, AkkaSpec.testConf))
class Ticket1978CrappyRSAWithMD5OnlyHereToMakeSureThingsWorkSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "SSL_RSA_WITH_NULL_MD5"))
abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender {
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(CipherConfig(false, AkkaSpec.testConf, "NonExistingRNG", 12345, 12346))
implicit val timeout: Timeout = Timeout(5 seconds)
abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) extends AkkaSpec(cipherConfig.config) with ImplicitSender {
implicit val timeout: Timeout = Timeout(10 seconds)
import RemoteCommunicationSpec._
val other = ActorSystem("remote-sys", ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config))
lazy val other: ActorSystem = ActorSystem(
"remote-sys",
ConfigFactory.parseString("akka.remote.netty.port=" + cipherConfig.remotePort).withFallback(system.settings.config))
override def atTermination() {
other.shutdown()
other.awaitTermination()
if (cipherConfig.runTest) {
other.shutdown()
other.awaitTermination()
}
}
"SSL Remoting" must {
if (cipherEnabledconfig._2) {
val remote = other.actorOf(Props(new Actor { def receive = { case "ping" sender ! (("pong", sender)) } }), "echo")
("-") must {
if (cipherConfig.runTest) {
val ignoreMe = other.actorOf(Props(new Actor { def receive = { case ("ping", x) sender ! ((("pong", x), sender)) } }), "echo")
val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address
val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo")
"support tell" in {
val here = system.actorFor(otherAddress.toString + "/user/echo")
"support remote look-ups" in {
here ! "ping"
expectMsgPF(timeout.duration) {
case ("pong", s: AnyRef) if s eq testActor true
}
}
"send error message for wrong address" ignore {
within(timeout.duration) {
EventFilter.error(start = "dropping", occurrences = 1).intercept {
system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping"
}(other)
}
for (i 1 to 1000) here ! (("ping", i))
for (i 1 to 1000) expectMsgPF(timeout.duration) { case (("pong", i), `testActor`) true }
}
"support ask" in {
Await.result(here ? "ping", timeout.duration) match {
case ("pong", s: akka.pattern.PromiseActorRef) // good
case m fail(m + " was not (pong, AskActorRef)")
}
val here = system.actorFor(otherAddress.toString + "/user/echo")
val f = for (i 1 to 1000) yield here ? (("ping", i)) mapTo manifest[((String, Int), ActorRef)]
Await.result(Future.sequence(f), timeout.duration).map(_._1._1).toSet must be(Set("pong"))
}
"send dead letters on remote if actor does not exist" in {
within(timeout.duration) {
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh"
}(other)
}
}
"create and supervise children on remote node" in {
within(timeout.duration) {
val r = system.actorOf(Props[Echo], "blub")
r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub"
r ! 42
expectMsg(42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
r ! new Exception("crash")
}(other)
expectMsg("preRestart")
r ! 42
expectMsg(42)
}
}
"look-up actors across node boundaries" in {
within(timeout.duration) {
val l = system.actorOf(Props(new Actor {
def receive = {
case (p: Props, n: String) sender ! context.actorOf(p, n)
case s: String sender ! context.actorFor(s)
}
}), "looker")
l ! (Props[Echo], "child")
val r = expectMsgType[ActorRef]
r ! (Props[Echo], "grandchild")
val remref = expectMsgType[ActorRef]
remref.isInstanceOf[LocalActorRef] must be(true)
val myref = system.actorFor(system / "looker" / "child" / "grandchild")
myref.isInstanceOf[RemoteActorRef] must be(true)
myref ! 43
expectMsg(43)
lastSender must be theSameInstanceAs remref
r.asInstanceOf[RemoteActorRef].getParent must be(l)
system.actorFor("/user/looker/child") must be theSameInstanceAs r
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
}
}
"not fail ask across node boundaries" in {
val f = for (_ 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)]
Await.result(Future.sequence(f), timeout.duration).map(_._1).toSet must be(Set("pong"))
}
} else {
"not be run when the cipher is not supported by the platform this test is currently being executed on" ignore {

View file

@ -15,12 +15,7 @@ akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
hostname = localhost
port = 12345
}
actor.deployment {
/blub.remote = "akka://remote-sys@localhost:12346"
/looker/child.remote = "akka://remote-sys@localhost:12346"
/looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
port = 0
}
}
""") with ImplicitSender with DefaultTimeout {