diff --git a/akka-actor-tests/src/test/java/akka/actor/NonPublicClass.java b/akka-actor-tests/src/test/java/akka/actor/NonPublicClass.java new file mode 100644 index 0000000000..850d82cd62 --- /dev/null +++ b/akka-actor-tests/src/test/java/akka/actor/NonPublicClass.java @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor; + +public class NonPublicClass { + public static Props createProps() { + return new Props(MyNonPublicActorClass.class); + } +} + +class MyNonPublicActorClass extends UntypedActor { + @Override public void onReceive(Object msg) { + getSender().tell(msg); + } +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index bec066d97a..3056dc9e95 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -358,6 +358,13 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { system.stop(serverRef) } + "support actorOfs where the class of the actor isn't public" in { + val a = system.actorOf(NonPublicClass.createProps()) + a.tell("pigdog", testActor) + expectMsg("pigdog") + system stop a + } + "stop when sent a poison pill" in { val timeout = Timeout(20000) val ref = system.actorOf(Props(new Actor { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2721ccffa0..8fc7df93e5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -7,7 +7,6 @@ package akka.actor import akka.AkkaException import scala.reflect.BeanProperty import scala.util.control.NoStackTrace -import scala.collection.immutable.Stack import java.util.regex.Pattern /** @@ -279,18 +278,14 @@ trait Actor { */ protected[akka] implicit val context: ActorContext = { val contextStack = ActorCell.contextStack.get - - def noContextError = + if ((contextStack.isEmpty) || (contextStack.head eq null)) throw new ActorInitializationException( "\n\tYou cannot create an instance of [" + getClass.getName + "] explicitly using the constructor (new)." + "\n\tYou have to use one of the factory methods to create a new actor. Either use:" + "\n\t\t'val actor = context.actorOf(Props[MyActor])' (to create a supervised child actor from within an actor), or" + "\n\t\t'val actor = system.actorOf(Props(new MyActor(..)))' (to create a top level actor from the ActorSystem)") - - if (contextStack.isEmpty) noContextError val c = contextStack.head - if (c eq null) noContextError - ActorCell.contextStack.set(contextStack.push(null)) + ActorCell.contextStack.set(null :: contextStack) c } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 9dbe610195..72793513e2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -13,7 +13,7 @@ import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } import akka.serialization.SerializationExtension import akka.event.Logging.LogEventException -import collection.immutable.{ TreeSet, Stack, TreeMap } +import collection.immutable.{ TreeSet, TreeMap } import akka.util.{ Unsafe, Duration, Helpers, NonFatal } //TODO: everything here for current compatibility - could be limited more @@ -173,8 +173,8 @@ trait UntypedActorContext extends ActorContext { * for! (waves hand) */ private[akka] object ActorCell { - val contextStack = new ThreadLocal[Stack[ActorContext]] { - override def initialValue = Stack[ActorContext]() + val contextStack = new ThreadLocal[List[ActorContext]] { + override def initialValue: List[ActorContext] = Nil } final val emptyCancellable: Cancellable = new Cancellable { @@ -184,7 +184,7 @@ private[akka] object ActorCell { final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable) - final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior) + final val emptyBehaviorStack: List[Actor.Receive] = Nil final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty @@ -408,7 +408,7 @@ private[akka] class ActorCell( var currentMessage: Envelope = _ var actor: Actor = _ - private var behaviorStack: Stack[Actor.Receive] = Stack.empty + private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack @volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status var nextNameSequence: Long = 0 var watching: Set[ActorRef] = emptyActorRefSet @@ -511,25 +511,21 @@ private[akka] class ActorCell( //This method is in charge of setting up the contextStack and create a new instance of the Actor protected def newActor(): Actor = { - contextStack.set(contextStack.get.push(this)) + contextStack.set(this :: contextStack.get) try { - import ActorCell.behaviorStackPlaceHolder - - behaviorStack = behaviorStackPlaceHolder + behaviorStack = emptyBehaviorStack val instance = props.creator.apply() if (instance eq null) throw new ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'") - behaviorStack = behaviorStack match { - case `behaviorStackPlaceHolder` ⇒ Stack.empty.push(instance.receive) - case newBehaviors ⇒ Stack.empty.push(instance.receive).pushAll(newBehaviors.reverse.drop(1)) - } + // If no becomes were issued, the actors behavior is its receive method + behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack instance } finally { val stackAfter = contextStack.get if (stackAfter.nonEmpty) - contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context + contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context } } @@ -683,10 +679,8 @@ private[akka] class ActorCell( } } - def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = { - if (discardOld) unbecome() - behaviorStack = behaviorStack.push(behavior) - } + def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = + behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack) /** * UntypedActorContext impl @@ -701,8 +695,9 @@ private[akka] class ActorCell( def unbecome(): Unit = { val original = behaviorStack - val popped = original.pop - behaviorStack = if (popped.isEmpty) original else popped + behaviorStack = + if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack + else original.tail } def autoReceiveMessage(msg: Envelope): Unit = { @@ -761,7 +756,7 @@ private[akka] class ActorCell( if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped")) } finally { - behaviorStack = behaviorStackPlaceHolder + behaviorStack = emptyBehaviorStack clearActorFields(a) actor = null } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 721375adda..0d13f2451a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -13,7 +13,6 @@ import java.io.Closeable import akka.dispatch.Await.{ Awaitable, CanAwait } import akka.util._ import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } -import collection.immutable.Stack import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } import java.util.concurrent.TimeUnit.MILLISECONDS @@ -430,7 +429,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-]*$""")) throw new IllegalArgumentException( "invalid ActorSystem name [" + name + - "], must contain only word characters (i.e. [a-zA-Z_0-9] plus non-leading '-')") + "], must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-')") import ActorSystem._ @@ -685,8 +684,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, final class TerminationCallbacks extends Runnable with Awaitable[Unit] { private val lock = new ReentrantGuard - private var callbacks: Stack[Runnable] = _ //non-volatile since guarded by the lock - lock withGuard { callbacks = Stack.empty[Runnable] } + private var callbacks: List[Runnable] = _ //non-volatile since guarded by the lock + lock withGuard { callbacks = Nil } private val latch = new CountDownLatch(1) @@ -695,17 +694,17 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, case 0 ⇒ throw new RejectedExecutionException("Must be called prior to system shutdown.") case _ ⇒ lock withGuard { if (latch.getCount == 0) throw new RejectedExecutionException("Must be called prior to system shutdown.") - else callbacks = callbacks.push(callback) + else callbacks ::= callback } } } final def run(): Unit = lock withGuard { - @tailrec def runNext(c: Stack[Runnable]): Stack[Runnable] = c.headOption match { - case None ⇒ Stack.empty[Runnable] - case Some(callback) ⇒ - try callback.run() catch { case e ⇒ log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) } - runNext(c.pop) + @tailrec def runNext(c: List[Runnable]): List[Runnable] = c match { + case Nil ⇒ Nil + case callback :: rest ⇒ + try callback.run() catch { case NonFatal(e) ⇒ log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) } + runNext(rest) } try { callbacks = runNext(callbacks) } finally latch.countDown() } diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index fc01a5ba36..f48bbe9573 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -6,7 +6,6 @@ package akka.actor import akka.dispatch._ import akka.japi.Creator -import collection.immutable.Stack import akka.routing._ /** @@ -186,5 +185,10 @@ case class Props( * able to optimize serialization. */ private[akka] case class FromClassCreator(clazz: Class[_ <: Actor]) extends Function0[Actor] { - def apply(): Actor = clazz.newInstance + def apply(): Actor = try clazz.newInstance catch { + case iae: IllegalAccessException ⇒ + val ctor = clazz.getDeclaredConstructor() + ctor.setAccessible(true) + ctor.newInstance() + } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index b91509ac9f..0777d9aef1 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -875,3 +875,16 @@ class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class protected def notifyInfo(message: String): Unit = bus.publish(Info(logSource, logClass, message)) protected def notifyDebug(message: String): Unit = bus.publish(Debug(logSource, logClass, message)) } + +private[akka] object NoLogging extends LoggingAdapter { + def isErrorEnabled = false + def isWarningEnabled = false + def isInfoEnabled = false + def isDebugEnabled = false + + protected def notifyError(message: String): Unit = () + protected def notifyError(cause: Throwable, message: String): Unit = () + protected def notifyWarning(message: String): Unit = () + protected def notifyInfo(message: String): Unit = () + protected def notifyDebug(message: String): Unit = () +} diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index a86bc0148c..c495e470ce 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -118,6 +118,15 @@ object Member { case _ ⇒ None } + def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { + // group all members by Address => Seq[Member] + val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) + // pick highest MemberStatus + (Set.empty[Member] /: groupedByAddress) { + case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) + } + } + /** * Picks the Member with the highest "priority" MemberStatus. */ @@ -130,8 +139,8 @@ object Member { case (_, Exiting) ⇒ m2 case (Leaving, _) ⇒ m1 case (_, Leaving) ⇒ m2 - case (Up, Joining) ⇒ m1 - case (Joining, Up) ⇒ m2 + case (Up, Joining) ⇒ m2 + case (Joining, Up) ⇒ m1 case (Joining, Joining) ⇒ m1 case (Up, Up) ⇒ m1 } @@ -268,21 +277,12 @@ case class Gossip( // 2. merge meta-data val mergedMeta = this.meta ++ that.meta - def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { - // group all members by Address => Seq[Member] - val groupedByAddress = (a ++ b).groupBy(_.address) - // pick highest MemberStatus - (Set.empty[Member] /: groupedByAddress) { - case (acc, (_, members)) ⇒ acc + members.reduceLeft(Member.highestPriorityOf) - } - } - // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) + val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq). + val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members). filterNot(mergedUnreachable.contains) // 5. fresh seen table @@ -522,24 +522,28 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } // start periodic gossip to random nodes in cluster - private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) { - gossip() - } + private val gossipTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { + gossip() + } // start periodic heartbeat to all nodes in cluster - private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) { - heartbeat() - } + private val heartbeatTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { + heartbeat() + } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { - reapUnreachableMembers() - } + private val failureDetectorReaperTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { + reapUnreachableMembers() + } // start periodic leader action management (only applies for the current leader) - private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) { - leaderActions() - } + private val leaderActionsTask = + FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { + leaderActions() + } createMBean() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index bdc0a1ae8b..52206f1b8c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -39,7 +39,7 @@ abstract class ConvergenceSpec "A cluster of 3 members" must { - "reach initial convergence" taggedAs LongRunningTest ignore { + "reach initial convergence" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) runOn(fourth) { @@ -49,7 +49,7 @@ abstract class ConvergenceSpec testConductor.enter("after-1") } - "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore { + "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in { val thirdAddress = node(third).address testConductor.enter("before-shutdown") @@ -81,7 +81,7 @@ abstract class ConvergenceSpec testConductor.enter("after-2") } - "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore { + "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in { runOn(fourth) { // try to join cluster.join(node(first).address) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index f4ea161b2a..4b64bb6e58 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -17,7 +17,7 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c1 = role("c1") val c2 = role("c2") - commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index d9b2c7b876..88cee08191 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -21,7 +21,7 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" akka.cluster { leader-actions-interval = 5 s # increase the leader action task interval - unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set + unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index eda29ea0f0..0640e58175 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -19,7 +19,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster.leader-actions-interval = 5 s - akka.cluster.unreachable-nodes-reaper-interval = 30 s + akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index b4532f7efc..b5afaf404c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -5,12 +5,15 @@ package akka.cluster import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import akka.actor.{Address, ExtendedActorSystem} +import akka.actor.{ Address, ExtendedActorSystem } import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.util.duration._ import akka.util.Duration +import org.scalatest.Suite +import org.scalatest.TestFailedException +import scala.util.control.NoStackTrace object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" @@ -29,10 +32,28 @@ object MultiNodeClusterSpec { """) } -trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ +trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: MultiNodeSpec ⇒ override def initialParticipants = roles.size + // Cluster tests are written so that if previous step (test method) failed + // it will most likely not be possible to run next step. This ensures + // fail fast of steps after the first failure. + private var failed = false + override protected def withFixture(test: NoArgTest): Unit = try { + if (failed) { + val e = new TestFailedException("Previous step failed", 0) + // short stack trace + e.setStackTrace(e.getStackTrace.take(1)) + throw e + } + super.withFixture(test) + } catch { + case t ⇒ + failed = true + throw t + } + /** * The cluster node instance. Needs to be lazily created. */ @@ -151,6 +172,6 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec } def roleName(address: Address): Option[RoleName] = { - testConductor.getNodes.await.find(node(_).address == address) + roles.find(node(_).address == address) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 6378a74040..fc62c17c1d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -20,7 +20,7 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" akka.cluster { leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state - unreachable-nodes-reaper-interval = 30 s + unreachable-nodes-reaper-interval = 300 s # turn "off" } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index b8486841c6..ef420ab302 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -21,18 +21,19 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { - gossip-interval = 400 ms nr-of-deputy-nodes = 0 + # FIXME remove this (use default) when ticket #2239 has been fixed + gossip-interval = 400 ms } akka.loglevel = INFO """)) } -class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy -class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy -class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy -class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy -class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy abstract class SunnyWeatherSpec extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala new file mode 100644 index 0000000000..0fb3cb03c4 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -0,0 +1,435 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.actor.Address +import akka.remote.testconductor.RoleName +import MemberStatus._ + +object TransitionMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString( + "akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). + withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy + +abstract class TransitionSpec + extends MultiNodeSpec(TransitionMultiJvmSpec) + with MultiNodeClusterSpec { + + import TransitionMultiJvmSpec._ + + // sorted in the order used by the cluster + def leader(roles: RoleName*) = roles.sorted.head + def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail + + def memberStatus(address: Address): MemberStatus = { + val statusOption = (cluster.latestGossip.members ++ cluster.latestGossip.overview.unreachable).collectFirst { + case m if m.address == address ⇒ m.status + } + statusOption must not be (None) + statusOption.get + } + + def memberAddresses: Set[Address] = cluster.latestGossip.members.map(_.address) + + def members: Set[RoleName] = memberAddresses.flatMap(roleName(_)) + + def seenLatestGossip: Set[RoleName] = { + val gossip = cluster.latestGossip + gossip.overview.seen.collect { + case (address, v) if v == gossip.version ⇒ roleName(address) + }.flatten.toSet + } + + def awaitSeen(addresses: Address*): Unit = awaitCond { + seenLatestGossip.map(node(_).address) == addresses.toSet + } + + def awaitMembers(addresses: Address*): Unit = awaitCond { + memberAddresses == addresses.toSet + } + + def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond { + memberStatus(address) == Up + } + + // implicit conversion from RoleName to Address + implicit def role2Address(role: RoleName): Address = node(role).address + + // DSL sugar for `role1 gossipTo role2` + implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role) + var gossipBarrierCounter = 0 + class RoleWrapper(fromRole: RoleName) { + def gossipTo(toRole: RoleName): Unit = { + gossipBarrierCounter += 1 + runOn(toRole) { + val g = cluster.latestGossip + testConductor.enter("before-gossip-" + gossipBarrierCounter) + awaitCond(cluster.latestGossip != g) // received gossip + testConductor.enter("after-gossip-" + gossipBarrierCounter) + } + runOn(fromRole) { + testConductor.enter("before-gossip-" + gossipBarrierCounter) + cluster.gossipTo(node(toRole).address) // send gossip + testConductor.enter("after-gossip-" + gossipBarrierCounter) + } + runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) { + testConductor.enter("before-gossip-" + gossipBarrierCounter) + testConductor.enter("after-gossip-" + gossipBarrierCounter) + } + } + } + + "A Cluster" must { + + "start nodes as singleton clusters" taggedAs LongRunningTest in { + + startClusterNode() + cluster.isSingletonCluster must be(true) + cluster.status must be(Joining) + cluster.convergence.isDefined must be(true) + cluster.leaderActions() + cluster.status must be(Up) + + testConductor.enter("after-1") + } + + "perform correct transitions when second joining first" taggedAs LongRunningTest in { + + runOn(second) { + cluster.join(first) + } + runOn(first) { + awaitMembers(first, second) + memberStatus(first) must be(Up) + memberStatus(second) must be(Joining) + cluster.convergence.isDefined must be(false) + } + testConductor.enter("second-joined") + + first gossipTo second + runOn(second) { + members must be(Set(first, second)) + memberStatus(first) must be(Up) + memberStatus(second) must be(Joining) + // we got a conflicting version in second, and therefore not convergence in second + seenLatestGossip must be(Set(second)) + cluster.convergence.isDefined must be(false) + } + + second gossipTo first + runOn(first) { + seenLatestGossip must be(Set(first, second)) + } + + first gossipTo second + runOn(second) { + seenLatestGossip must be(Set(first, second)) + } + + runOn(first, second) { + memberStatus(first) must be(Up) + memberStatus(second) must be(Joining) + cluster.convergence.isDefined must be(true) + } + testConductor.enter("convergence-joining-2") + + runOn(leader(first, second)) { + cluster.leaderActions() + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + } + testConductor.enter("leader-actions-2") + + leader(first, second) gossipTo nonLeader(first, second).head + runOn(nonLeader(first, second).head) { + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + seenLatestGossip must be(Set(first, second)) + cluster.convergence.isDefined must be(true) + } + + nonLeader(first, second).head gossipTo leader(first, second) + runOn(first, second) { + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + seenLatestGossip must be(Set(first, second)) + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("after-2") + } + + "perform correct transitions when third joins second" taggedAs LongRunningTest in { + + runOn(third) { + cluster.join(second) + } + runOn(second) { + awaitMembers(first, second, third) + cluster.convergence.isDefined must be(false) + memberStatus(third) must be(Joining) + seenLatestGossip must be(Set(second)) + } + testConductor.enter("third-joined-second") + + second gossipTo first + runOn(first) { + members must be(Set(first, second, third)) + cluster.convergence.isDefined must be(false) + memberStatus(third) must be(Joining) + } + + first gossipTo third + runOn(third) { + members must be(Set(first, second, third)) + cluster.convergence.isDefined must be(false) + memberStatus(third) must be(Joining) + // conflicting version + seenLatestGossip must be(Set(third)) + } + + third gossipTo first + third gossipTo second + runOn(first, second) { + seenLatestGossip must be(Set(myself, third)) + } + + first gossipTo second + runOn(second) { + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) + } + + runOn(first, third) { + cluster.convergence.isDefined must be(false) + } + + second gossipTo first + second gossipTo third + runOn(first, second, third) { + seenLatestGossip must be(Set(first, second, third)) + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Joining) + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("convergence-joining-3") + + runOn(leader(first, second, third)) { + cluster.leaderActions() + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Up) + } + testConductor.enter("leader-actions-3") + + // leader gossipTo first non-leader + leader(first, second, third) gossipTo nonLeader(first, second, third).head + runOn(nonLeader(first, second, third).head) { + memberStatus(third) must be(Up) + seenLatestGossip must be(Set(leader(first, second, third), myself)) + cluster.convergence.isDefined must be(false) + } + + // first non-leader gossipTo the other non-leader + nonLeader(first, second, third).head gossipTo nonLeader(first, second, third).tail.head + runOn(nonLeader(first, second, third).head) { + cluster.gossipTo(node(nonLeader(first, second, third).tail.head).address) + } + runOn(nonLeader(first, second, third).tail.head) { + memberStatus(third) must be(Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) + } + + // and back again + nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head + runOn(nonLeader(first, second, third).head) { + memberStatus(third) must be(Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) + } + + // first non-leader gossipTo the leader + nonLeader(first, second, third).head gossipTo leader(first, second, third) + runOn(first, second, third) { + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("after-3") + } + + "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { + runOn(fourth) { + cluster.join(fifth) + awaitMembers(fourth, fifth) + cluster.gossipTo(fifth) + awaitSeen(fourth, fifth) + cluster.convergence.isDefined must be(true) + } + runOn(fifth) { + awaitMembers(fourth, fifth) + cluster.gossipTo(fourth) + awaitSeen(fourth, fifth) + cluster.gossipTo(fourth) + cluster.convergence.isDefined must be(true) + } + testConductor.enter("fourth-joined-fifth") + + testConductor.enter("after-4") + } + + "perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in { + + runOn(fourth) { + cluster.join(third) + } + runOn(third) { + awaitMembers(first, second, third, fourth) + seenLatestGossip must be(Set(third)) + } + testConductor.enter("fourth-joined-third") + + third gossipTo second + runOn(second) { + seenLatestGossip must be(Set(second, third)) + } + + second gossipTo fourth + runOn(fourth) { + members must be(roles.toSet) + // merge conflict + seenLatestGossip must be(Set(fourth)) + } + + fourth gossipTo first + fourth gossipTo second + fourth gossipTo third + fourth gossipTo fifth + runOn(first, second, third, fifth) { + members must be(roles.toSet) + seenLatestGossip must be(Set(fourth, myself)) + } + + first gossipTo fifth + runOn(fifth) { + seenLatestGossip must be(Set(first, fourth, fifth)) + } + + fifth gossipTo third + runOn(third) { + seenLatestGossip must be(Set(first, third, fourth, fifth)) + } + + third gossipTo second + runOn(second) { + seenLatestGossip must be(roles.toSet) + cluster.convergence.isDefined must be(true) + } + + second gossipTo first + second gossipTo third + second gossipTo fourth + third gossipTo fifth + + seenLatestGossip must be(roles.toSet) + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Up) + memberStatus(fourth) must be(Joining) + memberStatus(fifth) must be(Up) + cluster.convergence.isDefined must be(true) + + testConductor.enter("convergence-joining-3") + + runOn(leader(roles: _*)) { + cluster.leaderActions() + memberStatus(fourth) must be(Up) + seenLatestGossip must be(Set(myself)) + cluster.convergence.isDefined must be(false) + } + // spread the word + for (x :: y :: Nil ← (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) { + x gossipTo y + } + + testConductor.enter("spread-5") + + seenLatestGossip must be(roles.toSet) + memberStatus(first) must be(Up) + memberStatus(second) must be(Up) + memberStatus(third) must be(Up) + memberStatus(fourth) must be(Up) + memberStatus(fifth) must be(Up) + cluster.convergence.isDefined must be(true) + + testConductor.enter("after-5") + } + + "perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in { + runOn(fifth) { + markNodeAsUnavailable(second) + cluster.reapUnreachableMembers() + cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) + seenLatestGossip must be(Set(fifth)) + } + + // spread the word + val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth) + for (x :: y :: Nil ← gossipRound.sliding(2)) { + x gossipTo y + } + + runOn((roles.filterNot(_ == second)): _*) { + cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) + cluster.convergence.isDefined must be(false) + } + + runOn(third) { + cluster.down(second) + awaitMemberStatus(second, Down) + } + + // spread the word + val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth) + for (x :: y :: Nil ← gossipRound2.sliding(2)) { + x gossipTo y + } + + runOn((roles.filterNot(_ == second)): _*) { + cluster.latestGossip.overview.unreachable must contain(Member(second, Down)) + memberStatus(second) must be(Down) + seenLatestGossip must be(Set(first, third, fourth, fifth)) + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("after-6") + } + + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 449ebf7bff..8020010655 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -33,12 +33,12 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(a2, c2, e2)) val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a1, c1, e2)) - merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) + merged1.members must be(SortedSet(a2, c1, e1)) + merged1.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining)) val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1, c1, e2)) - merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) + merged2.members must be(SortedSet(a2, c1, e1)) + merged2.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining)) } @@ -48,12 +48,12 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2))) val merged1 = g1 merge g2 - merged1.overview.unreachable must be(Set(a1, b2, c1, d2)) - merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged1.overview.unreachable must be(Set(a2, b2, c1, d2)) + merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed)) val merged2 = g2 merge g1 - merged2.overview.unreachable must be(Set(a1, b2, c1, d2)) - merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged2.overview.unreachable must be(Set(a2, b2, c1, d2)) + merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed)) } @@ -62,14 +62,14 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2))) val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a1)) - merged1.members.toSeq.map(_.status) must be(Seq(Up)) + merged1.members must be(SortedSet(a2)) + merged1.members.toSeq.map(_.status) must be(Seq(Joining)) merged1.overview.unreachable must be(Set(b2, c1, d2)) merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1)) - merged2.members.toSeq.map(_.status) must be(Seq(Up)) + merged2.members must be(SortedSet(a2)) + merged2.members.toSeq.map(_.status) must be(Seq(Joining)) merged2.overview.unreachable must be(Set(b2, c1, d2)) merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/TestConductorTransport.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/TestConductorTransport.scala index dbf17fa5a7..f7b7943275 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/TestConductorTransport.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/TestConductorTransport.scala @@ -16,9 +16,9 @@ import org.jboss.netty.channel.ChannelPipelineFactory private[akka] class TestConductorTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends NettyRemoteTransport(_system, _provider) { - override def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory = + override def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = new ChannelPipelineFactory { - def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout) :+ endpoint) + def getPipeline = PipelineFactory(new NetworkFailureInjector(system) +: PipelineFactory.defaultStack(withTimeout, isClient) :+ endpoint) } } \ No newline at end of file diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index b7aeb9a7e9..e2c0a45346 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -165,6 +165,52 @@ akka { # (O) Maximum time window that a client should try to reconnect for reconnection-time-window = 600s + + ssl { + # (I&O) Enable SSL/TLS encryption. + # This must be enabled on both the client and server to work. + enable = off + + # (I) This is the Java Key Store used by the server connection + key-store = "keystore" + + # This password is used for decrypting the key store + key-store-password = "changeme" + + # (O) This is the Java Key Store used by the client connection + trust-store = "truststore" + + # This password is used for decrypting the trust store + trust-store-password = "changeme" + + # (I&O) Protocol to use for SSL encryption, choose from: + # Java 6 & 7: + # 'SSLv3', 'TLSv1' + # Java 7: + # 'TLSv1.1', 'TLSv1.2' + protocol = "TLSv1" + + # Examples: [ "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA" ] + # You need to install the JCE Unlimited Strength Jurisdiction Policy Files to use AES 256 + # More info here: http://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html#SunJCEProvider + supported-algorithms = ["TLS_RSA_WITH_AES_128_CBC_SHA"] + + # Using /dev/./urandom is only necessary when using SHA1PRNG on Linux to prevent blocking + # It is NOT as secure because it reuses the seed + # '' => defaults to /dev/random or whatever is set in java.security for example: securerandom.source=file:/dev/random + # '/dev/./urandom' => NOT '/dev/urandom' as that doesn't work according to: http://bugs.sun.com/view_bug.do?bug_id=6202721 + sha1prng-random-source = "" + + # There are three options, in increasing order of security: + # "" or SecureRandom => (default) + # "SHA1PRNG" => Can be slow because of blocking issues on Linux + # "AES128CounterRNGFast" => fastest startup and based on AES encryption algorithm + # The following use one of 3 possible seed sources, depending on availability: /dev/random, random.org and SecureRandom (provided by Java) + # "AES128CounterRNGSecure" + # "AES256CounterRNGSecure" (Install JCE Unlimited Strength Jurisdiction Policy Files first) + # Setting a value here may require you to supply the appropriate cipher suite (see supported-algorithms section above) + random-number-generator = "" + } } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index e3a2cea9a7..0917086d4d 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -145,7 +145,7 @@ private[akka] class ActiveRemoteClient private[akka] ( openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) val b = new ClientBootstrap(netty.clientChannelFactory) - b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), true)) + b.setPipelineFactory(netty.createPipeline(new ActiveRemoteClientHandler(name, b, remoteAddress, localAddress, netty.timer, this), withTimeout = true, isClient = true)) b.setOption("tcpNoDelay", true) b.setOption("keepAlive", true) b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 61124cfecb..5f62bb58c8 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -64,17 +64,18 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * * @param withTimeout determines whether an IdleStateHandler shall be included */ - def apply(endpoint: ⇒ Seq[ChannelHandler], withTimeout: Boolean): ChannelPipelineFactory = + def apply(endpoint: ⇒ Seq[ChannelHandler], withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = new ChannelPipelineFactory { - def getPipeline = apply(defaultStack(withTimeout) ++ endpoint) + def getPipeline = apply(defaultStack(withTimeout, isClient) ++ endpoint) } /** * Construct a default protocol stack, excluding the “head” handler (i.e. the one which * actually dispatches the received messages to the local target actors). */ - def defaultStack(withTimeout: Boolean): Seq[ChannelHandler] = - (if (withTimeout) timeout :: Nil else Nil) ::: + def defaultStack(withTimeout: Boolean, isClient: Boolean): Seq[ChannelHandler] = + (if (settings.EnableSSL) NettySSLSupport(settings, NettyRemoteTransport.this.log, isClient) :: Nil else Nil) ::: + (if (withTimeout) timeout :: Nil else Nil) ::: msgFormat ::: authenticator ::: executionHandler :: @@ -122,8 +123,8 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * This method is factored out to provide an extension point in case the * pipeline shall be changed. It is recommended to use */ - def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory = - PipelineFactory(Seq(endpoint), withTimeout) + def createPipeline(endpoint: ⇒ ChannelHandler, withTimeout: Boolean, isClient: Boolean): ChannelPipelineFactory = + PipelineFactory(Seq(endpoint), withTimeout, isClient) private val remoteClients = new HashMap[Address, RemoteClient] private val clientsLock = new ReentrantReadWriteLock diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala new file mode 100644 index 0000000000..9440c09c95 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.remote.netty + +import org.jboss.netty.handler.ssl.SslHandler +import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext } +import akka.remote.RemoteTransportException +import akka.event.LoggingAdapter +import java.io.{ IOException, FileNotFoundException, FileInputStream } +import java.security.{ SecureRandom, GeneralSecurityException, KeyStore, Security } +import akka.security.provider.AkkaProvider + +/** + * Used for adding SSL support to Netty pipeline + * Internal use only + */ +private[akka] object NettySSLSupport { + /** + * Construct a SSLHandler which can be inserted into a Netty server/client pipeline + */ + def apply(settings: NettySettings, log: LoggingAdapter, isClient: Boolean): SslHandler = + if (isClient) initializeClientSSL(settings, log) else initializeServerSSL(settings, log) + + def initializeCustomSecureRandom(rngName: Option[String], sourceOfRandomness: Option[String], log: LoggingAdapter): SecureRandom = { + /** + * According to this bug report: http://bugs.sun.com/view_bug.do?bug_id=6202721 + * Using /dev/./urandom is only necessary when using SHA1PRNG on Linux + * Use 'new SecureRandom()' instead of 'SecureRandom.getInstance("SHA1PRNG")' to avoid having problems + */ + sourceOfRandomness foreach { path ⇒ System.setProperty("java.security.egd", path) } + + val rng = rngName match { + case Some(r @ ("AES128CounterRNGFast" | "AES128CounterRNGSecure" | "AES256CounterRNGSecure")) ⇒ + log.debug("SSL random number generator set to: {}", r) + val akka = new AkkaProvider + Security.addProvider(akka) + SecureRandom.getInstance(r, akka) + case Some("SHA1PRNG") ⇒ + log.debug("SSL random number generator set to: SHA1PRNG") + // This needs /dev/urandom to be the source on Linux to prevent problems with /dev/random blocking + // However, this also makes the seed source insecure as the seed is reused to avoid blocking (not a problem on FreeBSD). + SecureRandom.getInstance("SHA1PRNG") + case Some(unknown) ⇒ + log.debug("Unknown SSLRandomNumberGenerator [{}] falling back to SecureRandom", unknown) + new SecureRandom + case None ⇒ + log.debug("SSLRandomNumberGenerator not specified, falling back to SecureRandom") + new SecureRandom + } + rng.nextInt() // prevent stall on first access + rng + } + + def initializeClientSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = { + log.debug("Client SSL is enabled, initialising ...") + + def constructClientContext(settings: NettySettings, log: LoggingAdapter, trustStorePath: String, trustStorePassword: String, protocol: String): Option[SSLContext] = + try { + val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) + trustStore.load(new FileInputStream(trustStorePath), trustStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed? + trustManagerFactory.init(trustStore) + val trustManagers: Array[TrustManager] = trustManagerFactory.getTrustManagers + Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(null, trustManagers, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } + } catch { + case e: FileNotFoundException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because trust store could not be loaded", e) + case e: IOException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because: " + e.getMessage, e) + case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Client SSL connection could not be established because SSL context could not be constructed", e) + } + + ((settings.SSLTrustStore, settings.SSLTrustStorePassword, settings.SSLProtocol) match { + case (Some(trustStore), Some(password), Some(protocol)) ⇒ constructClientContext(settings, log, trustStore, password, protocol) + case (trustStore, password, protocol) ⇒ throw new GeneralSecurityException( + "One or several SSL trust store settings are missing: [trust-store: %s] [trust-store-password: %s] [protocol: %s]".format( + trustStore, + password, + protocol)) + }) match { + case Some(context) ⇒ + log.debug("Using client SSL context to create SSLEngine ...") + val sslEngine = context.createSSLEngine + sslEngine.setUseClientMode(true) + sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString)) + new SslHandler(sslEngine) + case None ⇒ + throw new GeneralSecurityException( + """Failed to initialize client SSL because SSL context could not be found." + + "Make sure your settings are correct: [trust-store: %s] [trust-store-password: %s] [protocol: %s]""".format( + settings.SSLTrustStore, + settings.SSLTrustStorePassword, + settings.SSLProtocol)) + } + } + + def initializeServerSSL(settings: NettySettings, log: LoggingAdapter): SslHandler = { + log.debug("Server SSL is enabled, initialising ...") + + def constructServerContext(settings: NettySettings, log: LoggingAdapter, keyStorePath: String, keyStorePassword: String, protocol: String): Option[SSLContext] = + try { + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) + keyStore.load(new FileInputStream(keyStorePath), keyStorePassword.toCharArray) //FIXME does the FileInputStream need to be closed? + factory.init(keyStore, keyStorePassword.toCharArray) + Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, log)); ctx } + } catch { + case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e) + case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e) + case e: GeneralSecurityException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because SSL context could not be constructed", e) + } + + ((settings.SSLKeyStore, settings.SSLKeyStorePassword, settings.SSLProtocol) match { + case (Some(keyStore), Some(password), Some(protocol)) ⇒ constructServerContext(settings, log, keyStore, password, protocol) + case (keyStore, password, protocol) ⇒ throw new GeneralSecurityException( + "SSL key store settings went missing. [key-store: %s] [key-store-password: %s] [protocol: %s]".format(keyStore, password, protocol)) + }) match { + case Some(context) ⇒ + log.debug("Using server SSL context to create SSLEngine ...") + val sslEngine = context.createSSLEngine + sslEngine.setUseClientMode(false) + sslEngine.setEnabledCipherSuites(settings.SSLSupportedAlgorithms.toArray.map(_.toString)) + new SslHandler(sslEngine) + case None ⇒ throw new GeneralSecurityException( + """Failed to initialize server SSL because SSL context could not be found. + Make sure your settings are correct: [key-store: %s] [key-store-password: %s] [protocol: %s]""".format( + settings.SSLKeyStore, + settings.SSLKeyStorePassword, + settings.SSLProtocol)) + } + } +} diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 04dfbe525e..895fea9212 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -12,7 +12,6 @@ import org.jboss.netty.channel.group.ChannelGroup import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } import org.jboss.netty.handler.execution.ExecutionHandler -import akka.event.Logging import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } import akka.remote.{ RemoteServerShutdown, RemoteServerError, RemoteServerClientDisconnected, RemoteServerClientConnected, RemoteServerClientClosed, RemoteProtocol, RemoteMessage } import akka.actor.Address @@ -40,7 +39,7 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { private val bootstrap = { val b = new ServerBootstrap(factory) - b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), false)) + b.setPipelineFactory(netty.createPipeline(new RemoteServerHandler(openChannels, netty), withTimeout = false, isClient = false)) b.setOption("backlog", settings.Backlog) b.setOption("tcpNoDelay", true) b.setOption("child.keepAlive", true) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index 0d105eda1d..024ed104c3 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -86,4 +86,55 @@ private[akka] class NettySettings(config: Config, val systemName: String) { case sz ⇒ sz } + val SSLKeyStore = getString("ssl.key-store") match { + case "" ⇒ None + case keyStore ⇒ Some(keyStore) + } + + val SSLTrustStore = getString("ssl.trust-store") match { + case "" ⇒ None + case trustStore ⇒ Some(trustStore) + } + + val SSLKeyStorePassword = getString("ssl.key-store-password") match { + case "" ⇒ None + case password ⇒ Some(password) + } + + val SSLTrustStorePassword = getString("ssl.trust-store-password") match { + case "" ⇒ None + case password ⇒ Some(password) + } + + val SSLSupportedAlgorithms = getStringList("ssl.supported-algorithms").toArray.toSet + + val SSLProtocol = getString("ssl.protocol") match { + case "" ⇒ None + case protocol ⇒ Some(protocol) + } + + val SSLRandomSource = getString("ssl.sha1prng-random-source") match { + case "" ⇒ None + case path ⇒ Some(path) + } + + val SSLRandomNumberGenerator = getString("ssl.random-number-generator") match { + case "" ⇒ None + case rng ⇒ Some(rng) + } + + val EnableSSL = { + val enableSSL = getBoolean("ssl.enable") + if (enableSSL) { + if (SSLProtocol.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.ssl.enable is turned on but no protocol is defined in 'akka.remote.netty.ssl.protocol'.") + if (SSLKeyStore.isEmpty && SSLTrustStore.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.ssl.enable is turned on but no key/trust store is defined in 'akka.remote.netty.ssl.key-store' / 'akka.remote.netty.ssl.trust-store'.") + if (SSLKeyStore.isDefined && SSLKeyStorePassword.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.ssl.key-store' is defined but no key-store password is defined in 'akka.remote.netty.ssl.key-store-password'.") + if (SSLTrustStore.isDefined && SSLTrustStorePassword.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.ssl.trust-store' is defined but no trust-store password is defined in 'akka.remote.netty.ssl.trust-store-password'.") + } + enableSSL + } } diff --git a/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGFast.scala b/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGFast.scala new file mode 100644 index 0000000000..c355f5a548 --- /dev/null +++ b/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGFast.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.security.provider + +import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } +import java.security.SecureRandom + +/** + * Internal API + */ +class AES128CounterRNGFast extends java.security.SecureRandomSpi { + private val rng = new AESCounterRNG(new SecureRandomSeedGenerator()) + + /** + * This is managed internally only + */ + override protected def engineSetSeed(seed: Array[Byte]): Unit = () + + /** + * Generates a user-specified number of random bytes. + * + * @param bytes the array to be filled in with random bytes. + */ + override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes) + + /** + * Returns the given number of seed bytes. This call may be used to + * seed other random number generators. + * + * @param numBytes the number of seed bytes to generate. + * @return the seed bytes. + */ + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = (new SecureRandom).generateSeed(numBytes) +} + diff --git a/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGSecure.scala b/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGSecure.scala new file mode 100644 index 0000000000..846476cc2d --- /dev/null +++ b/akka-remote/src/main/scala/akka/security/provider/AES128CounterRNGSecure.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.security.provider + +import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator } + +/** + * Internal API + */ +class AES128CounterRNGSecure extends java.security.SecureRandomSpi { + private val rng = new AESCounterRNG() + + /** + * This is managed internally only + */ + override protected def engineSetSeed(seed: Array[Byte]): Unit = () + + /** + * Generates a user-specified number of random bytes. + * + * @param bytes the array to be filled in with random bytes. + */ + override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes) + + /** + * Returns the given number of seed bytes. This call may be used to + * seed other random number generators. + * + * @param numBytes the number of seed bytes to generate. + * @return the seed bytes. + */ + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes) +} + diff --git a/akka-remote/src/main/scala/akka/security/provider/AES256CounterRNGSecure.scala b/akka-remote/src/main/scala/akka/security/provider/AES256CounterRNGSecure.scala new file mode 100644 index 0000000000..d942938411 --- /dev/null +++ b/akka-remote/src/main/scala/akka/security/provider/AES256CounterRNGSecure.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.security.provider + +import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator } + +/** + * Internal API + */ +class AES256CounterRNGSecure extends java.security.SecureRandomSpi { + private val rng = new AESCounterRNG(32) // Magic number is magic + + /** + * This is managed internally only + */ + override protected def engineSetSeed(seed: Array[Byte]): Unit = () + + /** + * Generates a user-specified number of random bytes. + * + * @param bytes the array to be filled in with random bytes. + */ + override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes) + + /** + * Returns the given number of seed bytes. This call may be used to + * seed other random number generators. + * + * @param numBytes the number of seed bytes to generate. + * @return the seed bytes. + */ + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes) +} + diff --git a/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala b/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala new file mode 100644 index 0000000000..f44aeae584 --- /dev/null +++ b/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.security.provider + +import java.security.{ PrivilegedAction, AccessController, Provider } + +/** + * A provider that for AES128CounterRNGFast, a cryptographically secure random number generator through SecureRandom + */ +final class AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implements a secure AES random number generator") { + AccessController.doPrivileged(new PrivilegedAction[AkkaProvider] { + def run = { + //SecureRandom + put("SecureRandom.AES128CounterRNGFast", "akka.security.provider.AES128CounterRNGFast") + put("SecureRandom.AES128CounterRNGSecure", "akka.security.provider.AES128CounterRNGSecure") + put("SecureRandom.AES256CounterRNGSecure", "akka.security.provider.AES256CounterRNGSecure") + + //Implementation type: software or hardware + put("SecureRandom.AES128CounterRNGFast ImplementedIn", "Software") + put("SecureRandom.AES128CounterRNGSecure ImplementedIn", "Software") + put("SecureRandom.AES256CounterRNGSecure ImplementedIn", "Software") + null //Magic null is magic + } + }) +} + diff --git a/akka-remote/src/test/resources/keystore b/akka-remote/src/test/resources/keystore new file mode 100644 index 0000000000..ee5581d930 Binary files /dev/null and b/akka-remote/src/test/resources/keystore differ diff --git a/akka-remote/src/test/resources/truststore b/akka-remote/src/test/resources/truststore new file mode 100644 index 0000000000..cc07616dad Binary files /dev/null and b/akka-remote/src/test/resources/truststore differ diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala new file mode 100644 index 0000000000..505ce180cf --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -0,0 +1,198 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.remote + +import akka.testkit._ +import akka.actor._ +import com.typesafe.config._ +import akka.dispatch.{ Await, Future } +import akka.pattern.ask +import java.io.File +import akka.event.{ NoLogging, LoggingAdapter } +import java.security.{ NoSuchAlgorithmException, SecureRandom, PrivilegedAction, AccessController } +import netty.{ NettySettings, NettySSLSupport } +import javax.net.ssl.SSLException +import akka.util.{ Timeout, Duration } +import akka.util.duration._ + +object Configuration { + // set this in your JAVA_OPTS to see all ssl debug info: "-Djavax.net.debug=ssl,keymanager" + // The certificate will expire in 2109 + private val trustStore = getClass.getClassLoader.getResource("truststore").getPath + private val keyStore = getClass.getClassLoader.getResource("keystore").getPath + private val conf = """ + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.netty { + hostname = localhost + ssl { + enable = on + trust-store = "%s" + key-store = "%s" + random-number-generator = "%s" + supported-algorithms = [%s] + sha1prng-random-source = "/dev/./urandom" + } + } + actor.deployment { + /blub.remote = "akka://remote-sys@localhost:12346" + /looker/child.remote = "akka://remote-sys@localhost:12346" + /looker/child/grandchild.remote = "akka://Ticket1978CommunicationSpec@localhost:12345" + } + } + """ + + def getCipherConfig(cipher: String, enabled: String*): (String, Boolean, Config) = try { + + val config = ConfigFactory.parseString("akka.remote.netty.port=12345").withFallback(ConfigFactory.parseString(conf.format(trustStore, keyStore, cipher, enabled.mkString(", ")))) + val fullConfig = config.withFallback(AkkaSpec.testConf).withFallback(ConfigFactory.load).getConfig("akka.remote.netty") + val settings = new NettySettings(fullConfig, "placeholder") + + val rng = NettySSLSupport.initializeCustomSecureRandom(settings.SSLRandomNumberGenerator, settings.SSLRandomSource, NoLogging) + + rng.nextInt() // Has to work + settings.SSLRandomNumberGenerator foreach { sRng ⇒ rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) } + + val engine = NettySSLSupport.initializeServerSSL(settings, NoLogging).getEngine + val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet + val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet + gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported)) + gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled)) + engine.getSupportedProtocols.contains(settings.SSLProtocol.get) || (throw new IllegalArgumentException(settings.SSLProtocol.get)) + + (cipher, true, config) + } catch { + case (_: IllegalArgumentException) | (_: NoSuchAlgorithmException) | (_: SSLException) ⇒ (cipher, false, AkkaSpec.testConf) // Cannot match against the message since the message might be localized :S + } +} + +import Configuration.getCipherConfig + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("SHA1PRNG", "TLS_RSA_WITH_AES_128_CBC_SHA")) + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978AES128CounterRNGFastSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGFast", "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")) + +/** + * Both of the Secure variants require access to the Internet to access random.org. + */ +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978AES128CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterRNGSecure", "TLS_RSA_WITH_AES_128_CBC_SHA")) + +/** + * Both of the Secure variants require access to the Internet to access random.org. + */ +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978AES256CounterRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES256CounterRNGSecure", "TLS_RSA_WITH_AES_256_CBC_SHA")) + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978DefaultRNGSecureSpec extends Ticket1978CommunicationSpec(getCipherConfig("", "TLS_RSA_WITH_AES_128_CBC_SHA")) + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978NonExistingRNGSecureSpec extends Ticket1978CommunicationSpec(("NonExistingRNG", false, AkkaSpec.testConf)) + +abstract class Ticket1978CommunicationSpec(val cipherEnabledconfig: (String, Boolean, Config)) extends AkkaSpec(cipherEnabledconfig._3) with ImplicitSender { + + implicit val timeout: Timeout = Timeout(30 seconds) + + import RemoteCommunicationSpec._ + + val other = ActorSystem("remote-sys", ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config)) + + override def atTermination() { + other.shutdown() + } + + "SSL Remoting" must { + if (cipherEnabledconfig._2) { + val remote = other.actorOf(Props(new Actor { def receive = { case "ping" ⇒ sender ! (("pong", sender)) } }), "echo") + + val here = system.actorFor("akka://remote-sys@localhost:12346/user/echo") + + "support remote look-ups" in { + here ! "ping" + expectMsgPF(timeout.duration) { + case ("pong", s: AnyRef) if s eq testActor ⇒ true + } + } + + "send error message for wrong address" in { + within(timeout.duration) { + EventFilter.error(start = "dropping", occurrences = 1).intercept { + system.actorFor("akka://remotesys@localhost:12346/user/echo") ! "ping" + }(other) + } + } + + "support ask" in { + Await.result(here ? "ping", timeout.duration) match { + case ("pong", s: akka.pattern.PromiseActorRef) ⇒ // good + case m ⇒ fail(m + " was not (pong, AskActorRef)") + } + } + + "send dead letters on remote if actor does not exist" in { + within(timeout.duration) { + EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { + system.actorFor("akka://remote-sys@localhost:12346/does/not/exist") ! "buh" + }(other) + } + } + + "create and supervise children on remote node" in { + within(timeout.duration) { + val r = system.actorOf(Props[Echo], "blub") + r.path.toString must be === "akka://remote-sys@localhost:12346/remote/Ticket1978CommunicationSpec@localhost:12345/user/blub" + r ! 42 + expectMsg(42) + EventFilter[Exception]("crash", occurrences = 1).intercept { + r ! new Exception("crash") + }(other) + expectMsg("preRestart") + r ! 42 + expectMsg(42) + system.stop(r) + expectMsg("postStop") + } + } + + "look-up actors across node boundaries" in { + within(timeout.duration) { + val l = system.actorOf(Props(new Actor { + def receive = { + case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n) + case s: String ⇒ sender ! context.actorFor(s) + } + }), "looker") + l ! (Props[Echo], "child") + val r = expectMsgType[ActorRef] + r ! (Props[Echo], "grandchild") + val remref = expectMsgType[ActorRef] + remref.isInstanceOf[LocalActorRef] must be(true) + val myref = system.actorFor(system / "looker" / "child" / "grandchild") + myref.isInstanceOf[RemoteActorRef] must be(true) + myref ! 43 + expectMsg(43) + lastSender must be theSameInstanceAs remref + r.asInstanceOf[RemoteActorRef].getParent must be(l) + system.actorFor("/user/looker/child") must be theSameInstanceAs r + Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l + Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l + } + } + + "not fail ask across node boundaries" in { + val f = for (_ ← 1 to 1000) yield here ? "ping" mapTo manifest[(String, ActorRef)] + Await.result(Future.sequence(f), timeout.duration).map(_._1).toSet must be(Set("pong")) + } + } else { + "not be run when the cipher is not supported by the platform this test is currently being executed on" ignore { + + } + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala new file mode 100644 index 0000000000..4017f1cfcc --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978ConfigSpec.scala @@ -0,0 +1,48 @@ +package akka.remote + +import akka.testkit._ +import akka.actor._ +import com.typesafe.config._ +import akka.actor.ExtendedActorSystem +import akka.util.duration._ +import akka.util.Duration +import akka.remote.netty.NettyRemoteTransport +import java.util.ArrayList + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978ConfigSpec extends AkkaSpec(""" +akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.netty { + hostname = localhost + port = 12345 + } + actor.deployment { + /blub.remote = "akka://remote-sys@localhost:12346" + /looker/child.remote = "akka://remote-sys@localhost:12346" + /looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345" + } +} +""") with ImplicitSender with DefaultTimeout { + + "SSL Remoting" must { + "be able to parse these extra Netty config elements" in { + val settings = + system.asInstanceOf[ExtendedActorSystem] + .provider.asInstanceOf[RemoteActorRefProvider] + .transport.asInstanceOf[NettyRemoteTransport] + .settings + import settings._ + + EnableSSL must be(false) + SSLKeyStore must be(Some("keystore")) + SSLKeyStorePassword must be(Some("changeme")) + SSLTrustStore must be(Some("truststore")) + SSLTrustStorePassword must be(Some("changeme")) + SSLProtocol must be(Some("TLSv1")) + SSLSupportedAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA")) + SSLRandomSource must be(None) + SSLRandomNumberGenerator must be(None) + } + } +} diff --git a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala index 08826fa5dd..835a596a4a 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala @@ -75,7 +75,9 @@ object AkkaKernelPlugin extends Plugin { copyFiles(libFiles(cp, conf.libFilter), distLibPath) copyFiles(conf.additionalLibs, distLibPath) - for (subTarget ← subProjectDependencies.map(_.target)) { + for (subProjectDependency ← subProjectDependencies) { + val subTarget = subProjectDependency.target + EvaluateTask(buildStruct, packageBin in Compile, st, subProjectDependency.projectRef) copyJars(subTarget, distLibPath) } log.info("Distribution created.") @@ -220,10 +222,10 @@ object AkkaKernelPlugin extends Plugin { }.toList val target = setting(Keys.crossTarget, "Missing crossTarget directory") - SubProjectInfo(project.id, target, subProjects) + SubProjectInfo(projectRef, target, subProjects) } - private case class SubProjectInfo(id: String, target: File, subProjects: Seq[SubProjectInfo]) { + private case class SubProjectInfo(projectRef: ProjectRef, target: File, subProjects: Seq[SubProjectInfo]) { def recursiveSubProjects: Set[SubProjectInfo] = { val flatSubProjects = for { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index ed151b6b12..f8efe4e2e5 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -5,9 +5,7 @@ package akka.testkit import akka.actor._ -import akka.util.Duration import java.util.concurrent.atomic.AtomicLong -import scala.collection.immutable.Stack import akka.dispatch._ import akka.pattern.ask diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index c0fb6e5267..4a5a880bb0 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -486,19 +486,21 @@ trait TestKitBase { @tailrec def doit(acc: List[T], count: Int): List[T] = { - if (count >= messages) return acc.reverse - receiveOne((stop - now) min idle) - lastMessage match { - case NullMessage ⇒ - lastMessage = msg - acc.reverse - case RealMessage(o, _) if (f isDefinedAt o) ⇒ - msg = lastMessage - doit(f(o) :: acc, count + 1) - case RealMessage(o, _) ⇒ - queue.offerFirst(lastMessage) - lastMessage = msg - acc.reverse + if (count >= messages) acc.reverse + else { + receiveOne((stop - now) min idle) + lastMessage match { + case NullMessage ⇒ + lastMessage = msg + acc.reverse + case RealMessage(o, _) if (f isDefinedAt o) ⇒ + msg = lastMessage + doit(f(o) :: acc, count + 1) + case RealMessage(o, _) ⇒ + queue.offerFirst(lastMessage) + lastMessage = msg + acc.reverse + } } } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java index 60a887f554..36c063feaa 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedIncrementTest.java @@ -57,7 +57,7 @@ public class UntypedCoordinatedIncrementTest { Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS); @Before - public void initialise() { + public void initialize() { counters = new ArrayList(); for (int i = 1; i <= numCounters; i++) { final String name = "counter" + i; diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java index cadc4828b1..b7dc99389a 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedTransactorTest.java @@ -58,7 +58,7 @@ public class UntypedTransactorTest { Timeout timeout = new Timeout(timeoutSeconds, TimeUnit.SECONDS); @Before - public void initialise() { + public void initialize() { counters = new ArrayList(); for (int i = 1; i <= numCounters; i++) { final String name = "counter" + i; diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index e848809644..71b7b185f0 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -9,14 +9,17 @@ import akka.actor._ import akka.dispatch.{ Promise, Future } import akka.event.Logging import annotation.tailrec -import akka.util.Duration import java.util.concurrent.TimeUnit +import collection.mutable.ListBuffer +import akka.util.{ NonFatal, Duration } private[zeromq] object ConcurrentSocketActor { private sealed trait PollMsg private case object Poll extends PollMsg private case object PollCareful extends PollMsg + private case object Flush + private class NoSocketHandleException() extends Exception("Couldn't create a zeromq socket.") private val DefaultContext = Context() @@ -32,19 +35,28 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A import SocketType.{ ZMQSocketType ⇒ ST } params.collectFirst { case t: ST ⇒ t }.getOrElse(throw new IllegalArgumentException("A socket type is required")) } + private val socket: Socket = zmqContext.socket(socketType) private val poller: Poller = zmqContext.poller private val log = Logging(context.system, this) + private val pendingSends = new ListBuffer[Seq[Frame]] + def receive = { case m: PollMsg ⇒ doPoll(m) - case ZMQMessage(frames) ⇒ sendMessage(frames) + case ZMQMessage(frames) ⇒ handleRequest(Send(frames)) case r: Request ⇒ handleRequest(r) + case Flush ⇒ flush() case Terminated(_) ⇒ context stop self } private def handleRequest(msg: Request): Unit = msg match { - case Send(frames) ⇒ sendMessage(frames) + case Send(frames) ⇒ + if (frames.nonEmpty) { + val flushNow = pendingSends.isEmpty + pendingSends.append(frames) + if (flushNow) flush() + } case opt: SocketOption ⇒ handleSocketOption(opt) case q: SocketOptionQuery ⇒ handleSocketOptionQuery(q) } @@ -117,48 +129,46 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A } } - private def setupConnection() { + private def setupConnection(): Unit = { params filter (_.isInstanceOf[SocketConnectOption]) foreach { self ! _ } params filter (_.isInstanceOf[PubSubOption]) foreach { self ! _ } } - private def deserializerFromParams = { + private def deserializerFromParams: Deserializer = params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer + + private def setupSocket() = params foreach { + case _: SocketConnectOption | _: PubSubOption | _: SocketMeta ⇒ // ignore, handled differently + case m ⇒ self ! m } - private def setupSocket() = { - params foreach { - case _: SocketConnectOption | _: PubSubOption | _: SocketMeta ⇒ // ignore, handled differently - case m ⇒ self ! m + override def preRestart(reason: Throwable, message: Option[Any]): Unit = context.children foreach context.stop //Do not call postStop + + override def postRestart(reason: Throwable): Unit = () // Do nothing + + override def postStop: Unit = try { + if (socket != null) { + poller.unregister(socket) + socket.close } - } + } finally notifyListener(Closed) - override def preRestart(reason: Throwable, message: Option[Any]) { - context.children foreach context.stop //Do not call postStop - } - - override def postRestart(reason: Throwable) {} //Do nothing - - override def postStop { - try { - if (socket != null) { - poller.unregister(socket) - socket.close + @tailrec private def flushMessage(i: Seq[Frame]): Boolean = + if (i.isEmpty) + true + else { + val head = i.head + val tail = i.tail + if (socket.send(head.payload.toArray, if (tail.nonEmpty) JZMQ.SNDMORE else 0)) flushMessage(tail) + else { + pendingSends.prepend(i) // Reenqueue the rest of the message so the next flush takes care of it + self ! Flush + false } - } finally { - notifyListener(Closed) } - } - private def sendMessage(frames: Seq[Frame]) { - def sendBytes(bytes: Seq[Byte], flags: Int) = socket.send(bytes.toArray, flags) - val iter = frames.iterator - while (iter.hasNext) { - val payload = iter.next.payload - val flags = if (iter.hasNext) JZMQ.SNDMORE else 0 - sendBytes(payload, flags) - } - } + @tailrec private def flush(): Unit = + if (pendingSends.nonEmpty && flushMessage(pendingSends.remove(0))) flush() // Flush while things are going well // this is a “PollMsg=>Unit” which either polls or schedules Poll, depending on the sign of the timeout private val doPollTimeout = { diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 85a9ea6642..4bf52a41e3 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -139,8 +139,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { */ def newSocket(socketParameters: SocketOption*): ActorRef = { implicit val timeout = NewSocketTimeout - val req = (zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef] - Await.result(req, timeout.duration) + Await.result((zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef], timeout.duration) } /** @@ -248,9 +247,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { case _ ⇒ false } - def receive = { - case p: Props ⇒ sender ! context.actorOf(p) - } + def receive = { case p: Props ⇒ sender ! context.actorOf(p) } }), "zeromq") } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 7b2b7846e9..c0e3a72a68 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -461,7 +461,7 @@ object Dependencies { ) val remote = Seq( - netty, protobuf, Test.junit, Test.scalatest + netty, protobuf, uncommonsMath, Test.junit, Test.scalatest ) val cluster = Seq(Test.junit, Test.scalatest) @@ -502,6 +502,7 @@ object Dependency { val ScalaStm = "0.5" val Scalatest = "1.6.1" val Slf4j = "1.6.4" + val UncommonsMath = "1.2.2a" } // Compile @@ -513,6 +514,7 @@ object Dependency { val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala) val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT + val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % V.UncommonsMath // ApacheV2 val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.6" // ApacheV2 // Test