Merge branch 'master' into wip-2162-redesign-of-management-of-the-exiting-to-removed-life-cycle-jboner
This commit is contained in:
commit
6d96d04234
27 changed files with 1125 additions and 65 deletions
|
|
@ -875,3 +875,16 @@ class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class
|
|||
protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, message))
|
||||
protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, message))
|
||||
}
|
||||
|
||||
private[akka] object NoLogging extends LoggingAdapter {
|
||||
def isErrorEnabled = false
|
||||
def isWarningEnabled = false
|
||||
def isInfoEnabled = false
|
||||
def isDebugEnabled = false
|
||||
|
||||
protected def notifyError(message: String): Unit = ()
|
||||
protected def notifyError(cause: Throwable, message: String): Unit = ()
|
||||
protected def notifyWarning(message: String): Unit = ()
|
||||
protected def notifyInfo(message: String): Unit = ()
|
||||
protected def notifyDebug(message: String): Unit = ()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -132,6 +132,15 @@ object Member {
|
|||
case _ ⇒ None
|
||||
}
|
||||
|
||||
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
|
||||
// group all members by Address => Seq[Member]
|
||||
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
|
||||
// pick highest MemberStatus
|
||||
(Set.empty[Member] /: groupedByAddress) {
|
||||
case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Picks the Member with the highest "priority" MemberStatus.
|
||||
*/
|
||||
|
|
@ -144,8 +153,8 @@ object Member {
|
|||
case (_, Exiting) ⇒ m2
|
||||
case (Leaving, _) ⇒ m1
|
||||
case (_, Leaving) ⇒ m2
|
||||
case (Up, Joining) ⇒ m1
|
||||
case (Joining, Up) ⇒ m2
|
||||
case (Up, Joining) ⇒ m2
|
||||
case (Joining, Up) ⇒ m1
|
||||
case (Joining, Joining) ⇒ m1
|
||||
case (Up, Up) ⇒ m1
|
||||
}
|
||||
|
|
@ -287,21 +296,12 @@ case class Gossip(
|
|||
// 2. merge meta-data
|
||||
val mergedMeta = this.meta ++ that.meta
|
||||
|
||||
def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = {
|
||||
// group all members by Address => Seq[Member]
|
||||
val groupedByAddress = (a ++ b).groupBy(_.address)
|
||||
// pick highest MemberStatus
|
||||
(Set.empty[Member] /: groupedByAddress) {
|
||||
case (acc, (_, members)) ⇒ acc + members.reduceLeft(Member.highestPriorityOf)
|
||||
}
|
||||
}
|
||||
|
||||
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
|
||||
val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq)
|
||||
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
|
||||
|
||||
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
|
||||
// and exclude unreachable
|
||||
val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq).
|
||||
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).
|
||||
filterNot(mergedUnreachable.contains)
|
||||
|
||||
// 5. fresh seen table
|
||||
|
|
@ -546,24 +546,28 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
}
|
||||
|
||||
// start periodic gossip to random nodes in cluster
|
||||
private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) {
|
||||
gossip()
|
||||
}
|
||||
private val gossipTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
|
||||
gossip()
|
||||
}
|
||||
|
||||
// start periodic heartbeat to all nodes in cluster
|
||||
private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) {
|
||||
heartbeat()
|
||||
}
|
||||
private val heartbeatTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
|
||||
heartbeat()
|
||||
}
|
||||
|
||||
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
|
||||
private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) {
|
||||
reapUnreachableMembers()
|
||||
}
|
||||
private val failureDetectorReaperTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
|
||||
reapUnreachableMembers()
|
||||
}
|
||||
|
||||
// start periodic leader action management (only applies for the current leader)
|
||||
private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) {
|
||||
leaderActions()
|
||||
}
|
||||
private val leaderActionsTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
|
||||
leaderActions()
|
||||
}
|
||||
|
||||
createMBean()
|
||||
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ abstract class ConvergenceSpec
|
|||
|
||||
"A cluster of 3 members" must {
|
||||
|
||||
"reach initial convergence" taggedAs LongRunningTest ignore {
|
||||
"reach initial convergence" taggedAs LongRunningTest in {
|
||||
awaitClusterUp(first, second, third)
|
||||
|
||||
runOn(fourth) {
|
||||
|
|
@ -49,7 +49,7 @@ abstract class ConvergenceSpec
|
|||
testConductor.enter("after-1")
|
||||
}
|
||||
|
||||
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore {
|
||||
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in {
|
||||
val thirdAddress = node(third).address
|
||||
testConductor.enter("before-shutdown")
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ abstract class ConvergenceSpec
|
|||
testConductor.enter("after-2")
|
||||
}
|
||||
|
||||
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore {
|
||||
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in {
|
||||
runOn(fourth) {
|
||||
// try to join
|
||||
cluster.join(node(first).address)
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
|
|||
val c1 = role("c1")
|
||||
val c2 = role("c2")
|
||||
|
||||
commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
|
|||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task interval
|
||||
unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set
|
||||
unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
|
|||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.leader-actions-interval = 5 s
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 30 s
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off"
|
||||
"""))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,12 +5,15 @@ package akka.cluster
|
|||
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.actor.{Address, ExtendedActorSystem}
|
||||
import akka.actor.{ Address, ExtendedActorSystem }
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.util.Duration
|
||||
import org.scalatest.Suite
|
||||
import org.scalatest.TestFailedException
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
object MultiNodeClusterSpec {
|
||||
def clusterConfig: Config = ConfigFactory.parseString("""
|
||||
|
|
@ -29,10 +32,28 @@ object MultiNodeClusterSpec {
|
|||
""")
|
||||
}
|
||||
|
||||
trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec ⇒
|
||||
trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: MultiNodeSpec ⇒
|
||||
|
||||
override def initialParticipants = roles.size
|
||||
|
||||
// Cluster tests are written so that if previous step (test method) failed
|
||||
// it will most likely not be possible to run next step. This ensures
|
||||
// fail fast of steps after the first failure.
|
||||
private var failed = false
|
||||
override protected def withFixture(test: NoArgTest): Unit = try {
|
||||
if (failed) {
|
||||
val e = new TestFailedException("Previous step failed", 0)
|
||||
// short stack trace
|
||||
e.setStackTrace(e.getStackTrace.take(1))
|
||||
throw e
|
||||
}
|
||||
super.withFixture(test)
|
||||
} catch {
|
||||
case t ⇒
|
||||
failed = true
|
||||
throw t
|
||||
}
|
||||
|
||||
/**
|
||||
* The cluster node instance. Needs to be lazily created.
|
||||
*/
|
||||
|
|
@ -151,6 +172,6 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec
|
|||
}
|
||||
|
||||
def roleName(address: Address): Option[RoleName] = {
|
||||
testConductor.getNodes.await.find(node(_).address == address)
|
||||
roles.find(node(_).address == address)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
|
|||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
|
||||
unreachable-nodes-reaper-interval = 30 s
|
||||
unreachable-nodes-reaper-interval = 300 s # turn "off"
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
|
|
|
|||
|
|
@ -21,18 +21,17 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
commonConfig(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
gossip-interval = 400 ms
|
||||
nr-of-deputy-nodes = 0
|
||||
}
|
||||
akka.loglevel = INFO
|
||||
"""))
|
||||
}
|
||||
|
||||
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
|
||||
class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
|
||||
class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
|
||||
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
|
||||
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
|
||||
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
|
||||
class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
|
||||
class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
|
||||
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
|
||||
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
|
||||
|
||||
abstract class SunnyWeatherSpec
|
||||
extends MultiNodeSpec(SunnyWeatherMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,435 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.actor.Address
|
||||
import akka.remote.testconductor.RoleName
|
||||
import MemberStatus._
|
||||
|
||||
object TransitionMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
val fifth = role("fifth")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString(
|
||||
"akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy
|
||||
class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy
|
||||
class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy
|
||||
class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy
|
||||
class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy
|
||||
|
||||
abstract class TransitionSpec
|
||||
extends MultiNodeSpec(TransitionMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import TransitionMultiJvmSpec._
|
||||
|
||||
// sorted in the order used by the cluster
|
||||
def leader(roles: RoleName*) = roles.sorted.head
|
||||
def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail
|
||||
|
||||
def memberStatus(address: Address): MemberStatus = {
|
||||
val statusOption = (cluster.latestGossip.members ++ cluster.latestGossip.overview.unreachable).collectFirst {
|
||||
case m if m.address == address ⇒ m.status
|
||||
}
|
||||
statusOption must not be (None)
|
||||
statusOption.get
|
||||
}
|
||||
|
||||
def memberAddresses: Set[Address] = cluster.latestGossip.members.map(_.address)
|
||||
|
||||
def members: Set[RoleName] = memberAddresses.flatMap(roleName(_))
|
||||
|
||||
def seenLatestGossip: Set[RoleName] = {
|
||||
val gossip = cluster.latestGossip
|
||||
gossip.overview.seen.collect {
|
||||
case (address, v) if v == gossip.version ⇒ roleName(address)
|
||||
}.flatten.toSet
|
||||
}
|
||||
|
||||
def awaitSeen(addresses: Address*): Unit = awaitCond {
|
||||
seenLatestGossip.map(node(_).address) == addresses.toSet
|
||||
}
|
||||
|
||||
def awaitMembers(addresses: Address*): Unit = awaitCond {
|
||||
memberAddresses == addresses.toSet
|
||||
}
|
||||
|
||||
def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond {
|
||||
memberStatus(address) == Up
|
||||
}
|
||||
|
||||
// implicit conversion from RoleName to Address
|
||||
implicit def role2Address(role: RoleName): Address = node(role).address
|
||||
|
||||
// DSL sugar for `role1 gossipTo role2`
|
||||
implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role)
|
||||
var gossipBarrierCounter = 0
|
||||
class RoleWrapper(fromRole: RoleName) {
|
||||
def gossipTo(toRole: RoleName): Unit = {
|
||||
gossipBarrierCounter += 1
|
||||
runOn(toRole) {
|
||||
val g = cluster.latestGossip
|
||||
testConductor.enter("before-gossip-" + gossipBarrierCounter)
|
||||
awaitCond(cluster.latestGossip != g) // received gossip
|
||||
testConductor.enter("after-gossip-" + gossipBarrierCounter)
|
||||
}
|
||||
runOn(fromRole) {
|
||||
testConductor.enter("before-gossip-" + gossipBarrierCounter)
|
||||
cluster.gossipTo(node(toRole).address) // send gossip
|
||||
testConductor.enter("after-gossip-" + gossipBarrierCounter)
|
||||
}
|
||||
runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) {
|
||||
testConductor.enter("before-gossip-" + gossipBarrierCounter)
|
||||
testConductor.enter("after-gossip-" + gossipBarrierCounter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"A Cluster" must {
|
||||
|
||||
"start nodes as singleton clusters" taggedAs LongRunningTest in {
|
||||
|
||||
startClusterNode()
|
||||
cluster.isSingletonCluster must be(true)
|
||||
cluster.status must be(Joining)
|
||||
cluster.convergence.isDefined must be(true)
|
||||
cluster.leaderActions()
|
||||
cluster.status must be(Up)
|
||||
|
||||
testConductor.enter("after-1")
|
||||
}
|
||||
|
||||
"perform correct transitions when second joining first" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(second) {
|
||||
cluster.join(first)
|
||||
}
|
||||
runOn(first) {
|
||||
awaitMembers(first, second)
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Joining)
|
||||
cluster.convergence.isDefined must be(false)
|
||||
}
|
||||
testConductor.enter("second-joined")
|
||||
|
||||
first gossipTo second
|
||||
runOn(second) {
|
||||
members must be(Set(first, second))
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Joining)
|
||||
// we got a conflicting version in second, and therefore not convergence in second
|
||||
seenLatestGossip must be(Set(second))
|
||||
cluster.convergence.isDefined must be(false)
|
||||
}
|
||||
|
||||
second gossipTo first
|
||||
runOn(first) {
|
||||
seenLatestGossip must be(Set(first, second))
|
||||
}
|
||||
|
||||
first gossipTo second
|
||||
runOn(second) {
|
||||
seenLatestGossip must be(Set(first, second))
|
||||
}
|
||||
|
||||
runOn(first, second) {
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Joining)
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
testConductor.enter("convergence-joining-2")
|
||||
|
||||
runOn(leader(first, second)) {
|
||||
cluster.leaderActions()
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Up)
|
||||
}
|
||||
testConductor.enter("leader-actions-2")
|
||||
|
||||
leader(first, second) gossipTo nonLeader(first, second).head
|
||||
runOn(nonLeader(first, second).head) {
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Up)
|
||||
seenLatestGossip must be(Set(first, second))
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
nonLeader(first, second).head gossipTo leader(first, second)
|
||||
runOn(first, second) {
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Up)
|
||||
seenLatestGossip must be(Set(first, second))
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
testConductor.enter("after-2")
|
||||
}
|
||||
|
||||
"perform correct transitions when third joins second" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(third) {
|
||||
cluster.join(second)
|
||||
}
|
||||
runOn(second) {
|
||||
awaitMembers(first, second, third)
|
||||
cluster.convergence.isDefined must be(false)
|
||||
memberStatus(third) must be(Joining)
|
||||
seenLatestGossip must be(Set(second))
|
||||
}
|
||||
testConductor.enter("third-joined-second")
|
||||
|
||||
second gossipTo first
|
||||
runOn(first) {
|
||||
members must be(Set(first, second, third))
|
||||
cluster.convergence.isDefined must be(false)
|
||||
memberStatus(third) must be(Joining)
|
||||
}
|
||||
|
||||
first gossipTo third
|
||||
runOn(third) {
|
||||
members must be(Set(first, second, third))
|
||||
cluster.convergence.isDefined must be(false)
|
||||
memberStatus(third) must be(Joining)
|
||||
// conflicting version
|
||||
seenLatestGossip must be(Set(third))
|
||||
}
|
||||
|
||||
third gossipTo first
|
||||
third gossipTo second
|
||||
runOn(first, second) {
|
||||
seenLatestGossip must be(Set(myself, third))
|
||||
}
|
||||
|
||||
first gossipTo second
|
||||
runOn(second) {
|
||||
seenLatestGossip must be(Set(first, second, third))
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
runOn(first, third) {
|
||||
cluster.convergence.isDefined must be(false)
|
||||
}
|
||||
|
||||
second gossipTo first
|
||||
second gossipTo third
|
||||
runOn(first, second, third) {
|
||||
seenLatestGossip must be(Set(first, second, third))
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Up)
|
||||
memberStatus(third) must be(Joining)
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
testConductor.enter("convergence-joining-3")
|
||||
|
||||
runOn(leader(first, second, third)) {
|
||||
cluster.leaderActions()
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Up)
|
||||
memberStatus(third) must be(Up)
|
||||
}
|
||||
testConductor.enter("leader-actions-3")
|
||||
|
||||
// leader gossipTo first non-leader
|
||||
leader(first, second, third) gossipTo nonLeader(first, second, third).head
|
||||
runOn(nonLeader(first, second, third).head) {
|
||||
memberStatus(third) must be(Up)
|
||||
seenLatestGossip must be(Set(leader(first, second, third), myself))
|
||||
cluster.convergence.isDefined must be(false)
|
||||
}
|
||||
|
||||
// first non-leader gossipTo the other non-leader
|
||||
nonLeader(first, second, third).head gossipTo nonLeader(first, second, third).tail.head
|
||||
runOn(nonLeader(first, second, third).head) {
|
||||
cluster.gossipTo(node(nonLeader(first, second, third).tail.head).address)
|
||||
}
|
||||
runOn(nonLeader(first, second, third).tail.head) {
|
||||
memberStatus(third) must be(Up)
|
||||
seenLatestGossip must be(Set(first, second, third))
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
// and back again
|
||||
nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head
|
||||
runOn(nonLeader(first, second, third).head) {
|
||||
memberStatus(third) must be(Up)
|
||||
seenLatestGossip must be(Set(first, second, third))
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
// first non-leader gossipTo the leader
|
||||
nonLeader(first, second, third).head gossipTo leader(first, second, third)
|
||||
runOn(first, second, third) {
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Up)
|
||||
memberStatus(third) must be(Up)
|
||||
seenLatestGossip must be(Set(first, second, third))
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
testConductor.enter("after-3")
|
||||
}
|
||||
|
||||
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
|
||||
runOn(fourth) {
|
||||
cluster.join(fifth)
|
||||
awaitMembers(fourth, fifth)
|
||||
cluster.gossipTo(fifth)
|
||||
awaitSeen(fourth, fifth)
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
runOn(fifth) {
|
||||
awaitMembers(fourth, fifth)
|
||||
cluster.gossipTo(fourth)
|
||||
awaitSeen(fourth, fifth)
|
||||
cluster.gossipTo(fourth)
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
testConductor.enter("fourth-joined-fifth")
|
||||
|
||||
testConductor.enter("after-4")
|
||||
}
|
||||
|
||||
"perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(fourth) {
|
||||
cluster.join(third)
|
||||
}
|
||||
runOn(third) {
|
||||
awaitMembers(first, second, third, fourth)
|
||||
seenLatestGossip must be(Set(third))
|
||||
}
|
||||
testConductor.enter("fourth-joined-third")
|
||||
|
||||
third gossipTo second
|
||||
runOn(second) {
|
||||
seenLatestGossip must be(Set(second, third))
|
||||
}
|
||||
|
||||
second gossipTo fourth
|
||||
runOn(fourth) {
|
||||
members must be(roles.toSet)
|
||||
// merge conflict
|
||||
seenLatestGossip must be(Set(fourth))
|
||||
}
|
||||
|
||||
fourth gossipTo first
|
||||
fourth gossipTo second
|
||||
fourth gossipTo third
|
||||
fourth gossipTo fifth
|
||||
runOn(first, second, third, fifth) {
|
||||
members must be(roles.toSet)
|
||||
seenLatestGossip must be(Set(fourth, myself))
|
||||
}
|
||||
|
||||
first gossipTo fifth
|
||||
runOn(fifth) {
|
||||
seenLatestGossip must be(Set(first, fourth, fifth))
|
||||
}
|
||||
|
||||
fifth gossipTo third
|
||||
runOn(third) {
|
||||
seenLatestGossip must be(Set(first, third, fourth, fifth))
|
||||
}
|
||||
|
||||
third gossipTo second
|
||||
runOn(second) {
|
||||
seenLatestGossip must be(roles.toSet)
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
second gossipTo first
|
||||
second gossipTo third
|
||||
second gossipTo fourth
|
||||
third gossipTo fifth
|
||||
|
||||
seenLatestGossip must be(roles.toSet)
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Up)
|
||||
memberStatus(third) must be(Up)
|
||||
memberStatus(fourth) must be(Joining)
|
||||
memberStatus(fifth) must be(Up)
|
||||
cluster.convergence.isDefined must be(true)
|
||||
|
||||
testConductor.enter("convergence-joining-3")
|
||||
|
||||
runOn(leader(roles: _*)) {
|
||||
cluster.leaderActions()
|
||||
memberStatus(fourth) must be(Up)
|
||||
seenLatestGossip must be(Set(myself))
|
||||
cluster.convergence.isDefined must be(false)
|
||||
}
|
||||
// spread the word
|
||||
for (x :: y :: Nil ← (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) {
|
||||
x gossipTo y
|
||||
}
|
||||
|
||||
testConductor.enter("spread-5")
|
||||
|
||||
seenLatestGossip must be(roles.toSet)
|
||||
memberStatus(first) must be(Up)
|
||||
memberStatus(second) must be(Up)
|
||||
memberStatus(third) must be(Up)
|
||||
memberStatus(fourth) must be(Up)
|
||||
memberStatus(fifth) must be(Up)
|
||||
cluster.convergence.isDefined must be(true)
|
||||
|
||||
testConductor.enter("after-5")
|
||||
}
|
||||
|
||||
"perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in {
|
||||
runOn(fifth) {
|
||||
markNodeAsUnavailable(second)
|
||||
cluster.reapUnreachableMembers()
|
||||
cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
|
||||
seenLatestGossip must be(Set(fifth))
|
||||
}
|
||||
|
||||
// spread the word
|
||||
val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth)
|
||||
for (x :: y :: Nil ← gossipRound.sliding(2)) {
|
||||
x gossipTo y
|
||||
}
|
||||
|
||||
runOn((roles.filterNot(_ == second)): _*) {
|
||||
cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
|
||||
cluster.convergence.isDefined must be(false)
|
||||
}
|
||||
|
||||
runOn(third) {
|
||||
cluster.down(second)
|
||||
awaitMemberStatus(second, Down)
|
||||
}
|
||||
|
||||
// spread the word
|
||||
val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth)
|
||||
for (x :: y :: Nil ← gossipRound2.sliding(2)) {
|
||||
x gossipTo y
|
||||
}
|
||||
|
||||
runOn((roles.filterNot(_ == second)): _*) {
|
||||
cluster.latestGossip.overview.unreachable must contain(Member(second, Down))
|
||||
memberStatus(second) must be(Down)
|
||||
seenLatestGossip must be(Set(first, third, fourth, fifth))
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
testConductor.enter("after-6")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -33,12 +33,12 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
val g2 = Gossip(members = SortedSet(a2, c2, e2))
|
||||
|
||||
val merged1 = g1 merge g2
|
||||
merged1.members must be(SortedSet(a1, c1, e2))
|
||||
merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up))
|
||||
merged1.members must be(SortedSet(a2, c1, e1))
|
||||
merged1.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining))
|
||||
|
||||
val merged2 = g2 merge g1
|
||||
merged2.members must be(SortedSet(a1, c1, e2))
|
||||
merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up))
|
||||
merged2.members must be(SortedSet(a2, c1, e1))
|
||||
merged2.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining))
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -48,12 +48,12 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2)))
|
||||
|
||||
val merged1 = g1 merge g2
|
||||
merged1.overview.unreachable must be(Set(a1, b2, c1, d2))
|
||||
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
|
||||
merged1.overview.unreachable must be(Set(a2, b2, c1, d2))
|
||||
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed))
|
||||
|
||||
val merged2 = g2 merge g1
|
||||
merged2.overview.unreachable must be(Set(a1, b2, c1, d2))
|
||||
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
|
||||
merged2.overview.unreachable must be(Set(a2, b2, c1, d2))
|
||||
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed))
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -62,14 +62,14 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2)))
|
||||
|
||||
val merged1 = g1 merge g2
|
||||
merged1.members must be(SortedSet(a1))
|
||||
merged1.members.toSeq.map(_.status) must be(Seq(Up))
|
||||
merged1.members must be(SortedSet(a2))
|
||||
merged1.members.toSeq.map(_.status) must be(Seq(Joining))
|
||||
merged1.overview.unreachable must be(Set(b2, c1, d2))
|
||||
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))
|
||||
|
||||
val merged2 = g2 merge g1
|
||||
merged2.members must be(SortedSet(a1))
|
||||
merged2.members.toSeq.map(_.status) must be(Seq(Up))
|
||||
merged2.members must be(SortedSet(a2))
|
||||
merged2.members.toSeq.map(_.status) must be(Seq(Joining))
|
||||
merged2.overview.unreachable must be(Set(b2, c1, d2))
|
||||
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))
|
||||
|
||||
|
|
|
|||
|
|
@ -16,9 +16,9 @@ import org.jboss.netty.channel.ChannelPipelineFactory
|
|||
private[akka] class TestConductorTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
|
||||
extends NettyRemoteTransport(_system, _provider) {
|
||||
|
||||
override def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory =
|
||||
override def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
|
||||
new ChannelPipelineFactory {
|
||||
def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout) :+ endpoint)
|
||||
def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout, isClient) :+ endpoint)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -165,6 +165,52 @@ akka {
|
|||
|
||||
# (O) Maximum time window that a client should try to reconnect for
|
||||
reconnection-time-window = 600s
|
||||
|
||||
ssl {
|
||||
# (I&O) Enable SSL/TLS encryption.
|
||||
# This must be enabled on both the client and server to work.
|
||||
enable = off
|
||||
|
||||
# (I) This is the Java Key Store used by the server connection
|
||||
key-store = "keystore"
|
||||
|
||||
# This password is used for decrypting the key store
|
||||
key-store-password = "changeme"
|
||||
|
||||
# (O) This is the Java Key Store used by the client connection
|
||||
trust-store = "truststore"
|
||||
|
||||
# This password is used for decrypting the trust store
|
||||
trust-store-password = "changeme"
|
||||
|
||||
# (I&O) Protocol to use for SSL encryption, choose from:
|
||||
# Java 6 & 7:
|
||||
# 'SSLv3', 'TLSv1'
|
||||
# Java 7:
|
||||
# 'TLSv1.1', 'TLSv1.2'
|
||||
protocol = "TLSv1"
|
||||
|
||||
# Examples: [ "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA" ]
|
||||
# You need to install the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256
|
||||
# More info here: http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider
|
||||
supported-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"]
|
||||
|
||||
# Using /dev/./urandom is only necessary when using SHA1PRNG on Linux to prevent blocking
|
||||
# It is NOT as secure because it reuses the seed
|
||||
# '' => defaults to /dev/random or whatever is set in java.security for example: securerandom.source=file:/dev/random
|
||||
# '/dev/./urandom' => NOT '/dev/urandom' as that doesn't work according to: http://bugs.sun.com/view_bug.do?bug_id=6202721
|
||||
sha1prng-random-source = ""
|
||||
|
||||
# There are three options, in increasing order of security:
|
||||
# "" or SecureRandom => (default)
|
||||
# "SHA1PRNG" => Can be slow because of blocking issues on Linux
|
||||
# "AES128CounterRNGFast" => fastest startup and based on AES encryption algorithm
|
||||
# The following use one of 3 possible seed sources, depending on availability: /dev/random, random.org and SecureRandom (provided by Java)
|
||||
# "AES128CounterRNGSecure"
|
||||
# "AES256CounterRNGSecure" (Install JCE Unlimited Strength Jurisdiction Policy Files first)
|
||||
# Setting a value here may require you to supply the appropriate cipher suite (see supported-algorithms section above)
|
||||
random-number-generator = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ private[akka] class ActiveRemoteClient private[akka] (
|
|||
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
|
||||
|
||||
val b = new ClientBootstrap(netty.clientChannelFactory)
|
||||
b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), true))
|
||||
b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), withTimeout = true, isClient = true))
|
||||
b.setOption("tcpNoDelay", true)
|
||||
b.setOption("keepAlive", true)
|
||||
b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis)
|
||||
|
|
|
|||
|
|
@ -64,17 +64,18 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
|
|||
*
|
||||
* @param withTimeout determines whether an IdleStateHandler shall be included
|
||||
*/
|
||||
def apply(endpoint: ⇒ Seq[ChannelHandler], withTimeout: Boolean): ChannelPipelineFactory =
|
||||
def apply(endpoint: ⇒ Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
|
||||
new ChannelPipelineFactory {
|
||||
def getPipeline = apply(defaultStack(withTimeout) ++ endpoint)
|
||||
def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint)
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a default protocol stack, excluding the “head” handler (i.e. the one which
|
||||
* actually dispatches the received messages to the local target actors).
|
||||
*/
|
||||
def defaultStack(withTimeout: Boolean): Seq[ChannelHandler] =
|
||||
(if (withTimeout) timeout :: Nil else Nil) :::
|
||||
def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] =
|
||||
(if (settings.EnableSSL) NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient) :: Nil else Nil) :::
|
||||
(if (withTimeout) timeout :: Nil else Nil) :::
|
||||
msgFormat :::
|
||||
authenticator :::
|
||||
executionHandler ::
|
||||
|
|
@ -122,8 +123,8 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
|
|||
* This method is factored out to provide an extension point in case the
|
||||
* pipeline shall be changed. It is recommended to use
|
||||
*/
|
||||
def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory =
|
||||
PipelineFactory(Seq(endpoint), withTimeout)
|
||||
def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
|
||||
PipelineFactory(Seq(endpoint), withTimeout, isClient)
|
||||
|
||||
private val remoteClients = new HashMap[Address, RemoteClient]
|
||||
private val clientsLock = new ReentrantReadWriteLock
|
||||
|
|
|
|||
|
|
@ -0,0 +1,133 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.remote.netty
|
||||
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext }
|
||||
import akka.remote.RemoteTransportException
|
||||
import akka.event.LoggingAdapter
|
||||
import java.io.{ IOException, FileNotFoundException, FileInputStream }
|
||||
import java.security.{ SecureRandom, GeneralSecurityException, KeyStore, Security }
|
||||
import akka.security.provider.AkkaProvider
|
||||
|
||||
/**
|
||||
* Used for adding SSL support to Netty pipeline
|
||||
* Internal use only
|
||||
*/
|
||||
private[akka] object NettySSLSupport {
|
||||
/**
|
||||
* Construct a SSLHandler which can be inserted into a Netty server/client pipeline
|
||||
*/
|
||||
def apply(settings: NettySettings, log: LoggingAdapter, isClient: Boolean): SslHandler =
|
||||
if (isClient) initialiseClientSSL(settings, log) else initialiseServerSSL(settings, log)
|
||||
|
||||
def initialiseCustomSecureRandom(rngName: Option[String], sourceOfRandomness: Option[String], log: LoggingAdapter): SecureRandom = {
|
||||
/**
|
||||
* According to this bug report: http://bugs.sun.com/view_bug.do?bug_id=6202721
|
||||
* Using /dev/./urandom is only necessary when using SHA1PRNG on Linux
|
||||
* <quote>Use 'new SecureRandom()' instead of 'SecureRandom.getInstance("SHA1PRNG")'</quote> to avoid having problems
|
||||
*/
|
||||
sourceOfRandomness foreach { path ⇒ System.setProperty("java.security.egd", path) }
|
||||
|
||||
val rng = rngName match {
|
||||
case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure")) ⇒
|
||||
log.debug("SSL random number generator set to: {}", r)
|
||||
val akka = new AkkaProvider
|
||||
Security.addProvider(akka)
|
||||
SecureRandom.getInstance(r, akka)
|
||||
case Some("SHA1PRNG") ⇒
|
||||
log.debug("SSL random number generator set to: SHA1PRNG")
|
||||
// This needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
|
||||
// However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD).
|
||||
SecureRandom.getInstance("SHA1PRNG")
|
||||
case Some(unknown) ⇒
|
||||
log.debug("Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown)
|
||||
new SecureRandom
|
||||
case None ⇒
|
||||
log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom")
|
||||
new SecureRandom
|
||||
}
|
||||
rng.nextInt() // prevent stall on first access
|
||||
rng
|
||||
}
|
||||
|
||||
private def initialiseClientSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
|
||||
log.debug("Client SSL is enabled, initialising ...")
|
||||
((settings.SSLTrustStore, settings.SSLTrustStorePassword, settings.SSLProtocol) match {
|
||||
case (Some(trustStore), Some(password), Some(protocol)) ⇒ constructClientContext(settings, log, trustStore, password, protocol)
|
||||
case (trustStore, password, protocol) ⇒ throw new GeneralSecurityException(
|
||||
"One or several SSL trust store settings are missing: [trust-store: %s] [trust-store-password: %s] [protocol: %s]".format(
|
||||
trustStore,
|
||||
password,
|
||||
protocol))
|
||||
}) match {
|
||||
case Some(context) ⇒
|
||||
log.debug("Using client SSL context to create SSLEngine ...")
|
||||
val sslEngine = context.createSSLEngine
|
||||
sslEngine.setUseClientMode(true)
|
||||
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
|
||||
new SslHandler(sslEngine)
|
||||
case None ⇒
|
||||
throw new GeneralSecurityException(
|
||||
"""Failed to initialise client SSL because SSL context could not be found." +
|
||||
"Make sure your settings are correct: [trust-store: %s] [trust-store-password: %s] [protocol: %s]""".format(
|
||||
settings.SSLTrustStore,
|
||||
settings.SSLTrustStorePassword,
|
||||
settings.SSLProtocol))
|
||||
}
|
||||
}
|
||||
|
||||
private def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = {
|
||||
try {
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
|
||||
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
|
||||
trustManagerFactory.init(trustStore)
|
||||
val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers
|
||||
Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initialiseCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e)
|
||||
case e: IOException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e)
|
||||
case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because SSL context could not be constructed", e)
|
||||
}
|
||||
}
|
||||
|
||||
private def initialiseServerSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
|
||||
log.debug("Server SSL is enabled, initialising ...")
|
||||
|
||||
((settings.SSLKeyStore, settings.SSLKeyStorePassword, settings.SSLProtocol) match {
|
||||
case (Some(keyStore), Some(password), Some(protocol)) ⇒ constructServerContext(settings, log, keyStore, password, protocol)
|
||||
case (keyStore, password, protocol) ⇒ throw new GeneralSecurityException(
|
||||
"SSL key store settings went missing. [key-store: %s] [key-store-password: %s] [protocol: %s]".format(keyStore, password, protocol))
|
||||
}) match {
|
||||
case Some(context) ⇒
|
||||
log.debug("Using server SSL context to create SSLEngine ...")
|
||||
val sslEngine = context.createSSLEngine
|
||||
sslEngine.setUseClientMode(false)
|
||||
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
|
||||
new SslHandler(sslEngine)
|
||||
case None ⇒ throw new GeneralSecurityException(
|
||||
"""Failed to initialise server SSL because SSL context could not be found.
|
||||
Make sure your settings are correct: [key-store: %s] [key-store-password: %s] [protocol: %s]""".format(
|
||||
settings.SSLKeyStore,
|
||||
settings.SSLKeyStorePassword,
|
||||
settings.SSLProtocol))
|
||||
}
|
||||
}
|
||||
|
||||
private def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = {
|
||||
try {
|
||||
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
|
||||
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
|
||||
factory.init(keyStore, keyStorePassword.toCharArray)
|
||||
Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initialiseCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
|
||||
case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
|
||||
case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because SSL context could not be constructed", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -12,7 +12,6 @@ import org.jboss.netty.channel.group.ChannelGroup
|
|||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
|
||||
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
|
||||
import org.jboss.netty.handler.execution.ExecutionHandler
|
||||
import akka.event.Logging
|
||||
import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol }
|
||||
import akka.remote.{ RemoteServerShutdown, RemoteServerError, RemoteServerClientDisconnected, RemoteServerClientConnected, RemoteServerClientClosed, RemoteProtocol, RemoteMessage }
|
||||
import akka.actor.Address
|
||||
|
|
@ -40,7 +39,7 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) {
|
|||
|
||||
private val bootstrap = {
|
||||
val b = new ServerBootstrap(factory)
|
||||
b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), false))
|
||||
b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), withTimeout = false, isClient = false))
|
||||
b.setOption("backlog", settings.Backlog)
|
||||
b.setOption("tcpNoDelay", true)
|
||||
b.setOption("child.keepAlive", true)
|
||||
|
|
|
|||
|
|
@ -86,4 +86,55 @@ private[akka] class NettySettings(config: Config, val systemName: String) {
|
|||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val SSLKeyStore = getString("ssl.key-store") match {
|
||||
case "" ⇒ None
|
||||
case keyStore ⇒ Some(keyStore)
|
||||
}
|
||||
|
||||
val SSLTrustStore = getString("ssl.trust-store") match {
|
||||
case "" ⇒ None
|
||||
case trustStore ⇒ Some(trustStore)
|
||||
}
|
||||
|
||||
val SSLKeyStorePassword = getString("ssl.key-store-password") match {
|
||||
case "" ⇒ None
|
||||
case password ⇒ Some(password)
|
||||
}
|
||||
|
||||
val SSLTrustStorePassword = getString("ssl.trust-store-password") match {
|
||||
case "" ⇒ None
|
||||
case password ⇒ Some(password)
|
||||
}
|
||||
|
||||
val SSLSupportedAlgorithms = getStringList("ssl.supported-algorithms").toArray.toSet
|
||||
|
||||
val SSLProtocol = getString("ssl.protocol") match {
|
||||
case "" ⇒ None
|
||||
case protocol ⇒ Some(protocol)
|
||||
}
|
||||
|
||||
val SSLRandomSource = getString("ssl.sha1prng-random-source") match {
|
||||
case "" ⇒ None
|
||||
case path ⇒ Some(path)
|
||||
}
|
||||
|
||||
val SSLRandomNumberGenerator = getString("ssl.random-number-generator") match {
|
||||
case "" ⇒ None
|
||||
case rng ⇒ Some(rng)
|
||||
}
|
||||
|
||||
val EnableSSL = {
|
||||
val enableSSL = getBoolean("ssl.enable")
|
||||
if (enableSSL) {
|
||||
if (SSLProtocol.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.ssl.enable is turned on but no protocol is defined in 'akka.remote.netty.ssl.protocol'.")
|
||||
if (SSLKeyStore.isEmpty && SSLTrustStore.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.ssl.enable is turned on but no key/trust store is defined in 'akka.remote.netty.ssl.key-store' / 'akka.remote.netty.ssl.trust-store'.")
|
||||
if (SSLKeyStore.isDefined && SSLKeyStorePassword.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.ssl.key-store' is defined but no key-store password is defined in 'akka.remote.netty.ssl.key-store-password'.")
|
||||
if (SSLTrustStore.isDefined && SSLTrustStorePassword.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.netty.ssl.trust-store' is defined but no trust-store password is defined in 'akka.remote.netty.ssl.trust-store-password'.")
|
||||
}
|
||||
enableSSL
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.security.provider
|
||||
|
||||
import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator }
|
||||
import java.security.SecureRandom
|
||||
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
class AES128CounterRNGFast extends java.security.SecureRandomSpi {
|
||||
private val rng = new AESCounterRNG(new SecureRandomSeedGenerator())
|
||||
|
||||
/**
|
||||
* This is managed internally only
|
||||
*/
|
||||
override protected def engineSetSeed(seed: Array[Byte]): Unit = ()
|
||||
|
||||
/**
|
||||
* Generates a user-specified number of random bytes.
|
||||
*
|
||||
* @param bytes the array to be filled in with random bytes.
|
||||
*/
|
||||
override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes)
|
||||
|
||||
/**
|
||||
* Returns the given number of seed bytes. This call may be used to
|
||||
* seed other random number generators.
|
||||
*
|
||||
* @param numBytes the number of seed bytes to generate.
|
||||
* @return the seed bytes.
|
||||
*/
|
||||
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = (new SecureRandom).generateSeed(numBytes)
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.security.provider
|
||||
|
||||
import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator }
|
||||
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
class AES128CounterRNGSecure extends java.security.SecureRandomSpi {
|
||||
private val rng = new AESCounterRNG()
|
||||
|
||||
/**
|
||||
* This is managed internally only
|
||||
*/
|
||||
override protected def engineSetSeed(seed: Array[Byte]): Unit = ()
|
||||
|
||||
/**
|
||||
* Generates a user-specified number of random bytes.
|
||||
*
|
||||
* @param bytes the array to be filled in with random bytes.
|
||||
*/
|
||||
override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes)
|
||||
|
||||
/**
|
||||
* Returns the given number of seed bytes. This call may be used to
|
||||
* seed other random number generators.
|
||||
*
|
||||
* @param numBytes the number of seed bytes to generate.
|
||||
* @return the seed bytes.
|
||||
*/
|
||||
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes)
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.security.provider
|
||||
|
||||
import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator }
|
||||
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
class AES256CounterRNGSecure extends java.security.SecureRandomSpi {
|
||||
private val rng = new AESCounterRNG(32) // Magic number is magic
|
||||
|
||||
/**
|
||||
* This is managed internally only
|
||||
*/
|
||||
override protected def engineSetSeed(seed: Array[Byte]): Unit = ()
|
||||
|
||||
/**
|
||||
* Generates a user-specified number of random bytes.
|
||||
*
|
||||
* @param bytes the array to be filled in with random bytes.
|
||||
*/
|
||||
override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes)
|
||||
|
||||
/**
|
||||
* Returns the given number of seed bytes. This call may be used to
|
||||
* seed other random number generators.
|
||||
*
|
||||
* @param numBytes the number of seed bytes to generate.
|
||||
* @return the seed bytes.
|
||||
*/
|
||||
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes)
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.security.provider
|
||||
|
||||
import java.security.{ PrivilegedAction, AccessController, Provider }
|
||||
|
||||
/**
|
||||
* A provider that for AES128CounterRNGFast, a cryptographically secure random number generator through SecureRandom
|
||||
*/
|
||||
final class AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") {
|
||||
AccessController.doPrivileged(new PrivilegedAction[AkkaProvider] {
|
||||
def run = {
|
||||
//SecureRandom
|
||||
put("SecureRandom.AES128CounterRNGFast", "akka.security.provider.AES128CounterRNGFast")
|
||||
put("SecureRandom.AES128CounterRNGSecure", "akka.security.provider.AES128CounterRNGSecure")
|
||||
put("SecureRandom.AES256CounterRNGSecure", "akka.security.provider.AES256CounterRNGSecure")
|
||||
|
||||
//Implementation type: software or hardware
|
||||
put("SecureRandom.AES128CounterRNGFast ImplementedIn", "Software")
|
||||
put("SecureRandom.AES128CounterRNGSecure ImplementedIn", "Software")
|
||||
put("SecureRandom.AES256CounterRNGSecure ImplementedIn", "Software")
|
||||
null //Magic null is magic
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
BIN
akka-remote/src/test/resources/keystore
Normal file
BIN
akka-remote/src/test/resources/keystore
Normal file
Binary file not shown.
BIN
akka-remote/src/test/resources/truststore
Normal file
BIN
akka-remote/src/test/resources/truststore
Normal file
Binary file not shown.
|
|
@ -0,0 +1,175 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
/*package akka.remote
|
||||
|
||||
import akka.testkit._
|
||||
import akka.actor._
|
||||
import com.typesafe.config._
|
||||
import akka.dispatch.{ Await, Future }
|
||||
import akka.pattern.ask
|
||||
import java.io.File
|
||||
import java.security.{ SecureRandom, PrivilegedAction, AccessController }
|
||||
import netty.NettySSLSupport
|
||||
import akka.event.{ NoLogging, LoggingAdapter }
|
||||
|
||||
object Configuration {
|
||||
// set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager"
|
||||
// The certificate will expire in 2109
|
||||
private val trustStore = getPath("truststore")
|
||||
private val keyStore = getPath("keystore")
|
||||
private def getPath(name: String): String = (new File("akka-remote/src/test/resources/" + name)).getAbsolutePath.replace("\\", "\\\\")
|
||||
private val conf = """
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty {
|
||||
hostname = localhost
|
||||
port = 12345
|
||||
ssl {
|
||||
enable = on
|
||||
trust-store = "%s"
|
||||
key-store = "%s"
|
||||
random-number-generator = "%s"
|
||||
supported-algorithms = [%s]
|
||||
}
|
||||
}
|
||||
actor.deployment {
|
||||
/blub.remote = "akka://remote-sys@localhost:12346"
|
||||
/looker/child.remote = "akka://remote-sys@localhost:12346"
|
||||
/looker/child/grandchild.remote = "akka://Ticket1978CommunicationSpec@localhost:12345"
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
def getCipherConfig(cipher: String, enabled: String*): (String, Boolean, Config) = if (try {
|
||||
NettySSLSupport.initialiseCustomSecureRandom(Some(cipher), None, NoLogging) ne null
|
||||
} catch {
|
||||
case _: IllegalArgumentException ⇒ false // Cannot match against the message since the message might be localized :S
|
||||
case _: java.security.NoSuchAlgorithmException ⇒ false
|
||||
}) (cipher, true, ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher, enabled.mkString(", ")))) else (cipher, false, AkkaSpec.testConf)
|
||||
}
|
||||
|
||||
import Configuration.getCipherConfig
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG", "TLS_RSA_WITH_AES_128_CBC_SHA"))
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978AES128CounterRNGFastSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGFast", "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
|
||||
|
||||
/**
|
||||
* Both of the <quote>Secure</quote> variants require access to the Internet to access random.org.
|
||||
*/
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978AES128CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGSecure", "TLS_RSA_WITH_AES_128_CBC_SHA"))
|
||||
|
||||
/**
|
||||
* Both of the <quote>Secure</quote> variants require access to the Internet to access random.org.
|
||||
*/
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES256CounterRNGSecure", "TLS_RSA_WITH_AES_256_CBC_SHA"))
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978DefaultRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "TLS_RSA_WITH_AES_128_CBC_SHA"))
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(("NonExistingRNG", false, AkkaSpec.testConf))
|
||||
|
||||
abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender with DefaultTimeout {
|
||||
|
||||
import RemoteCommunicationSpec._
|
||||
|
||||
val conf = ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config)
|
||||
val other = ActorSystem("remote-sys", conf)
|
||||
|
||||
val remote = other.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "ping" ⇒ sender ! (("pong", sender))
|
||||
}
|
||||
}), "echo")
|
||||
|
||||
val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo")
|
||||
|
||||
override def atTermination() {
|
||||
other.shutdown()
|
||||
}
|
||||
|
||||
"SSL Remoting" must {
|
||||
if (cipherEnabledconfig._2) {
|
||||
"support remote look-ups" in {
|
||||
here ! "ping"
|
||||
expectMsgPF() {
|
||||
case ("pong", s: AnyRef) if s eq testActor ⇒ true
|
||||
}
|
||||
}
|
||||
|
||||
"send error message for wrong address" in {
|
||||
EventFilter.error(start = "dropping", occurrences = 1).intercept {
|
||||
system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping"
|
||||
}(other)
|
||||
}
|
||||
|
||||
"support ask" in {
|
||||
Await.result(here ? "ping", timeout.duration) match {
|
||||
case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good
|
||||
case m ⇒ fail(m + " was not (pong, AskActorRef)")
|
||||
}
|
||||
}
|
||||
|
||||
"send dead letters on remote if actor does not exist" in {
|
||||
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
|
||||
system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh"
|
||||
}(other)
|
||||
}
|
||||
|
||||
"create and supervise children on remote node" in {
|
||||
val r = system.actorOf(Props[Echo], "blub")
|
||||
r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub"
|
||||
r ! 42
|
||||
expectMsg(42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
r ! new Exception("crash")
|
||||
}(other)
|
||||
expectMsg("preRestart")
|
||||
r ! 42
|
||||
expectMsg(42)
|
||||
system.stop(r)
|
||||
expectMsg("postStop")
|
||||
}
|
||||
|
||||
"look-up actors across node boundaries" in {
|
||||
val l = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n)
|
||||
case s: String ⇒ sender ! context.actorFor(s)
|
||||
}
|
||||
}), "looker")
|
||||
l ! (Props[Echo], "child")
|
||||
val r = expectMsgType[ActorRef]
|
||||
r ! (Props[Echo], "grandchild")
|
||||
val remref = expectMsgType[ActorRef]
|
||||
remref.isInstanceOf[LocalActorRef] must be(true)
|
||||
val myref = system.actorFor(system / "looker" / "child" / "grandchild")
|
||||
myref.isInstanceOf[RemoteActorRef] must be(true)
|
||||
myref ! 43
|
||||
expectMsg(43)
|
||||
lastSender must be theSameInstanceAs remref
|
||||
r.asInstanceOf[RemoteActorRef].getParent must be(l)
|
||||
system.actorFor("/user/looker/child") must be theSameInstanceAs r
|
||||
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
|
||||
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
|
||||
}
|
||||
|
||||
"not fail ask across node boundaries" in {
|
||||
val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)]
|
||||
Await.result(Future.sequence(f), remaining).map(_._1).toSet must be(Set("pong"))
|
||||
}
|
||||
} else {
|
||||
"not be run when the cipher is not supported by the platform this test is currently being executed on" ignore {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}*/
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
package akka.remote
|
||||
|
||||
import akka.testkit._
|
||||
import akka.actor._
|
||||
import com.typesafe.config._
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.util.duration._
|
||||
import akka.util.Duration
|
||||
import akka.remote.netty.NettyRemoteTransport
|
||||
import java.util.ArrayList
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class Ticket1978ConfigSpec extends AkkaSpec("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty {
|
||||
hostname = localhost
|
||||
port = 12345
|
||||
}
|
||||
actor.deployment {
|
||||
/blub.remote = "akka://remote-sys@localhost:12346"
|
||||
/looker/child.remote = "akka://remote-sys@localhost:12346"
|
||||
/looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
|
||||
}
|
||||
}
|
||||
""") with ImplicitSender with DefaultTimeout {
|
||||
|
||||
"SSL Remoting" must {
|
||||
"be able to parse these extra Netty config elements" in {
|
||||
val settings =
|
||||
system.asInstanceOf[ExtendedActorSystem]
|
||||
.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
.transport.asInstanceOf[NettyRemoteTransport]
|
||||
.settings
|
||||
import settings._
|
||||
|
||||
EnableSSL must be(false)
|
||||
SSLKeyStore must be(Some("keystore"))
|
||||
SSLKeyStorePassword must be(Some("changeme"))
|
||||
SSLTrustStore must be(Some("truststore"))
|
||||
SSLTrustStorePassword must be(Some("changeme"))
|
||||
SSLProtocol must be(Some("TLSv1"))
|
||||
SSLSupportedAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA"))
|
||||
SSLRandomSource must be(None)
|
||||
SSLRandomNumberGenerator must be(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -452,7 +452,7 @@ object Dependencies {
|
|||
)
|
||||
|
||||
val remote = Seq(
|
||||
netty, protobuf, Test.junit, Test.scalatest
|
||||
netty, protobuf, uncommonsMath, Test.junit, Test.scalatest
|
||||
)
|
||||
|
||||
val cluster = Seq(Test.junit, Test.scalatest)
|
||||
|
|
@ -490,6 +490,7 @@ object Dependency {
|
|||
val ScalaStm = "0.5"
|
||||
val Scalatest = "1.6.1"
|
||||
val Slf4j = "1.6.4"
|
||||
val UncommonsMath = "1.2.2a"
|
||||
}
|
||||
|
||||
// Compile
|
||||
|
|
@ -499,6 +500,7 @@ object Dependency {
|
|||
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD
|
||||
val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala)
|
||||
val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT
|
||||
val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % V.UncommonsMath // ApacheV2
|
||||
val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.6" // ApacheV2
|
||||
|
||||
// Test
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue