Merge pull request #476 from dev10/ticket_1978

Added changes to Netty pipelines to support SSL/TLS. Fixes #1978
This commit is contained in:
viktorklang 2012-06-15 04:34:08 -07:00
commit 5f77590eb8
16 changed files with 618 additions and 12 deletions

View file

@ -16,9 +16,9 @@ import org.jboss.netty.channel.ChannelPipelineFactory
private[akka] class TestConductorTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
extends NettyRemoteTransport(_system, _provider) {
override def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory =
override def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
new ChannelPipelineFactory {
def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout) :+ endpoint)
def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout, isClient) :+ endpoint)
}
}

View file

@ -165,6 +165,50 @@ akka {
# (O) Maximum time window that a client should try to reconnect for
reconnection-time-window = 600s
ssl {
# (I&O) Enable SSL/TLS encryption.
# This must be enabled on both the client and server to work.
enable = off
# (I) This is the Java Key Store used by the server connection
key-store = "keystore"
# This password is used for decrypting the key store
key-store-password = "changeme"
# (O) This is the Java Key Store used by the client connection
trust-store = "truststore"
# This password is used for decrypting the trust store
trust-store-password = "changeme"
# (I&O) Protocol to use for SSL encryption, choose from:
# Java 6 & 7:
# 'SSLv3', 'TLSv1'
# Java 7:
# 'TLSv1.1', 'TLSv1.2'
protocol = "TLSv1"
# You need to install the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256
# More info here: http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider
supported-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"]
# Using /dev/./urandom is only necessary when using SHA1PRNG on Linux to prevent blocking
# It is NOT as secure because it reuses the seed
# '' => defaults to /dev/random or whatever is set in java.security for example: securerandom.source=file:/dev/random
# '/dev/./urandom' => NOT '/dev/urandom' as that doesn't work according to: http://bugs.sun.com/view_bug.do?bug_id=6202721
sha1prng-random-source = ""
# There are three options, in increasing order of security:
# "" or SecureRandom => (default)
# "SHA1PRNG" => Can be slow because of blocking issues on Linux
# "AES128CounterRNGFast" => fastest startup and based on AES encryption algorithm
# The following use one of 3 possible seed sources, depending on availability: /dev/random, random.org and SecureRandom (provided by Java)
# "AES128CounterRNGSecure"
# "AES256CounterRNGSecure" (Install JCE Unlimited Strength Jurisdiction Policy Files first)
random-number-generator = ""
}
}
}
}

View file

@ -145,7 +145,7 @@ private[akka] class ActiveRemoteClient private[akka] (
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
val b = new ClientBootstrap(netty.clientChannelFactory)
b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), true))
b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), withTimeout = true, isClient = true))
b.setOption("tcpNoDelay", true)
b.setOption("keepAlive", true)
b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis)

View file

