Merge remote-tracking branch 'akka/master' into osgi

This commit is contained in:
Gert Vanthienen 2012-06-18 19:02:48 +02:00
commit ca1a9b478a
40 changed files with 1273 additions and 163 deletions

View file

@ -0,0 +1,17 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
public class NonPublicClass {
public static Props createProps() {
return new Props(MyNonPublicActorClass.class);
}
}
class MyNonPublicActorClass extends UntypedActor {
@Override public void onReceive(Object msg) {
getSender().tell(msg);
}
}

View file

@ -358,6 +358,13 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
system.stop(serverRef)
}
"support actorOfs where the class of the actor isn't public" in {
val a = system.actorOf(NonPublicClass.createProps())
a.tell("pigdog", testActor)
expectMsg("pigdog")
system stop a
}
"stop when sent a poison pill" in {
val timeout = Timeout(20000)
val ref = system.actorOf(Props(new Actor {

View file

@ -7,7 +7,6 @@ package akka.actor
import akka.AkkaException
import scala.reflect.BeanProperty
import scala.util.control.NoStackTrace
import scala.collection.immutable.Stack
import java.util.regex.Pattern
/**
@ -279,18 +278,14 @@ trait Actor {
*/
protected[akka] implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
def noContextError =
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw new ActorInitializationException(
"\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." +
"\n\tYou have to use one of the factory methods to create a new actor. Either use:" +
"\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" +
"\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem)")
if (contextStack.isEmpty) noContextError
val c = contextStack.head
if (c eq null) noContextError
ActorCell.contextStack.set(contextStack.push(null))
ActorCell.contextStack.set(null :: contextStack)
c
}

View file

@ -13,7 +13,7 @@ import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
import akka.event.Logging.LogEventException
import collection.immutable.{ TreeSet, Stack, TreeMap }
import collection.immutable.{ TreeSet, TreeMap }
import akka.util.{ Unsafe, Duration, Helpers, NonFatal }
//TODO: everything here for current compatibility - could be limited more
@ -173,8 +173,8 @@ trait UntypedActorContext extends ActorContext {
* for! (waves hand)
*/
private[akka] object ActorCell {
val contextStack = new ThreadLocal[Stack[ActorContext]] {
override def initialValue = Stack[ActorContext]()
val contextStack = new ThreadLocal[List[ActorContext]] {
override def initialValue: List[ActorContext] = Nil
}
final val emptyCancellable: Cancellable = new Cancellable {
@ -184,7 +184,7 @@ private[akka] object ActorCell {
final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable)
final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior)
final val emptyBehaviorStack: List[Actor.Receive] = Nil
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
@ -408,7 +408,7 @@ private[akka] class ActorCell(
var currentMessage: Envelope = _
var actor: Actor = _
private var behaviorStack: Stack[Actor.Receive] = Stack.empty
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
@volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
var nextNameSequence: Long = 0
var watching: Set[ActorRef] = emptyActorRefSet
@ -511,25 +511,21 @@ private[akka] class ActorCell(
//This method is in charge of setting up the contextStack and create a new instance of the Actor
protected def newActor(): Actor = {
contextStack.set(contextStack.get.push(this))
contextStack.set(this :: contextStack.get)
try {
import ActorCell.behaviorStackPlaceHolder
behaviorStack = behaviorStackPlaceHolder
behaviorStack = emptyBehaviorStack
val instance = props.creator.apply()
if (instance eq null)
throw new ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")
behaviorStack = behaviorStack match {
case `behaviorStackPlaceHolder` Stack.empty.push(instance.receive)
case newBehaviors Stack.empty.push(instance.receive).pushAll(newBehaviors.reverse.drop(1))
}
// If no becomes were issued, the actors behavior is its receive method
behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
instance
} finally {
val stackAfter = contextStack.get
if (stackAfter.nonEmpty)
contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context
contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context
}
}
@ -683,10 +679,8 @@ private[akka] class ActorCell(
}
}
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = {
if (discardOld) unbecome()
behaviorStack = behaviorStack.push(behavior)
}
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)
/**
* UntypedActorContext impl
@ -701,8 +695,9 @@ private[akka] class ActorCell(
def unbecome(): Unit = {
val original = behaviorStack
val popped = original.pop
behaviorStack = if (popped.isEmpty) original else popped
behaviorStack =
if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack
else original.tail
}
def autoReceiveMessage(msg: Envelope): Unit = {
@ -761,7 +756,7 @@ private[akka] class ActorCell(
if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped"))
} finally {
behaviorStack = behaviorStackPlaceHolder
behaviorStack = emptyBehaviorStack
clearActorFields(a)
actor = null
}

View file

@ -13,7 +13,6 @@ import java.io.Closeable
import akka.dispatch.Await.{ Awaitable, CanAwait }
import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import collection.immutable.Stack
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
@ -430,7 +429,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-]*$"""))
throw new IllegalArgumentException(
"invalid ActorSystem name [" + name +
"], must contain only word characters (i.e. [a-zA-Z_0-9] plus non-leading '-')")
"], must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-')")
import ActorSystem._
@ -685,8 +684,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
final class TerminationCallbacks extends Runnable with Awaitable[Unit] {
private val lock = new ReentrantGuard
private var callbacks: Stack[Runnable] = _ //non-volatile since guarded by the lock
lock withGuard { callbacks = Stack.empty[Runnable] }
private var callbacks: List[Runnable] = _ //non-volatile since guarded by the lock
lock withGuard { callbacks = Nil }
private val latch = new CountDownLatch(1)
@ -695,17 +694,17 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
case 0 throw new RejectedExecutionException("Must be called prior to system shutdown.")
case _ lock withGuard {
if (latch.getCount == 0) throw new RejectedExecutionException("Must be called prior to system shutdown.")
else callbacks = callbacks.push(callback)
else callbacks ::= callback
}
}
}
final def run(): Unit = lock withGuard {
@tailrec def runNext(c: Stack[Runnable]): Stack[Runnable] = c.headOption match {
case None Stack.empty[Runnable]
case Some(callback)
try callback.run() catch { case e log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) }
runNext(c.pop)
@tailrec def runNext(c: List[Runnable]): List[Runnable] = c match {
case Nil Nil
case callback :: rest
try callback.run() catch { case NonFatal(e) log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) }
runNext(rest)
}
try { callbacks = runNext(callbacks) } finally latch.countDown()
}

View file

@ -6,7 +6,6 @@ package akka.actor
import akka.dispatch._
import akka.japi.Creator
import collection.immutable.Stack
import akka.routing._
/**
@ -186,5 +185,10 @@ case class Props(
* able to optimize serialization.
*/
private[akka] case class FromClassCreator(clazz: Class[_ <: Actor]) extends Function0[Actor] {
def apply(): Actor = clazz.newInstance
def apply(): Actor = try clazz.newInstance catch {
case iae: IllegalAccessException
val ctor = clazz.getDeclaredConstructor()
ctor.setAccessible(true)
ctor.newInstance()
}
}

View file

@ -875,3 +875,16 @@ class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class
protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, message))
protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, message))
}
private[akka] object NoLogging extends LoggingAdapter {
def isErrorEnabled = false
def isWarningEnabled = false
def isInfoEnabled = false
def isDebugEnabled = false
protected def notifyError(message: String): Unit = ()
protected def notifyError(cause: Throwable, message: String): Unit = ()
protected def notifyWarning(message: String): Unit = ()
protected def notifyInfo(message: String): Unit = ()
protected def notifyDebug(message: String): Unit = ()
}

View file

@ -118,6 +118,15 @@ object Member {
case _ None
}
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
// pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) {
case (acc, (_, members)) acc + members.reduceLeft(highestPriorityOf)
}
}
/**
* Picks the Member with the highest "priority" MemberStatus.
*/
@ -130,8 +139,8 @@ object Member {
case (_, Exiting) m2
case (Leaving, _) m1
case (_, Leaving) m2
case (Up, Joining) m1
case (Joining, Up) m2
case (Up, Joining) m2
case (Joining, Up) m1
case (Joining, Joining) m1
case (Up, Up) m1
}
@ -268,21 +277,12 @@ case class Gossip(
// 2. merge meta-data
val mergedMeta = this.meta ++ that.meta
def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a ++ b).groupBy(_.address)
// pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) {
case (acc, (_, members)) acc + members.reduceLeft(Member.highestPriorityOf)
}
}
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq)
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq).
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).
filterNot(mergedUnreachable.contains)
// 5. fresh seen table
@ -522,24 +522,28 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
// start periodic gossip to random nodes in cluster
private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) {
gossip()
}
private val gossipTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
gossip()
}
// start periodic heartbeat to all nodes in cluster
private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) {
heartbeat()
}
private val heartbeatTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
heartbeat()
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) {
reapUnreachableMembers()
}
private val failureDetectorReaperTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
reapUnreachableMembers()
}
// start periodic leader action management (only applies for the current leader)
private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) {
leaderActions()
}
private val leaderActionsTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
leaderActions()
}
createMBean()