@ -64,17 +64,18 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
*
* @param withTimeout determines whether an IdleStateHandler shall be included
*/
def apply(endpoint: Seq[ChannelHandler], withTimeout: Boolean): ChannelPipelineFactory =
def apply(endpoint: Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
new ChannelPipelineFactory {
def getPipeline = apply(defaultStack(withTimeout) ++ endpoint)
def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint)
}
/**
* Construct a default protocol stack, excluding the head handler (i.e. the one which
* actually dispatches the received messages to the local target actors).
*/
def defaultStack(withTimeout: Boolean): Seq[ChannelHandler] =
(if (withTimeout) timeout :: Nil else Nil) :::
def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] =
(if (settings.EnableSSL) NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient) :: Nil else Nil) :::
(if (withTimeout) timeout :: Nil else Nil) :::
msgFormat :::
authenticator :::
executionHandler ::
@ -122,8 +123,8 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
* This method is factored out to provide an extension point in case the
* pipeline shall be changed. It is recommended to use
*/
def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory =
PipelineFactory(Seq(endpoint), withTimeout)
def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
PipelineFactory(Seq(endpoint), withTimeout, isClient)
private val remoteClients = new HashMap[Address, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock

View file

@ -0,0 +1,148 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.netty
import org.jboss.netty.handler.ssl.SslHandler
import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext }
import akka.remote.RemoteTransportException
import akka.event.LoggingAdapter
import java.io.{ IOException, FileNotFoundException, FileInputStream }
import java.security.{ SecureRandom, GeneralSecurityException, KeyStore, Security }
import akka.security.provider.AkkaProvider
/**
* Used for adding SSL support to Netty pipeline
* Internal use only
*/
private object NettySSLSupport {
/**
* Construct a SSLHandler which can be inserted into a Netty server/client pipeline
*/
def apply(settings: NettySettings, log: LoggingAdapter, isClient: Boolean): SslHandler = {
if (isClient) initialiseClientSSL(settings, log)
else initialiseServerSSL(settings, log)
}
private def initialiseCustomSecureRandom(settings: NettySettings, log: LoggingAdapter): SecureRandom = {
/**
* According to this bug report: http://bugs.sun.com/view_bug.do?bug_id=6202721
* Using /dev/./urandom is only necessary when using SHA1PRNG on Linux
* <quote>Use 'new SecureRandom()' instead of 'SecureRandom.getInstance("SHA1PRNG")'</quote> to avoid having problems
*/
settings.SSLRandomSource foreach { path System.setProperty("java.security.egd", path) }
val rng = settings.SSLRandomNumberGenerator match {
case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure"))
log.debug("SSL random number generator set to: {}", r)
val akka = new AkkaProvider
Security.addProvider(akka)
SecureRandom.getInstance(r, akka)
case Some("SHA1PRNG")
log.debug("SSL random number generator set to: SHA1PRNG")
// This needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
// However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD).
SecureRandom.getInstance("SHA1PRNG")
case Some(unknown)
log.debug("Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown)
new SecureRandom
case None
log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom")
new SecureRandom
}
// prevent stall on first access
rng.nextInt()
rng
}
private def initialiseClientSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
log.debug("Client SSL is enabled, initialising ...")
val sslContext: Option[SSLContext] = {
(settings.SSLTrustStore, settings.SSLTrustStorePassword, settings.SSLProtocol) match {
case (Some(trustStore), Some(password), Some(protocol)) constructClientContext(settings, log, trustStore, password, protocol)
case (trustStore, password, protocol)
val msg = "SSL trust store settings went missing. [trust-store: %s] [trust-store-password: %s] [protocol: %s]"
.format(trustStore, password, protocol)
throw new GeneralSecurityException(msg)
}
}
sslContext match {
case Some(context) {
log.debug("Using client SSL context to create SSLEngine ...")
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(true)
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
new SslHandler(sslEngine)
}
case None {
val msg = "Failed to initialise client SSL because SSL context could not be found. " +
"Make sure your settings are correct: [trust-store: %s] [trust-store-password: %s] [protocol: %s]"
.format(settings.SSLTrustStore, settings.SSLTrustStorePassword, settings.SSLProtocol)
throw new GeneralSecurityException(msg)
}
}
}
private def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = {
try {
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
val stream = new FileInputStream(trustStorePath)
trustStore.load(stream, trustStorePassword.toCharArray)
trustManagerFactory.init(trustStore)
val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers
val sslContext = SSLContext.getInstance(protocol)
sslContext.init(null, trustManagers, initialiseCustomSecureRandom(settings, log))
Some(sslContext)
} catch {
case e: FileNotFoundException throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e)
case e: IOException throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e)
case e: GeneralSecurityException throw new RemoteTransportException("Client SSL connection could not be established because SSL context could not be constructed", e)
}
}
private def initialiseServerSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
log.debug("Server SSL is enabled, initialising ...")
val sslContext: Option[SSLContext] = {
(settings.SSLKeyStore, settings.SSLKeyStorePassword, settings.SSLProtocol) match {
case (Some(keyStore), Some(password), Some(protocol)) constructServerContext(settings, log, keyStore, password, protocol)
case (keyStore, password, protocol)
val msg = "SSL key store settings went missing. [key-store: %s] [key-store-password: %s] [protocol: %s]".format(keyStore, password, protocol)
throw new GeneralSecurityException(msg)
}
}
sslContext match {
case Some(context) {
log.debug("Using server SSL context to create SSLEngine ...")
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(false)
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
new SslHandler(sslEngine)
}
case None {
val msg = "Failed to initialise server SSL because SSL context could not be found. " +
"Make sure your settings are correct: [key-store: %s] [key-store-password: %s] [protocol: %s]"
.format(settings.SSLKeyStore, settings.SSLKeyStorePassword, settings.SSLProtocol)
throw new GeneralSecurityException(msg)
}
}
}
private def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = {
try {
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
val stream = new FileInputStream(keyStorePath)
keyStore.load(stream, keyStorePassword.toCharArray)
factory.init(keyStore, keyStorePassword.toCharArray)
val sslContext = SSLContext.getInstance(protocol)
sslContext.init(factory.getKeyManagers, null, initialiseCustomSecureRandom(settings, log))
Some(sslContext)
} catch {
case e: FileNotFoundException throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
case e: IOException throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
case e: GeneralSecurityException throw new RemoteTransportException("Server SSL connection could not be established because SSL context could not be constructed", e)
}
}
}