View file

@ -39,7 +39,7 @@ abstract class ConvergenceSpec
"A cluster of 3 members" must {
"reach initial convergence" taggedAs LongRunningTest ignore {
"reach initial convergence" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
runOn(fourth) {
@ -49,7 +49,7 @@ abstract class ConvergenceSpec
testConductor.enter("after-1")
}
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore {
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in {
val thirdAddress = node(third).address
testConductor.enter("before-shutdown")
@ -81,7 +81,7 @@ abstract class ConvergenceSpec
testConductor.enter("after-2")
}
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore {
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in {
runOn(fourth) {
// try to join
cluster.join(node(first).address)

View file

@ -17,7 +17,7 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
val c1 = role("c1")
val c2 = role("c2")
commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig))
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy

View file

@ -21,7 +21,7 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task interval
unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set
unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))

View file

@ -19,7 +19,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster.leader-actions-interval = 5 s
akka.cluster.unreachable-nodes-reaper-interval = 30 s
akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off"
"""))
.withFallback(MultiNodeClusterSpec.clusterConfig))
}

View file

@ -5,12 +5,15 @@ package akka.cluster
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.{Address, ExtendedActorSystem}
import akka.actor.{ Address, ExtendedActorSystem }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import akka.util.Duration
import org.scalatest.Suite
import org.scalatest.TestFailedException
import scala.util.control.NoStackTrace
object MultiNodeClusterSpec {
def clusterConfig: Config = ConfigFactory.parseString("""
@ -29,10 +32,28 @@ object MultiNodeClusterSpec {
""")
}
trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec
trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: MultiNodeSpec
override def initialParticipants = roles.size
// Cluster tests are written so that if previous step (test method) failed
// it will most likely not be possible to run next step. This ensures
// fail fast of steps after the first failure.
private var failed = false
override protected def withFixture(test: NoArgTest): Unit = try {
if (failed) {
val e = new TestFailedException("Previous step failed", 0)
// short stack trace
e.setStackTrace(e.getStackTrace.take(1))
throw e
}
super.withFixture(test)
} catch {
case t
failed = true
throw t
}
/**
* The cluster node instance. Needs to be lazily created.
*/
@ -151,6 +172,6 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec
}
def roleName(address: Address): Option[RoleName] = {
testConductor.getNodes.await.find(node(_).address == address)
roles.find(node(_).address == address)
}
}

View file

@ -20,7 +20,7 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
unreachable-nodes-reaper-interval = 30 s
unreachable-nodes-reaper-interval = 300 s # turn "off"
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))

View file

@ -21,18 +21,19 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString("""
akka.cluster {
gossip-interval = 400 ms
nr-of-deputy-nodes = 0
# FIXME remove this (use default) when ticket #2239 has been fixed
gossip-interval = 400 ms
}
akka.loglevel = INFO
"""))
}
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy
abstract class SunnyWeatherSpec
extends MultiNodeSpec(SunnyWeatherMultiJvmSpec)

View file

@ -0,0 +1,435 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.actor.Address
import akka.remote.testconductor.RoleName
import MemberStatus._
object TransitionMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString(
"akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy
abstract class TransitionSpec
extends MultiNodeSpec(TransitionMultiJvmSpec)
with MultiNodeClusterSpec {
import TransitionMultiJvmSpec._
// sorted in the order used by the cluster
def leader(roles: RoleName*) = roles.sorted.head
def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail
def memberStatus(address: Address): MemberStatus = {
val statusOption = (cluster.latestGossip.members ++ cluster.latestGossip.overview.unreachable).collectFirst {
case m if m.address == address m.status
}
statusOption must not be (None)
statusOption.get
}
def memberAddresses: Set[Address] = cluster.latestGossip.members.map(_.address)
def members: Set[RoleName] = memberAddresses.flatMap(roleName(_))
def seenLatestGossip: Set[RoleName] = {
val gossip = cluster.latestGossip
gossip.overview.seen.collect {
case (address, v) if v == gossip.version roleName(address)
}.flatten.toSet
}
def awaitSeen(addresses: Address*): Unit = awaitCond {
seenLatestGossip.map(node(_).address) == addresses.toSet
}
def awaitMembers(addresses: Address*): Unit = awaitCond {
memberAddresses == addresses.toSet
}
def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond {
memberStatus(address) == Up
}
// implicit conversion from RoleName to Address
implicit def role2Address(role: RoleName): Address = node(role).address
// DSL sugar for `role1 gossipTo role2`
implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role)
var gossipBarrierCounter = 0
class RoleWrapper(fromRole: RoleName) {
def gossipTo(toRole: RoleName): Unit = {
gossipBarrierCounter += 1
runOn(toRole) {
val g = cluster.latestGossip
testConductor.enter("before-gossip-" + gossipBarrierCounter)
awaitCond(cluster.latestGossip != g) // received gossip
testConductor.enter("after-gossip-" + gossipBarrierCounter)
}
runOn(fromRole) {
testConductor.enter("before-gossip-" + gossipBarrierCounter)
cluster.gossipTo(node(toRole).address) // send gossip
testConductor.enter("after-gossip-" + gossipBarrierCounter)
}
runOn(roles.filterNot(r r == fromRole || r == toRole): _*) {
testConductor.enter("before-gossip-" + gossipBarrierCounter)
testConductor.enter("after-gossip-" + gossipBarrierCounter)
}
}
}
"A Cluster" must {
"start nodes as singleton clusters" taggedAs LongRunningTest in {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence.isDefined must be(true)
cluster.leaderActions()
cluster.status must be(Up)
testConductor.enter("after-1")
}
"perform correct transitions when second joining first" taggedAs LongRunningTest in {
runOn(second) {
cluster.join(first)
}
runOn(first) {
awaitMembers(first, second)
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
cluster.convergence.isDefined must be(false)
}
testConductor.enter("second-joined")
first gossipTo second
runOn(second) {
members must be(Set(first, second))
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
// we got a conflicting version in second, and therefore not convergence in second
seenLatestGossip must be(Set(second))
cluster.convergence.isDefined must be(false)
}
second gossipTo first
runOn(first) {
seenLatestGossip must be(Set(first, second))
}
first gossipTo second
runOn(second) {
seenLatestGossip must be(Set(first, second))
}
runOn(first, second) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
cluster.convergence.isDefined must be(true)
}
testConductor.enter("convergence-joining-2")
runOn(leader(first, second)) {
cluster.leaderActions()
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
}
testConductor.enter("leader-actions-2")
leader(first, second) gossipTo nonLeader(first, second).head
runOn(nonLeader(first, second).head) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
seenLatestGossip must be(Set(first, second))
cluster.convergence.isDefined must be(true)
}
nonLeader(first, second).head gossipTo leader(first, second)
runOn(first, second) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
seenLatestGossip must be(Set(first, second))
cluster.convergence.isDefined must be(true)
}
testConductor.enter("after-2")
}
"perform correct transitions when third joins second" taggedAs LongRunningTest in {
runOn(third) {
cluster.join(second)
}
runOn(second) {
awaitMembers(first, second, third)
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining)
seenLatestGossip must be(Set(second))
}
testConductor.enter("third-joined-second")
second gossipTo first
runOn(first) {
members must be(Set(first, second, third))
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining)
}
first gossipTo third
runOn(third) {
members must be(Set(first, second, third))
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining)
// conflicting version
seenLatestGossip must be(Set(third))
}
third gossipTo first
third gossipTo second
runOn(first, second) {
seenLatestGossip must be(Set(myself, third))
}
first gossipTo second
runOn(second) {
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
runOn(first, third) {
cluster.convergence.isDefined must be(false)
}
second gossipTo first
second gossipTo third
runOn(first, second, third) {
seenLatestGossip must be(Set(first, second, third))
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Joining)
cluster.convergence.isDefined must be(true)
}
testConductor.enter("convergence-joining-3")
runOn(leader(first, second, third)) {
cluster.leaderActions()
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
}
testConductor.enter("leader-actions-3")
// leader gossipTo first non-leader
leader(first, second, third) gossipTo nonLeader(first, second, third).head
runOn(nonLeader(first, second, third).head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(leader(first, second, third), myself))
cluster.convergence.isDefined must be(false)
}
// first non-leader gossipTo the other non-leader
nonLeader(first, second, third).head gossipTo nonLeader(first, second, third).tail.head
runOn(nonLeader(first, second, third).head) {
cluster.gossipTo(node(nonLeader(first, second, third).tail.head).address)
}
runOn(nonLeader(first, second, third).tail.head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
// and back again
nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head
runOn(nonLeader(first, second, third).head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
// first non-leader gossipTo the leader
nonLeader(first, second, third).head gossipTo leader(first, second, third)
runOn(first, second, third) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
testConductor.enter("after-3")
}
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
runOn(fourth) {
cluster.join(fifth)
awaitMembers(fourth, fifth)
cluster.gossipTo(fifth)
awaitSeen(fourth, fifth)
cluster.convergence.isDefined must be(true)
}
runOn(fifth) {
awaitMembers(fourth, fifth)
cluster.gossipTo(fourth)
awaitSeen(fourth, fifth)
cluster.gossipTo(fourth)
cluster.convergence.isDefined must be(true)
}
testConductor.enter("fourth-joined-fifth")
testConductor.enter("after-4")
}
"perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in {
runOn(fourth) {
cluster.join(third)
}
runOn(third) {
awaitMembers(first, second, third, fourth)
seenLatestGossip must be(Set(third))
}
testConductor.enter("fourth-joined-third")
third gossipTo second
runOn(second) {
seenLatestGossip must be(Set(second, third))
}
second gossipTo fourth
runOn(fourth) {
members must be(roles.toSet)
// merge conflict
seenLatestGossip must be(Set(fourth))
}
fourth gossipTo first
fourth gossipTo second
fourth gossipTo third
fourth gossipTo fifth
runOn(first, second, third, fifth) {
members must be(roles.toSet)
seenLatestGossip must be(Set(fourth, myself))
}
first gossipTo fifth
runOn(fifth) {
seenLatestGossip must be(Set(first, fourth, fifth))
}
fifth gossipTo third
runOn(third) {
seenLatestGossip must be(Set(first, third, fourth, fifth))
}
third gossipTo second
runOn(second) {
seenLatestGossip must be(roles.toSet)
cluster.convergence.isDefined must be(true)
}
second gossipTo first
second gossipTo third
second gossipTo fourth
third gossipTo fifth
seenLatestGossip must be(roles.toSet)
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
memberStatus(fourth) must be(Joining)
memberStatus(fifth) must be(Up)
cluster.convergence.isDefined must be(true)
testConductor.enter("convergence-joining-3")
runOn(leader(roles: _*)) {
cluster.leaderActions()
memberStatus(fourth) must be(Up)
seenLatestGossip must be(Set(myself))
cluster.convergence.isDefined must be(false)
}
// spread the word
for (x :: y :: Nil (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) {
x gossipTo y
}
testConductor.enter("spread-5")
seenLatestGossip must be(roles.toSet)
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
memberStatus(fourth) must be(Up)
memberStatus(fifth) must be(Up)
cluster.convergence.isDefined must be(true)
testConductor.enter("after-5")
}
"perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in {
runOn(fifth) {
markNodeAsUnavailable(second)
cluster.reapUnreachableMembers()
cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
seenLatestGossip must be(Set(fifth))
}
// spread the word
val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth)
for (x :: y :: Nil gossipRound.sliding(2)) {
x gossipTo y
}
runOn((roles.filterNot(_ == second)): _*) {
cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
cluster.convergence.isDefined must be(false)
}
runOn(third) {
cluster.down(second)
awaitMemberStatus(second, Down)
}
// spread the word
val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth)
for (x :: y :: Nil gossipRound2.sliding(2)) {
x gossipTo y
}
runOn((roles.filterNot(_ == second)): _*) {
cluster.latestGossip.overview.unreachable must contain(Member(second, Down))
memberStatus(second) must be(Down)
seenLatestGossip must be(Set(first, third, fourth, fifth))
cluster.convergence.isDefined must be(true)
}
testConductor.enter("after-6")
}
}
}

View file

@ -33,12 +33,12 @@ class GossipSpec extends WordSpec with MustMatchers {
val g2 = Gossip(members = SortedSet(a2, c2, e2))
val merged1 = g1 merge g2
merged1.members must be(SortedSet(a1, c1, e2))
merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up))
merged1.members must be(SortedSet(a2, c1, e1))
merged1.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining))
val merged2 = g2 merge g1
merged2.members must be(SortedSet(a1, c1, e2))
merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up))
merged2.members must be(SortedSet(a2, c1, e1))
merged2.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining))
}
@ -48,12 +48,12 @@ class GossipSpec extends WordSpec with MustMatchers {
val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2)))
val merged1 = g1 merge g2
merged1.overview.unreachable must be(Set(a1, b2, c1, d2))
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
merged1.overview.unreachable must be(Set(a2, b2, c1, d2))
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed))
val merged2 = g2 merge g1
merged2.overview.unreachable must be(Set(a1, b2, c1, d2))
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
merged2.overview.unreachable must be(Set(a2, b2, c1, d2))
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed))
}
@ -62,14 +62,14 @@ class GossipSpec extends WordSpec with MustMatchers {
val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2)))
val merged1 = g1 merge g2
merged1.members must be(SortedSet(a1))
merged1.members.toSeq.map(_.status) must be(Seq(Up))
merged1.members must be(SortedSet(a2))
merged1.members.toSeq.map(_.status) must be(Seq(Joining))
merged1.overview.unreachable must be(Set(b2, c1, d2))
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))
val merged2 = g2 merge g1
merged2.members must be(SortedSet(a1))
merged2.members.toSeq.map(_.status) must be(Seq(Up))
merged2.members must be(SortedSet(a2))
merged2.members.toSeq.map(_.status) must be(Seq(Joining))
merged2.overview.unreachable must be(Set(b2, c1, d2))
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))

View file

@ -16,9 +16,9 @@ import org.jboss.netty.channel.ChannelPipelineFactory
private[akka] class TestConductorTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
extends NettyRemoteTransport(_system, _provider) {
override def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory =
override def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
new ChannelPipelineFactory {
def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout) :+ endpoint)
def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout, isClient) :+ endpoint)
}
}

View file

@ -165,6 +165,52 @@ akka {
# (O) Maximum time window that a client should try to reconnect for
reconnection-time-window = 600s
ssl {
# (I&O) Enable SSL/TLS encryption.
# This must be enabled on both the client and server to work.
enable = off
# (I) This is the Java Key Store used by the server connection
key-store = "keystore"
# This password is used for decrypting the key store
key-store-password = "changeme"
# (O) This is the Java Key Store used by the client connection
trust-store = "truststore"
# This password is used for decrypting the trust store
trust-store-password = "changeme"
# (I&O) Protocol to use for SSL encryption, choose from:
# Java 6 & 7:
# 'SSLv3', 'TLSv1'
# Java 7:
# 'TLSv1.1', 'TLSv1.2'
protocol = "TLSv1"
# Examples: [ "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA" ]
# You need to install the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256
# More info here: http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider
supported-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"]
# Using /dev/./urandom is only necessary when using SHA1PRNG on Linux to prevent blocking
# It is NOT as secure because it reuses the seed
# '' => defaults to /dev/random or whatever is set in java.security for example: securerandom.source=file:/dev/random
# '/dev/./urandom' => NOT '/dev/urandom' as that doesn't work according to: http://bugs.sun.com/view_bug.do?bug_id=6202721
sha1prng-random-source = ""
# There are three options, in increasing order of security:
# "" or SecureRandom => (default)
# "SHA1PRNG" => Can be slow because of blocking issues on Linux
# "AES128CounterRNGFast" => fastest startup and based on AES encryption algorithm
# The following use one of 3 possible seed sources, depending on availability: /dev/random, random.org and SecureRandom (provided by Java)
# "AES128CounterRNGSecure"
# "AES256CounterRNGSecure" (Install JCE Unlimited Strength Jurisdiction Policy Files first)
# Setting a value here may require you to supply the appropriate cipher suite (see supported-algorithms section above)
random-number-generator = ""
}
}
}
}

View file

@ -145,7 +145,7 @@ private[akka] class ActiveRemoteClient private[akka] (
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
val b = new ClientBootstrap(netty.clientChannelFactory)
b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), true))
b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), withTimeout = true, isClient = true))
b.setOption("tcpNoDelay", true)
b.setOption("keepAlive", true)
b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis)

View file

@ -64,17 +64,18 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
*
* @param withTimeout determines whether an IdleStateHandler shall be included
*/
def apply(endpoint: Seq[ChannelHandler], withTimeout: Boolean): ChannelPipelineFactory =
def apply(endpoint: Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
new ChannelPipelineFactory {
def getPipeline = apply(defaultStack(withTimeout) ++ endpoint)
def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint)
}
/**
* Construct a default protocol stack, excluding the head handler (i.e. the one which
* actually dispatches the received messages to the local target actors).
*/
def defaultStack(withTimeout: Boolean): Seq[ChannelHandler] =
(if (withTimeout) timeout :: Nil else Nil) :::
def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] =
(if (settings.EnableSSL) NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient) :: Nil else Nil) :::
(if (withTimeout) timeout :: Nil else Nil) :::
msgFormat :::
authenticator :::
executionHandler ::
@ -122,8 +123,8 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
* This method is factored out to provide an extension point in case the
* pipeline shall be changed. It is recommended to use
*/
def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory =
PipelineFactory(Seq(endpoint), withTimeout)
def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory =
PipelineFactory(Seq(endpoint), withTimeout, isClient)
private val remoteClients = new HashMap[Address, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock

View file

@ -0,0 +1,132 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.netty
import org.jboss.netty.handler.ssl.SslHandler
import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext }
import akka.remote.RemoteTransportException
import akka.event.LoggingAdapter
import java.io.{ IOException, FileNotFoundException, FileInputStream }
import java.security.{ SecureRandom, GeneralSecurityException, KeyStore, Security }
import akka.security.provider.AkkaProvider
/**
* Used for adding SSL support to Netty pipeline
* Internal use only
*/
private[akka] object NettySSLSupport {
/**
* Construct a SSLHandler which can be inserted into a Netty server/client pipeline
*/
def apply(settings: NettySettings, log: LoggingAdapter, isClient: Boolean): SslHandler =
if (isClient) initializeClientSSL(settings, log) else initializeServerSSL(settings, log)
def initializeCustomSecureRandom(rngName: Option[String], sourceOfRandomness: Option[String], log: LoggingAdapter): SecureRandom = {
/**
* According to this bug report: http://bugs.sun.com/view_bug.do?bug_id=6202721
* Using /dev/./urandom is only necessary when using SHA1PRNG on Linux
* <quote>Use 'new SecureRandom()' instead of 'SecureRandom.getInstance("SHA1PRNG")'</quote> to avoid having problems
*/
sourceOfRandomness foreach { path System.setProperty("java.security.egd", path) }
val rng = rngName match {
case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure"))
log.debug("SSL random number generator set to: {}", r)
val akka = new AkkaProvider
Security.addProvider(akka)
SecureRandom.getInstance(r, akka)
case Some("SHA1PRNG")
log.debug("SSL random number generator set to: SHA1PRNG")
// This needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking
// However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD).
SecureRandom.getInstance("SHA1PRNG")
case Some(unknown)
log.debug("Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown)
new SecureRandom
case None
log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom")
new SecureRandom
}
rng.nextInt() // prevent stall on first access
rng
}
def initializeClientSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
log.debug("Client SSL is enabled, initialising ...")
def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] =
try {
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
trustManagerFactory.init(trustStore)
val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers
Option(SSLContext.getInstance(protocol)) map { ctx ctx.init(null, trustManagers, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
} catch {
case e: FileNotFoundException throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e)
case e: IOException throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e)
case e: GeneralSecurityException throw new RemoteTransportException("Client SSL connection could not be established because SSL context could not be constructed", e)
}
((settings.SSLTrustStore, settings.SSLTrustStorePassword, settings.SSLProtocol) match {
case (Some(trustStore), Some(password), Some(protocol)) constructClientContext(settings, log, trustStore, password, protocol)
case (trustStore, password, protocol) throw new GeneralSecurityException(
"One or several SSL trust store settings are missing: [trust-store: %s] [trust-store-password: %s] [protocol: %s]".format(
trustStore,
password,
protocol))
}) match {
case Some(context)
log.debug("Using client SSL context to create SSLEngine ...")
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(true)
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
new SslHandler(sslEngine)
case None
throw new GeneralSecurityException(
"""Failed to initialize client SSL because SSL context could not be found." +
"Make sure your settings are correct: [trust-store: %s] [trust-store-password: %s] [protocol: %s]""".format(
settings.SSLTrustStore,
settings.SSLTrustStorePassword,
settings.SSLProtocol))
}
}
def initializeServerSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = {
log.debug("Server SSL is enabled, initialising ...")
def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] =
try {
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed?
factory.init(keyStore, keyStorePassword.toCharArray)
Option(SSLContext.getInstance(protocol)) map { ctx ctx.init(factory.getKeyManagers, null, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx }
} catch {
case e: FileNotFoundException throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
case e: IOException throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
case e: GeneralSecurityException throw new RemoteTransportException("Server SSL connection could not be established because SSL context could not be constructed", e)
}
((settings.SSLKeyStore, settings.SSLKeyStorePassword, settings.SSLProtocol) match {
case (Some(keyStore), Some(password), Some(protocol)) constructServerContext(settings, log, keyStore, password, protocol)
case (keyStore, password, protocol) throw new GeneralSecurityException(
"SSL key store settings went missing. [key-store: %s] [key-store-password: %s] [protocol: %s]".format(keyStore, password, protocol))
}) match {
case Some(context)
log.debug("Using server SSL context to create SSLEngine ...")
val sslEngine = context.createSSLEngine
sslEngine.setUseClientMode(false)
sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString))
new SslHandler(sslEngine)
case None throw new GeneralSecurityException(
"""Failed to initialize server SSL because SSL context could not be found.
Make sure your settings are correct: [key-store: %s] [key-store-password: %s] [protocol: %s]""".format(
settings.SSLKeyStore,
settings.SSLKeyStorePassword,
settings.SSLProtocol))
}
}
}

View file

@ -12,7 +12,6 @@ import org.jboss.netty.channel.group.ChannelGroup
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
import org.jboss.netty.handler.execution.ExecutionHandler
import akka.event.Logging
import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol }
import akka.remote.{ RemoteServerShutdown, RemoteServerError, RemoteServerClientDisconnected, RemoteServerClientConnected, RemoteServerClientClosed, RemoteProtocol, RemoteMessage }
import akka.actor.Address
@ -40,7 +39,7 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) {
private val bootstrap = {
val b = new ServerBootstrap(factory)
b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), false))
b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), withTimeout = false, isClient = false))
b.setOption("backlog", settings.Backlog)
b.setOption("tcpNoDelay", true)
b.setOption("child.keepAlive", true)

View file

@ -86,4 +86,55 @@ private[akka] class NettySettings(config: Config, val systemName: String) {
case sz sz
}
val SSLKeyStore = getString("ssl.key-store") match {
case "" None
case keyStore Some(keyStore)
}
val SSLTrustStore = getString("ssl.trust-store") match {
case "" None
case trustStore Some(trustStore)
}
val SSLKeyStorePassword = getString("ssl.key-store-password") match {
case "" None
case password Some(password)
}
val SSLTrustStorePassword = getString("ssl.trust-store-password") match {
case "" None
case password Some(password)
}
val SSLSupportedAlgorithms = getStringList("ssl.supported-algorithms").toArray.toSet
val SSLProtocol = getString("ssl.protocol") match {
case "" None
case protocol Some(protocol)
}
val SSLRandomSource = getString("ssl.sha1prng-random-source") match {
case "" None
case path Some(path)
}
val SSLRandomNumberGenerator = getString("ssl.random-number-generator") match {
case "" None
case rng Some(rng)
}
val EnableSSL = {
val enableSSL = getBoolean("ssl.enable")
if (enableSSL) {
if (SSLProtocol.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.netty.ssl.enable is turned on but no protocol is defined in 'akka.remote.netty.ssl.protocol'.")
if (SSLKeyStore.isEmpty && SSLTrustStore.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.netty.ssl.enable is turned on but no key/trust store is defined in 'akka.remote.netty.ssl.key-store' / 'akka.remote.netty.ssl.trust-store'.")
if (SSLKeyStore.isDefined && SSLKeyStorePassword.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.netty.ssl.key-store' is defined but no key-store password is defined in 'akka.remote.netty.ssl.key-store-password'.")
if (SSLTrustStore.isDefined && SSLTrustStorePassword.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.netty.ssl.trust-store' is defined but no trust-store password is defined in 'akka.remote.netty.ssl.trust-store-password'.")
}
enableSSL
}
}

View file

@ -0,0 +1,36 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator }
import java.security.SecureRandom
/**
* Internal API
*/
class AES128CounterRNGFast extends java.security.SecureRandomSpi {
private val rng = new AESCounterRNG(new SecureRandomSeedGenerator())
/**
* This is managed internally only
*/
override protected def engineSetSeed(seed: Array[Byte]): Unit = ()
/**
* Generates a user-specified number of random bytes.
*
* @param bytes the array to be filled in with random bytes.
*/
override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes)
/**
* Returns the given number of seed bytes. This call may be used to
* seed other random number generators.
*
* @param numBytes the number of seed bytes to generate.
* @return the seed bytes.
*/
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = (new SecureRandom).generateSeed(numBytes)
}

View file

@ -0,0 +1,35 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator }
/**
* Internal API
*/
class AES128CounterRNGSecure extends java.security.SecureRandomSpi {
private val rng = new AESCounterRNG()
/**
* This is managed internally only
*/
override protected def engineSetSeed(seed: Array[Byte]): Unit = ()
/**
* Generates a user-specified number of random bytes.
*
* @param bytes the array to be filled in with random bytes.
*/
override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes)
/**
* Returns the given number of seed bytes. This call may be used to
* seed other random number generators.
*
* @param numBytes the number of seed bytes to generate.
* @return the seed bytes.
*/
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes)
}