View file

@ -12,7 +12,6 @@ import org.jboss.netty.channel.group.ChannelGroup
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
import org.jboss.netty.handler.execution.ExecutionHandler
import akka.event.Logging
import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol }
import akka.remote.{ RemoteServerShutdown, RemoteServerError, RemoteServerClientDisconnected, RemoteServerClientConnected, RemoteServerClientClosed, RemoteProtocol, RemoteMessage }
import akka.actor.Address
@ -40,7 +39,7 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) {
private val bootstrap = {
val b = new ServerBootstrap(factory)
b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), false))
b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), withTimeout = false, isClient = false))
b.setOption("backlog", settings.Backlog)
b.setOption("tcpNoDelay", true)
b.setOption("child.keepAlive", true)

View file

@ -86,4 +86,55 @@ private[akka] class NettySettings(config: Config, val systemName: String) {
case sz sz
}
val SSLKeyStore = getString("ssl.key-store") match {
case "" None
case keyStore Some(keyStore)
}
val SSLTrustStore = getString("ssl.trust-store") match {
case "" None
case trustStore Some(trustStore)
}
val SSLKeyStorePassword = getString("ssl.key-store-password") match {
case "" None
case password Some(password)
}
val SSLTrustStorePassword = getString("ssl.trust-store-password") match {
case "" None
case password Some(password)
}
val SSLSupportedAlgorithms = getStringList("ssl.supported-algorithms")
val SSLProtocol = getString("ssl.protocol") match {
case "" None
case protocol Some(protocol)
}
val SSLRandomSource = getString("ssl.sha1prng-random-source") match {
case "" None
case path Some(path)
}
val SSLRandomNumberGenerator = getString("ssl.random-number-generator") match {
case "" None
case rng Some(rng)
}
val EnableSSL = {
val enableSSL = getBoolean("ssl.enable")
if (enableSSL) {
if (SSLProtocol.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.netty.ssl.enable is turned on but no protocol is defined in 'akka.remote.netty.ssl.protocol'.")
if (SSLKeyStore.isEmpty && SSLTrustStore.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.netty.ssl.enable is turned on but no key/trust store is defined in 'akka.remote.netty.ssl.key-store' / 'akka.remote.netty.ssl.trust-store'.")
if (SSLKeyStore.isDefined && SSLKeyStorePassword.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.netty.ssl.key-store' is defined but no key-store password is defined in 'akka.remote.netty.ssl.key-store-password'.")
if (SSLTrustStore.isDefined && SSLTrustStorePassword.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.netty.ssl.trust-store' is defined but no trust-store password is defined in 'akka.remote.netty.ssl.trust-store-password'.")
}
enableSSL
}
}

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator }
import java.security.SecureRandom
/**
* Internal API
*/
class AES128CounterRNGFast extends java.security.SecureRandomSpi {
private val rng = new AESCounterRNG(new SecureRandomSeedGenerator())
/**
* This is managed internally only
*/
protected def engineSetSeed(seed: Array[Byte]) {
}
/**
* Generates a user-specified number of random bytes.
*
* @param bytes the array to be filled in with random bytes.
*/
protected def engineNextBytes(bytes: Array[Byte]) {
rng.nextBytes(bytes)
}
/**
* Returns the given number of seed bytes. This call may be used to
* seed other random number generators.
*
* @param numBytes the number of seed bytes to generate.
* @return the seed bytes.
*/
protected def engineGenerateSeed(numBytes: Int): Array[Byte] = {
(new SecureRandom).generateSeed(numBytes)
}
}

View file

@ -0,0 +1,40 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator }
/**
* Internal API
*/
class AES128CounterRNGSecure extends java.security.SecureRandomSpi {
private val rng = new AESCounterRNG()
/**
* This is managed internally only
*/
protected def engineSetSeed(seed: Array[Byte]) {
}
/**
* Generates a user-specified number of random bytes.
*
* @param bytes the array to be filled in with random bytes.
*/
protected def engineNextBytes(bytes: Array[Byte]) {
rng.nextBytes(bytes)
}
/**
* Returns the given number of seed bytes. This call may be used to
* seed other random number generators.
*
* @param numBytes the number of seed bytes to generate.
* @return the seed bytes.
*/
protected def engineGenerateSeed(numBytes: Int): Array[Byte] = {
DefaultSeedGenerator.getInstance.generateSeed(numBytes)
}
}

View file

@ -0,0 +1,40 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator }
/**
* Internal API
*/
class AES256CounterRNGSecure extends java.security.SecureRandomSpi {
private val rng = new AESCounterRNG(32)
/**
* This is managed internally only
*/
protected def engineSetSeed(seed: Array[Byte]) {
}
/**
* Generates a user-specified number of random bytes.
*
* @param bytes the array to be filled in with random bytes.
*/
protected def engineNextBytes(bytes: Array[Byte]) {
rng.nextBytes(bytes)
}
/**
* Returns the given number of seed bytes. This call may be used to
* seed other random number generators.
*
* @param numBytes the number of seed bytes to generate.
* @return the seed bytes.
*/
protected def engineGenerateSeed(numBytes: Int): Array[Byte] = {
DefaultSeedGenerator.getInstance.generateSeed(numBytes)
}
}

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.security.provider
import java.security.{ PrivilegedAction, AccessController, Provider }
/**
* A provider that for AES128CounterRNGFast, a cryptographically secure random number generator through SecureRandom
*/
final class AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") {
AccessController.doPrivileged(new PrivilegedAction[AkkaProvider] {
def run = {
/**
* SecureRandom
*/
put("SecureRandom.AES128CounterRNGFast", "akka.security.provider.AES128CounterRNGFast")
put("SecureRandom.AES128CounterRNGSecure", "akka.security.provider.AES128CounterRNGSecure")
put("SecureRandom.AES256CounterRNGSecure", "akka.security.provider.AES256CounterRNGSecure")
/**
* Implementation type: software or hardware
*/
put("SecureRandom.AES128CounterRNGFast ImplementedIn", "Software")
put("SecureRandom.AES128CounterRNGSecure ImplementedIn", "Software")
put("SecureRandom.AES256CounterRNGSecure ImplementedIn", "Software")
null
}
})
}

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,161 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit._
import akka.actor._
import com.typesafe.config._
import akka.dispatch.{ Await, Future }
import akka.pattern.ask
import java.io.File
import java.security.{ PrivilegedAction, AccessController }
object Configuration {
// set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager"
// The certificate will expire in 2109
private val trustStore = getPath("truststore")
private val keyStore = getPath("keystore")
private def getPath(name: String): String = (new File("akka-remote/src/test/resources/" + name)).getAbsolutePath.replace("\\", "\\\\")
private val conf = """
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
hostname = localhost
port = 12345
ssl {
enable = on
trust-store = "%s"
key-store = "%s"
random-number-generator = "%s"
}
}
actor.deployment {
/blub.remote = "akka://remote-sys@localhost:12346"
/looker/child.remote = "akka://remote-sys@localhost:12346"
/looker/child/grandchild.remote = "akka://Ticket1978CommunicationSpec@localhost:12345"
}
}
"""
def getConfig(rng: String): String = {
conf.format(trustStore, keyStore, rng)
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978SHA1PRNG extends Ticket1978CommunicationSpec(Configuration.getConfig("SHA1PRNG"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES128CounterRNGFast extends Ticket1978CommunicationSpec(Configuration.getConfig("AES128CounterRNGFast"))
/**
* Both of the <quote>Secure</quote> variants require access to the Internet to access random.org.
*/
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES128CounterRNGSecure extends Ticket1978CommunicationSpec(Configuration.getConfig("AES128CounterRNGSecure"))
/**
* Both of the <quote>Secure</quote> variants require access to the Internet to access random.org.
*/
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES256CounterRNGSecure extends Ticket1978CommunicationSpec(Configuration.getConfig("AES256CounterRNGSecure"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978CommunicationSpec(val configuration: String)
extends AkkaSpec(configuration) with ImplicitSender with DefaultTimeout {
import RemoteCommunicationSpec._
// default SecureRandom RNG
def this() = this(Configuration.getConfig(""))
val conf = ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config)
val other = ActorSystem("remote-sys", conf)
val remote = other.actorOf(Props(new Actor {
def receive = {
case "ping" sender ! (("pong", sender))
}
}), "echo")
val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo")
override def atTermination() {
other.shutdown()
}
"SSL Remoting" must {
"support remote look-ups" in {
here ! "ping"
expectMsgPF() {
case ("pong", s: AnyRef) if s eq testActor true
}
}
"send error message for wrong address" in {
EventFilter.error(start = "dropping", occurrences = 1).intercept {
system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping"
}(other)
}
"support ask" in {
Await.result(here ? "ping", timeout.duration) match {
case ("pong", s: akka.pattern.PromiseActorRef) // good
case m fail(m + " was not (pong, AskActorRef)")
}
}
"send dead letters on remote if actor does not exist" in {
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh"
}(other)
}
"create and supervise children on remote node" in {
val r = system.actorOf(Props[Echo], "blub")
r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub"
r ! 42
expectMsg(42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
r ! new Exception("crash")
}(other)
expectMsg("preRestart")
r ! 42
expectMsg(42)
system.stop(r)
expectMsg("postStop")
}
"look-up actors across node boundaries" in {
val l = system.actorOf(Props(new Actor {
def receive = {
case (p: Props, n: String) sender ! context.actorOf(p, n)
case s: String sender ! context.actorFor(s)
}
}), "looker")
l ! (Props[Echo], "child")
val r = expectMsgType[ActorRef]
r ! (Props[Echo], "grandchild")
val remref = expectMsgType[ActorRef]
remref.isInstanceOf[LocalActorRef] must be(true)
val myref = system.actorFor(system / "looker" / "child" / "grandchild")
myref.isInstanceOf[RemoteActorRef] must be(true)
myref ! 43
expectMsg(43)
lastSender must be theSameInstanceAs remref
r.asInstanceOf[RemoteActorRef].getParent must be(l)
system.actorFor("/user/looker/child") must be theSameInstanceAs r
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
}
"not fail ask across node boundaries" in {
val f = for (_ 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)]
Await.result(Future.sequence(f), remaining).map(_._1).toSet must be(Set("pong"))
}
}
}

View file

@ -0,0 +1,48 @@
package akka.remote
import akka.testkit._
import akka.actor._
import com.typesafe.config._
import akka.actor.ExtendedActorSystem
import akka.util.duration._
import akka.util.Duration
import akka.remote.netty.NettyRemoteTransport
import java.util.ArrayList
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978ConfigSpec extends AkkaSpec("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
hostname = localhost
port = 12345
}
actor.deployment {
/blub.remote = "akka://remote-sys@localhost:12346"
/looker/child.remote = "akka://remote-sys@localhost:12346"
/looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
}
}
""") with ImplicitSender with DefaultTimeout {
"SSL Remoting" must {
"be able to parse these extra Netty config elements" in {
val settings =
system.asInstanceOf[ExtendedActorSystem]
.provider.asInstanceOf[RemoteActorRefProvider]
.transport.asInstanceOf[NettyRemoteTransport]
.settings
import settings._
EnableSSL must be(false)
SSLKeyStore must be(Some("keystore"))
SSLKeyStorePassword must be(Some("changeme"))
SSLTrustStore must be(Some("truststore"))
SSLTrustStorePassword must be(Some("changeme"))
SSLProtocol must be(Some("TLSv1"))
SSLSupportedAlgorithms must be(java.util.Arrays.asList("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
SSLRandomSource must be(None)
SSLRandomNumberGenerator must be(None)
}
}
}

View file

@ -452,7 +452,7 @@ object Dependencies {
)
val remote = Seq(
netty, protobuf, Test.junit, Test.scalatest
netty, protobuf, uncommonsMath, Test.junit, Test.scalatest
)
val cluster = Seq(Test.junit, Test.scalatest)
@ -490,6 +490,7 @@ object Dependency {
val ScalaStm = "0.5"
val Scalatest = "1.6.1"
val Slf4j = "1.6.4"
val UncommonsMath = "1.2.2a"
}
// Compile
@ -499,6 +500,7 @@ object Dependency {
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD
val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala)
val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT
val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % V.UncommonsMath // ApacheV2
val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.6" // ApacheV2
// Test