View file

@ -0,0 +1,35 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.security.provider
import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator }
/**
* Internal API
*/
class AES256CounterRNGSecure extends java.security.SecureRandomSpi {
private val rng = new AESCounterRNG(32) // Magic number is magic
/**
* This is managed internally only
*/
override protected def engineSetSeed(seed: Array[Byte]): Unit = ()
/**
* Generates a user-specified number of random bytes.
*
* @param bytes the array to be filled in with random bytes.
*/
override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes)
/**
* Returns the given number of seed bytes. This call may be used to
* seed other random number generators.
*
* @param numBytes the number of seed bytes to generate.
* @return the seed bytes.
*/
override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes)
}

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.security.provider
import java.security.{ PrivilegedAction, AccessController, Provider }
/**
* A provider that for AES128CounterRNGFast, a cryptographically secure random number generator through SecureRandom
*/
final class AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") {
AccessController.doPrivileged(new PrivilegedAction[AkkaProvider] {
def run = {
//SecureRandom
put("SecureRandom.AES128CounterRNGFast", "akka.security.provider.AES128CounterRNGFast")
put("SecureRandom.AES128CounterRNGSecure", "akka.security.provider.AES128CounterRNGSecure")
put("SecureRandom.AES256CounterRNGSecure", "akka.security.provider.AES256CounterRNGSecure")
//Implementation type: software or hardware
put("SecureRandom.AES128CounterRNGFast ImplementedIn", "Software")
put("SecureRandom.AES128CounterRNGSecure ImplementedIn", "Software")
put("SecureRandom.AES256CounterRNGSecure ImplementedIn", "Software")
null //Magic null is magic
}
})
}

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,198 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit._
import akka.actor._
import com.typesafe.config._
import akka.dispatch.{ Await, Future }
import akka.pattern.ask
import java.io.File
import akka.event.{ NoLogging, LoggingAdapter }
import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController }
import netty.{ NettySettings, NettySSLSupport }
import javax.net.ssl.SSLException
import akka.util.{ Timeout, Duration }
import akka.util.duration._
object Configuration {
// set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager"
// The certificate will expire in 2109
private val trustStore = getClass.getClassLoader.getResource("truststore").getPath
private val keyStore = getClass.getClassLoader.getResource("keystore").getPath
private val conf = """
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
hostname = localhost
ssl {
enable = on
trust-store = "%s"
key-store = "%s"
random-number-generator = "%s"
supported-algorithms = [%s]
sha1prng-random-source = "/dev/./urandom"
}
}
actor.deployment {
/blub.remote = "akka://remote-sys@localhost:12346"
/looker/child.remote = "akka://remote-sys@localhost:12346"
/looker/child/grandchild.remote = "akka://Ticket1978CommunicationSpec@localhost:12345"
}
}
"""
def getCipherConfig(cipher: String, enabled: String*): (String, Boolean, Config) = try {
val config = ConfigFactory.parseString("akka.remote.netty.port=12345").withFallback(ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher, enabled.mkString(", "))))
val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty")
val settings = new NettySettings(fullConfig, "placeholder")
val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging)
rng.nextInt() // Has to work
settings.SSLRandomNumberGenerator foreach { sRng rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) }
val engine = NettySSLSupport.initializeServerSSL(settings, NoLogging).getEngine
val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet
val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet
gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported))
gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled))
engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException(settings.SSLProtocol.get))
(cipher, true, config)
} catch {
case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) | (_: SSLException) (cipher, false, AkkaSpec.testConf) // Cannot match against the message since the message might be localized :S
}
}
import Configuration.getCipherConfig
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG", "TLS_RSA_WITH_AES_128_CBC_SHA"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES128CounterRNGFastSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGFast", "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
/**
* Both of the <quote>Secure</quote> variants require access to the Internet to access random.org.
*/
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES128CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGSecure", "TLS_RSA_WITH_AES_128_CBC_SHA"))
/**
* Both of the <quote>Secure</quote> variants require access to the Internet to access random.org.
*/
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES256CounterRNGSecure", "TLS_RSA_WITH_AES_256_CBC_SHA"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978DefaultRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "TLS_RSA_WITH_AES_128_CBC_SHA"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(("NonExistingRNG", false, AkkaSpec.testConf))
abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender {
implicit val timeout: Timeout = Timeout(30 seconds)
import RemoteCommunicationSpec._
val other = ActorSystem("remote-sys", ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config))
override def atTermination() {
other.shutdown()
}
"SSL Remoting" must {
if (cipherEnabledconfig._2) {
val remote = other.actorOf(Props(new Actor { def receive = { case "ping" sender ! (("pong", sender)) } }), "echo")
val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo")
"support remote look-ups" in {
here ! "ping"
expectMsgPF(timeout.duration) {
case ("pong", s: AnyRef) if s eq testActor true
}
}
"send error message for wrong address" in {
within(timeout.duration) {
EventFilter.error(start = "dropping", occurrences = 1).intercept {
system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping"
}(other)
}
}
"support ask" in {
Await.result(here ? "ping", timeout.duration) match {
case ("pong", s: akka.pattern.PromiseActorRef) // good
case m fail(m + " was not (pong, AskActorRef)")
}
}
"send dead letters on remote if actor does not exist" in {
within(timeout.duration) {
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh"
}(other)
}
}
"create and supervise children on remote node" in {
within(timeout.duration) {
val r = system.actorOf(Props[Echo], "blub")
r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub"
r ! 42
expectMsg(42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
r ! new Exception("crash")
}(other)
expectMsg("preRestart")
r ! 42
expectMsg(42)
system.stop(r)
expectMsg("postStop")
}
}
"look-up actors across node boundaries" in {
within(timeout.duration) {
val l = system.actorOf(Props(new Actor {
def receive = {
case (p: Props, n: String) sender ! context.actorOf(p, n)
case s: String sender ! context.actorFor(s)
}
}), "looker")
l ! (Props[Echo], "child")
val r = expectMsgType[ActorRef]
r ! (Props[Echo], "grandchild")
val remref = expectMsgType[ActorRef]
remref.isInstanceOf[LocalActorRef] must be(true)
val myref = system.actorFor(system / "looker" / "child" / "grandchild")
myref.isInstanceOf[RemoteActorRef] must be(true)
myref ! 43
expectMsg(43)
lastSender must be theSameInstanceAs remref
r.asInstanceOf[RemoteActorRef].getParent must be(l)
system.actorFor("/user/looker/child") must be theSameInstanceAs r
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
}
}
"not fail ask across node boundaries" in {
val f = for (_ 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)]
Await.result(Future.sequence(f), timeout.duration).map(_._1).toSet must be(Set("pong"))
}
} else {
"not be run when the cipher is not supported by the platform this test is currently being executed on" ignore {
}
}
}
}

View file

@ -0,0 +1,48 @@
package akka.remote
import akka.testkit._
import akka.actor._
import com.typesafe.config._
import akka.actor.ExtendedActorSystem
import akka.util.duration._
import akka.util.Duration
import akka.remote.netty.NettyRemoteTransport
import java.util.ArrayList
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket1978ConfigSpec extends AkkaSpec("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty {
hostname = localhost
port = 12345
}
actor.deployment {
/blub.remote = "akka://remote-sys@localhost:12346"
/looker/child.remote = "akka://remote-sys@localhost:12346"
/looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
}
}
""") with ImplicitSender with DefaultTimeout {
"SSL Remoting" must {
"be able to parse these extra Netty config elements" in {
val settings =
system.asInstanceOf[ExtendedActorSystem]
.provider.asInstanceOf[RemoteActorRefProvider]
.transport.asInstanceOf[NettyRemoteTransport]
.settings
import settings._
EnableSSL must be(false)
SSLKeyStore must be(Some("keystore"))
SSLKeyStorePassword must be(Some("changeme"))
SSLTrustStore must be(Some("truststore"))
SSLTrustStorePassword must be(Some("changeme"))
SSLProtocol must be(Some("TLSv1"))
SSLSupportedAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA"))
SSLRandomSource must be(None)
SSLRandomNumberGenerator must be(None)
}
}
}

View file

@ -75,7 +75,9 @@ object AkkaKernelPlugin extends Plugin {
copyFiles(libFiles(cp, conf.libFilter), distLibPath)
copyFiles(conf.additionalLibs, distLibPath)
for (subTarget subProjectDependencies.map(_.target)) {
for (subProjectDependency subProjectDependencies) {
val subTarget = subProjectDependency.target
EvaluateTask(buildStruct, packageBin in Compile, st, subProjectDependency.projectRef)
copyJars(subTarget, distLibPath)
}
log.info("Distribution created.")
@ -220,10 +222,10 @@ object AkkaKernelPlugin extends Plugin {
}.toList
val target = setting(Keys.crossTarget, "Missing crossTarget directory")
SubProjectInfo(project.id, target, subProjects)
SubProjectInfo(projectRef, target, subProjects)
}
private case class SubProjectInfo(id: String, target: File, subProjects: Seq[SubProjectInfo]) {
private case class SubProjectInfo(projectRef: ProjectRef, target: File, subProjects: Seq[SubProjectInfo]) {
def recursiveSubProjects: Set[SubProjectInfo] = {
val flatSubProjects = for {

View file

@ -5,9 +5,7 @@
package akka.testkit
import akka.actor._
import akka.util.Duration
import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable.Stack
import akka.dispatch._
import akka.pattern.ask

View file

@ -486,19 +486,21 @@ trait TestKitBase {
@tailrec
def doit(acc: List[T], count: Int): List[T] = {
if (count >= messages) return acc.reverse
receiveOne((stop - now) min idle)
lastMessage match {
case NullMessage
lastMessage = msg
acc.reverse
case RealMessage(o, _) if (f isDefinedAt o)
msg = lastMessage
doit(f(o) :: acc, count + 1)
case RealMessage(o, _)
queue.offerFirst(lastMessage)
lastMessage = msg
acc.reverse
if (count >= messages) acc.reverse
else {
receiveOne((stop - now) min idle)
lastMessage match {
case NullMessage
lastMessage = msg
acc.reverse
case RealMessage(o, _) if (f isDefinedAt o)
msg = lastMessage
doit(f(o) :: acc, count + 1)
case RealMessage(o, _)
queue.offerFirst(lastMessage)
lastMessage = msg
acc.reverse
}
}
}

View file

@ -57,7 +57,7 @@ public class UntypedCoordinatedIncrementTest {
Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS);
@Before
public void initialise() {
public void initialize() {
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;

View file

@ -58,7 +58,7 @@ public class UntypedTransactorTest {
Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS);
@Before
public void initialise() {
public void initialize() {
counters = new ArrayList<ActorRef>();
for (int i = 1; i <= numCounters; i++) {
final String name = "counter" + i;

View file

@ -9,14 +9,17 @@ import akka.actor._
import akka.dispatch.{ Promise, Future }
import akka.event.Logging
import annotation.tailrec
import akka.util.Duration
import java.util.concurrent.TimeUnit
import collection.mutable.ListBuffer
import akka.util.{ NonFatal, Duration }
private[zeromq] object ConcurrentSocketActor {
private sealed trait PollMsg
private case object Poll extends PollMsg
private case object PollCareful extends PollMsg
private case object Flush
private class NoSocketHandleException() extends Exception("Couldn't create a zeromq socket.")
private val DefaultContext = Context()
@ -32,19 +35,28 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
import SocketType.{ ZMQSocketType ST }
params.collectFirst { case t: ST t }.getOrElse(throw new IllegalArgumentException("A socket type is required"))
}
private val socket: Socket = zmqContext.socket(socketType)
private val poller: Poller = zmqContext.poller
private val log = Logging(context.system, this)
private val pendingSends = new ListBuffer[Seq[Frame]]
def receive = {
case m: PollMsg doPoll(m)
case ZMQMessage(frames) sendMessage(frames)
case ZMQMessage(frames) handleRequest(Send(frames))
case r: Request handleRequest(r)
case Flush flush()
case Terminated(_) context stop self
}
private def handleRequest(msg: Request): Unit = msg match {
case Send(frames) sendMessage(frames)
case Send(frames)
if (frames.nonEmpty) {
val flushNow = pendingSends.isEmpty
pendingSends.append(frames)
if (flushNow) flush()
}
case opt: SocketOption handleSocketOption(opt)
case q: SocketOptionQuery handleSocketOptionQuery(q)
}
@ -117,48 +129,46 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
}
}
private def setupConnection() {
private def setupConnection(): Unit = {
params filter (_.isInstanceOf[SocketConnectOption]) foreach { self ! _ }
params filter (_.isInstanceOf[PubSubOption]) foreach { self ! _ }
}
private def deserializerFromParams = {
private def deserializerFromParams: Deserializer =
params collectFirst { case d: Deserializer d } getOrElse new ZMQMessageDeserializer
private def setupSocket() = params foreach {
case _: SocketConnectOption | _: PubSubOption | _: SocketMeta // ignore, handled differently
case m self ! m
}
private def setupSocket() = {
params foreach {
case _: SocketConnectOption | _: PubSubOption | _: SocketMeta // ignore, handled differently
case m self ! m
override def preRestart(reason: Throwable, message: Option[Any]): Unit = context.children foreach context.stop //Do not call postStop
override def postRestart(reason: Throwable): Unit = () // Do nothing
override def postStop: Unit = try {
if (socket != null) {
poller.unregister(socket)
socket.close
}
}
} finally notifyListener(Closed)
override def preRestart(reason: Throwable, message: Option[Any]) {
context.children foreach context.stop //Do not call postStop
}
override def postRestart(reason: Throwable) {} //Do nothing
override def postStop {
try {
if (socket != null) {
poller.unregister(socket)
socket.close
@tailrec private def flushMessage(i: Seq[Frame]): Boolean =
if (i.isEmpty)
true
else {
val head = i.head
val tail = i.tail
if (socket.send(head.payload.toArray, if (tail.nonEmpty) JZMQ.SNDMORE else 0)) flushMessage(tail)
else {
pendingSends.prepend(i) // Reenqueue the rest of the message so the next flush takes care of it
self ! Flush
false
}
} finally {
notifyListener(Closed)
}
}
private def sendMessage(frames: Seq[Frame]) {
def sendBytes(bytes: Seq[Byte], flags: Int) = socket.send(bytes.toArray, flags)
val iter = frames.iterator
while (iter.hasNext) {
val payload = iter.next.payload
val flags = if (iter.hasNext) JZMQ.SNDMORE else 0
sendBytes(payload, flags)
}
}
@tailrec private def flush(): Unit =
if (pendingSends.nonEmpty && flushMessage(pendingSends.remove(0))) flush() // Flush while things are going well
// this is a PollMsg=>Unit which either polls or schedules Poll, depending on the sign of the timeout
private val doPollTimeout = {

View file

@ -139,8 +139,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
*/
def newSocket(socketParameters: SocketOption*): ActorRef = {
implicit val timeout = NewSocketTimeout
val req = (zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef]
Await.result(req, timeout.duration)
Await.result((zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef], timeout.duration)
}
/**
@ -248,9 +247,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
case _ false
}
def receive = {
case p: Props sender ! context.actorOf(p)
}
def receive = { case p: Props sender ! context.actorOf(p) }
}), "zeromq")
}

View file

@ -461,7 +461,7 @@ object Dependencies {
)
val remote = Seq(
netty, protobuf, Test.junit, Test.scalatest
netty, protobuf, uncommonsMath, Test.junit, Test.scalatest
)
val cluster = Seq(Test.junit, Test.scalatest)
@ -502,6 +502,7 @@ object Dependency {
val ScalaStm = "0.5"
val Scalatest = "1.6.1"
val Slf4j = "1.6.4"
val UncommonsMath = "1.2.2a"
}
// Compile
@ -513,6 +514,7 @@ object Dependency {
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD
val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala)
val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT
val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % V.UncommonsMath // ApacheV2
val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.6" // ApacheV2
// Test