diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 30a6aa9a9f..e8f5ed9d75 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -803,33 +803,6 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa assert(Await.result(z, timeout.duration) === 42) } - "futureFlowLoops" in { - import Future.flow - import akka.util.cps._ - - val count = 1000 - - val promises = List.fill(count)(Promise[Int]()) - - flow { - var i = 0 - val iter = promises.iterator - whileC(iter.hasNext) { - iter.next << i - i += 1 - } - } - - var i = 0 - promises foreach { p ⇒ - assert(Await.result(p, timeout.duration) === i) - i += 1 - } - - assert(i === count) - - } - "run callbacks async" in { val latch = Vector.fill(10)(new TestLatch) @@ -1038,4 +1011,4 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } def checkType[A: Manifest, B](in: Future[A], refmanifest: Manifest[B]): Boolean = manifest[A] == refmanifest -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 6917cbe078..d59baaf6fe 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -257,7 +257,7 @@ private[akka] object ActorCell { def cancel() {} } - final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable) + final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, emptyCancellable) final val emptyBehaviorStack: List[Actor.Receive] = Nil @@ -448,29 +448,24 @@ private[akka] class ActorCell( final def provider = system.provider - override final def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None - - override final def setReceiveTimeout(timeout: Duration): Unit = setReceiveTimeout(Some(timeout)) - - final def setReceiveTimeout(timeout: Option[Duration]): Unit = { - val timeoutMs = timeout match { - case None ⇒ -1L - case Some(duration) ⇒ - val ms = duration.toMillis - if (ms <= 0) -1L - // 1 millisecond is minimum supported - else if (ms < 1) 1L - else ms - } - receiveTimeoutData = (timeoutMs, receiveTimeoutData._2) + override final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match { + case Duration.Undefined ⇒ None + case duration ⇒ Some(duration) } + final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined)) + + override final def setReceiveTimeout(timeout: Duration): Unit = + receiveTimeoutData = ( + if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout, + receiveTimeoutData._2) + final override def resetReceiveTimeout(): Unit = setReceiveTimeout(None) /** * In milliseconds */ - var receiveTimeoutData: (Long, Cancellable) = emptyReceiveTimeoutData + var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData @volatile private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer @@ -1014,10 +1009,10 @@ private[akka] class ActorCell( final def checkReceiveTimeout() { val recvtimeout = receiveTimeoutData - if (recvtimeout._1 > 0 && !mailbox.hasMessages) { + if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) + receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, ReceiveTimeout)) } else cancelReceiveTimeout() } diff --git a/akka-actor/src/main/scala/akka/util/cps/package.scala b/akka-actor/src/main/scala/akka/util/cps/package.scala deleted file mode 100644 index a1b4bc39eb..0000000000 --- a/akka-actor/src/main/scala/akka/util/cps/package.scala +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.util - -import scala.util.continuations._ -import akka.dispatch.MessageDispatcher - -//FIXME Needs docs -package object cps { - def matchC[A, B, C, D](in: A)(pf: PartialFunction[A, B @cpsParam[C, D]]): B @cpsParam[C, D] = pf(in) - - def loopC[A, U](block: ⇒ U @cps[A])(implicit loop: CPSLoop[A], dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = - loop.loopC(block) - - def whileC[A, U](test: ⇒ Boolean)(block: ⇒ U @cps[A])(implicit loop: CPSLoop[A], dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = - loop.whileC(test)(block) - - def repeatC[A, U](times: Int)(block: ⇒ U @cps[A])(implicit loop: CPSLoop[A], dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = - loop.repeatC(times)(block) -} - -package cps { - object CPSLoop extends DefaultCPSLoop { - - implicit object FutureCPSLoop extends FutureCPSLoop - } - - trait CPSLoop[A] { - def loopC[U](block: ⇒ U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] - def whileC[U](test: ⇒ Boolean)(block: ⇒ U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] - def repeatC[U](times: Int)(block: ⇒ U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] - } - - import akka.dispatch.{ Future, Promise } - class FutureCPSLoop extends CPSLoop[Future[Any]] { - - def loopC[U](block: ⇒ U @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[Future[Any]] = - shift { c: (Unit ⇒ Future[Any]) ⇒ - Future(reify(block) flatMap (_ ⇒ reify(loopC(block))) foreach c) - } - - def whileC[U](test: ⇒ Boolean)(block: ⇒ U @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[Future[Any]] = - shift { c: (Unit ⇒ Future[Any]) ⇒ - if (test) - Future(reify(block) flatMap (_ ⇒ reify(whileC(test)(block))) foreach c) - else - Promise() success (shiftUnitR[Unit, Future[Any]](()) foreach c) - } - - def repeatC[U](times: Int)(block: ⇒ U @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[Future[Any]] = - shift { c: (Unit ⇒ Future[Any]) ⇒ - if (times > 0) - Future(reify(block) flatMap (_ ⇒ reify(repeatC(times - 1)(block))) foreach c) - else - Promise() success (shiftUnitR[Unit, Future[Any]](()) foreach c) - } - } - - trait DefaultCPSLoop { - implicit def defaultCPSLoop[A] = new CPSLoop[A] { - - def loopC[U](block: ⇒ U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = { - block - loopC(block) - } - - def whileC[U](test: ⇒ Boolean)(block: ⇒ U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = { - if (test) { - block - whileC(test)(block) - } - } - - def repeatC[U](times: Int)(block: ⇒ U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = { - if (times > 0) { - block - repeatC(times - 1)(block) - } - } - - } - } -} diff --git a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala index 6bbae25dfb..99a3ab6dbc 100644 --- a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala +++ b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala @@ -2,13 +2,13 @@ package akka.agent import language.postfixOps -import scala.concurrent.Await +import scala.concurrent.{ Await, Future } import scala.concurrent.util.Duration import scala.concurrent.util.duration._ import akka.util.Timeout import akka.testkit._ import scala.concurrent.stm._ -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{ CountDownLatch, TimeUnit } class CountDownFunction[A](num: Int = 1) extends Function1[A, A] { val latch = new CountDownLatch(num) @@ -38,14 +38,15 @@ class AgentSpec extends AkkaSpec { "maintain order between send and sendOff" in { val countDown = new CountDownFunction[String] + val l1, l2 = new CountDownLatch(1) val agent = Agent("a") agent send (_ + "b") - val longRunning = (s: String) ⇒ { Thread.sleep(2000); s + "c" } - agent sendOff longRunning + agent.sendOff((s: String) ⇒ { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" }) + l1.await(5, TimeUnit.SECONDS) agent send (_ + "d") agent send countDown - + l2.countDown countDown.await(5 seconds) agent() must be("abcd") @@ -53,16 +54,17 @@ class AgentSpec extends AkkaSpec { } "maintain order between alter and alterOff" in { - + val l1, l2 = new CountDownLatch(1) val agent = Agent("a") val r1 = agent.alter(_ + "b")(5000) - val r2 = agent.alterOff((s: String) ⇒ { Thread.sleep(2000); s + "c" })(5000) + val r2 = agent.alterOff((s: String) ⇒ { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" })(5000) + l1.await(5, TimeUnit.SECONDS) val r3 = agent.alter(_ + "d")(5000) + val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":")) + l2.countDown - Await.result(r1, 5 seconds) must be === "ab" - Await.result(r2, 5 seconds) must be === "abc" - Await.result(r3, 5 seconds) must be === "abcd" + Await.result(result, 5 seconds) must be === "ab:abc:abcd" agent() must be("abcd") diff --git a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala index 83590004b9..43f9498bdd 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerRegistryTest.scala @@ -21,10 +21,10 @@ class ProducerRegistryTest extends WordSpec with MustMatchers with SharedCamelSy "register a started SendProcessor for the producer, which is stopped when the actor is stopped" in { val actorRef = newEmptyActor val processor = registerProcessorFor(actorRef) - camel.awaitActivation(actorRef, 1 second) + camel.awaitActivation(actorRef, 5 second) processor.isStarted must be(true) system.stop(actorRef) - camel.awaitDeactivation(actorRef, 1 second) + camel.awaitDeactivation(actorRef, 5 second) (processor.isStopping || processor.isStopped) must be(true) } "remove and stop the SendProcessor if the actorRef is registered" in { diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index b60b91ec43..706064d42c 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -25,7 +25,7 @@ akka { # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? # Using auto-down implies that two separate clusters will automatically be formed in case of # network partition. - auto-down = on + auto-down = off # the number of gossip daemon actors nr-of-gossip-daemons = 4 diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 010406e4dd..9e45b1529b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -18,10 +18,8 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster { - leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state - unreachable-nodes-reaper-interval = 30 s - }""") + # turn off unreachable reaper + akka.cluster.unreachable-nodes-reaper-interval = 300 s""") .withFallback(MultiNodeClusterSpec.clusterConfig))) } @@ -35,7 +33,7 @@ abstract class LeaderLeavingSpec import LeaderLeavingMultiJvmSpec._ - val leaderHandoffWaitingTime = 30.seconds.dilated + val leaderHandoffWaitingTime = 30.seconds "A LEADER that is LEAVING" must { @@ -45,41 +43,60 @@ abstract class LeaderLeavingSpec val oldLeaderAddress = cluster.leader - if (cluster.isLeader) { + within(leaderHandoffWaitingTime) { - cluster.leave(oldLeaderAddress) - enterBarrier("leader-left") + if (cluster.isLeader) { - // verify that a NEW LEADER have taken over - awaitCond(!cluster.isLeader) + enterBarrier("registered-listener") - // verify that the LEADER is shut down - awaitCond(!cluster.isRunning, 30.seconds.dilated) + cluster.leave(oldLeaderAddress) + enterBarrier("leader-left") - // verify that the LEADER is REMOVED - awaitCond(cluster.status == MemberStatus.Removed) + // verify that a NEW LEADER have taken over + awaitCond(!cluster.isLeader) - } else { + // verify that the LEADER is shut down + awaitCond(!cluster.isRunning) - enterBarrier("leader-left") + // verify that the LEADER is REMOVED + awaitCond(cluster.status == MemberStatus.Removed) - // verify that the LEADER is LEAVING - awaitCond(cluster.latestGossip.members.exists(m ⇒ m.status == MemberStatus.Leaving && m.address == oldLeaderAddress), leaderHandoffWaitingTime) // wait on LEAVING + } else { - // verify that the LEADER is EXITING - awaitCond(cluster.latestGossip.members.exists(m ⇒ m.status == MemberStatus.Exiting && m.address == oldLeaderAddress), leaderHandoffWaitingTime) // wait on EXITING + val leavingLatch = TestLatch() + val exitingLatch = TestLatch() + val expectedAddresses = roles.toSet map address + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + def check(status: MemberStatus): Boolean = + (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status)) + if (check(MemberStatus.Leaving)) leavingLatch.countDown() + if (check(MemberStatus.Exiting)) exitingLatch.countDown() + } + }) + enterBarrier("registered-listener") - // verify that the LEADER is no longer part of the 'members' set - awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress), leaderHandoffWaitingTime) + enterBarrier("leader-left") - // verify that the LEADER is not part of the 'unreachable' set - awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress), leaderHandoffWaitingTime) + // verify that the LEADER is LEAVING + leavingLatch.await - // verify that we have a new LEADER - awaitCond(cluster.leader != oldLeaderAddress, leaderHandoffWaitingTime) + // verify that the LEADER is EXITING + exitingLatch.await + + // verify that the LEADER is no longer part of the 'members' set + awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress)) + + // verify that the LEADER is not part of the 'unreachable' set + awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress)) + + // verify that we have a new LEADER + awaitCond(cluster.leader != oldLeaderAddress) + } + + enterBarrier("finished") } - - enterBarrier("finished") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 71b55c21f9..19c81ecb28 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -18,11 +18,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster { - leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state - unreachable-nodes-reaper-interval = 300 s # turn "off" - } - """) + # turn off unreachable reaper + akka.cluster.unreachable-nodes-reaper-interval = 300 s""") .withFallback(MultiNodeClusterSpec.clusterConfig))) } @@ -42,26 +39,39 @@ abstract class NodeLeavingAndExitingSpec awaitClusterUp(first, second, third) - runOn(first) { - cluster.leave(second) - } - enterBarrier("second-left") - runOn(first, third) { + val secondAddess = address(second) + val leavingLatch = TestLatch() + val exitingLatch = TestLatch() + val expectedAddresses = roles.toSet map address + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + def check(status: MemberStatus): Boolean = + (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == secondAddess && m.status == status)) + if (check(MemberStatus.Leaving)) leavingLatch.countDown() + if (check(MemberStatus.Exiting)) exitingLatch.countDown() + } + }) + enterBarrier("registered-listener") - // 1. Verify that 'second' node is set to LEAVING - // We have set the 'leader-actions-interval' to 5 seconds to make sure that we get a - // chance to test the LEAVING state before the leader moves the node to EXITING - awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING - val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left - hasLeft must be('defined) - hasLeft.get.address must be(address(second)) + runOn(third) { + cluster.leave(second) + } + enterBarrier("second-left") - // 2. Verify that 'second' node is set to EXITING - awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Exiting)) // wait on EXITING - val hasExited = cluster.latestGossip.members.find(_.status == MemberStatus.Exiting) // verify node that exited - hasExited must be('defined) - hasExited.get.address must be(address(second)) + // Verify that 'second' node is set to LEAVING + leavingLatch.await + + // Verify that 'second' node is set to EXITING + exitingLatch.await + + } + + // node that is leaving + runOn(second) { + enterBarrier("registered-listener") + enterBarrier("second-left") } enterBarrier("finished") diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 4bb8f46b85..1f395563e1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -33,7 +33,7 @@ class ClusterConfigSpec extends AkkaSpec { JoinTimeout must be(60 seconds) NrOfGossipDaemons must be(4) AutoJoin must be(true) - AutoDown must be(true) + AutoDown must be(false) GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) diff --git a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java index 9b91336800..0bc30381f6 100644 --- a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java @@ -235,7 +235,7 @@ public class UntypedActorDocTestBase { final ArrayList> futures = new ArrayList>(); futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout - futures.add(ask(actorB, "reqeest", t)); // using timeout from above + futures.add(ask(actorB, "another request", t)); // using timeout from above final Future> aggregate = Futures.sequence(futures, system.dispatcher()); diff --git a/akka-docs/modules/http.rst b/akka-docs/modules/http.rst index b0f54948d3..9a5f1f3a47 100644 --- a/akka-docs/modules/http.rst +++ b/akka-docs/modules/http.rst @@ -12,11 +12,19 @@ service applications that integrates with Akka. It provides a REST API on top of Getting started --------------- -First you must make your application aware of play-mini. -In SBT you just have to add the following to your ``libraryDependencies``:: +Easiest way to get started with `Play2 Mini `_ is to use the +G8 project templates, as described in the `Play2 Mini Documentation `_. + +If you already have an Akka project and want to add Play2 Mini, you must first add the following to +your ``libraryDependencies``:: libraryDependencies += "com.typesafe" %% "play-mini" % "" +In case you need to start Play2 Mini programatically you can use:: + + play.core.server.NettyServer.main(Array()) + + Akka Mist ========= diff --git a/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala index 7b0d28ecce..03338c6b24 100644 --- a/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala +++ b/akka-osgi-aries/src/test/scala/akka/osgi/aries/blueprint/NamespaceHandlerTest.scala @@ -66,7 +66,6 @@ class ConfigNamespaceHandlerTest extends WordSpec with MustMatchers with PojoSRT "set up ActorSystem when bundle starts" in { val system = serviceForType[ActorSystem] system must not be (null) - system.settings.config.getString("some.config.key") must be("value") } diff --git a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala index 029877d8af..e928f42c53 100644 --- a/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala +++ b/akka-osgi/src/test/scala/akka/osgi/PojoSRTestSupport.scala @@ -14,8 +14,8 @@ import java.net.URL import java.util.jar.JarInputStream import java.io.{ FileInputStream, FileOutputStream, File } -import java.util.{ Date, ServiceLoader, HashMap } import org.scalatest.{ BeforeAndAfterAll, Suite } +import java.util.{ UUID, Date, ServiceLoader, HashMap } /** * Trait that provides support for building akka-osgi tests using PojoSR @@ -29,11 +29,11 @@ trait PojoSRTestSupport extends Suite with BeforeAndAfterAll { * All bundles being found on the test classpath are automatically installed and started in the PojoSR runtime. * Implement this to define the extra bundles that should be available for testing. */ - val testBundles: Seq[BundleDescriptor] + def testBundles: Seq[BundleDescriptor] lazy val context: BundleContext = { val config = new HashMap[String, AnyRef]() - System.setProperty("org.osgi.framework.storage", "target/akka-osgi/" + System.currentTimeMillis) + System.setProperty("org.osgi.framework.storage", "target/akka-osgi/" + UUID.randomUUID().toString) val bundles = new ClasspathScanner().scanForBundles() bundles.addAll(testBundles) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index f365d5ce19..64c2e1a840 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -209,6 +209,7 @@ akka { # "" or SecureRandom => (default) # "SHA1PRNG" => Can be slow because of blocking issues on Linux # "AES128CounterSecureRNG" => fastest startup and based on AES encryption algorithm + # "AES256CounterSecureRNG" # The following use one of 3 possible seed sources, depending on availability: /dev/random, random.org and SecureRandom (provided by Java) # "AES128CounterInetRNG" # "AES256CounterInetRNG" (Install JCE Unlimited Strength Jurisdiction Policy Files first) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala index 690b4522ec..83fdb781a7 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettySSLSupport.scala @@ -38,7 +38,7 @@ private[akka] object NettySSLSupport { } val rng = rngName match { - case Some(r @ ("AES128CounterSecureRNG" | "AES128CounterInetRNG" | "AES256CounterInetRNG")) ⇒ + case Some(r @ ("AES128CounterSecureRNG" | "AES256CounterSecureRNG" | "AES128CounterInetRNG" | "AES256CounterInetRNG")) ⇒ log.debug("SSL random number generator set to: {}", r) SecureRandom.getInstance(r, AkkaProvider) case Some(s @ ("SHA1PRNG" | "NativePRNG")) ⇒ diff --git a/akka-remote/src/main/scala/akka/security/provider/AES128CounterInetRNG.scala b/akka-remote/src/main/scala/akka/security/provider/AES128CounterInetRNG.scala index 41d12b275f..9944f3d6d4 100644 --- a/akka-remote/src/main/scala/akka/security/provider/AES128CounterInetRNG.scala +++ b/akka-remote/src/main/scala/akka/security/provider/AES128CounterInetRNG.scala @@ -3,17 +3,18 @@ */ package akka.security.provider -import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator } +import org.uncommons.maths.random.{ AESCounterRNG } +import SeedSize.Seed128 /** * Internal API * This class is a wrapper around the 128-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/ * It uses the default seed generator which uses one of the following 3 random seed sources: - * Depending on availability: /dev/random, random.org and SecureRandom (provided by Java) + * Depending on availability: random.org, /dev/random, and SecureRandom (provided by Java) * The only method used by netty ssl is engineNextBytes(bytes) */ class AES128CounterInetRNG extends java.security.SecureRandomSpi { - private val rng = new AESCounterRNG() + private val rng = new AESCounterRNG(engineGenerateSeed(Seed128)) /** * This is managed internally by AESCounterRNG @@ -35,6 +36,6 @@ class AES128CounterInetRNG extends java.security.SecureRandomSpi { * @param numBytes the number of seed bytes to generate. * @return the seed bytes. */ - override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes) + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = InternetSeedGenerator.getInstance.generateSeed(numBytes) } diff --git a/akka-remote/src/main/scala/akka/security/provider/AES128CounterSecureRNG.scala b/akka-remote/src/main/scala/akka/security/provider/AES128CounterSecureRNG.scala index cda59ee03b..bd422a249b 100644 --- a/akka-remote/src/main/scala/akka/security/provider/AES128CounterSecureRNG.scala +++ b/akka-remote/src/main/scala/akka/security/provider/AES128CounterSecureRNG.scala @@ -4,15 +4,22 @@ package akka.security.provider import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } +import SeedSize.Seed128 /** * Internal API - * This class is a wrapper around the AESCounterRNG algorithm provided by http://maths.uncommons.org/ * + * This class is a wrapper around the 128-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/ * The only method used by netty ssl is engineNextBytes(bytes) * This RNG is good to use to prevent startup delay when you don't have Internet access to random.org */ class AES128CounterSecureRNG extends java.security.SecureRandomSpi { - private val rng = new AESCounterRNG(new SecureRandomSeedGenerator()) + /**Singleton instance. */ + private final val Instance: SecureRandomSeedGenerator = new SecureRandomSeedGenerator + + /** + * Make sure the seed generator is provided by a SecureRandom singleton and not default 'Random' + */ + private val rng = new AESCounterRNG(engineGenerateSeed(Seed128)) /** * This is managed internally by AESCounterRNG @@ -34,6 +41,6 @@ class AES128CounterSecureRNG extends java.security.SecureRandomSpi { * @param numBytes the number of seed bytes to generate. * @return the seed bytes. */ - override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = (new SecureRandomSeedGenerator()).generateSeed(numBytes) + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = Instance.generateSeed(numBytes) } diff --git a/akka-remote/src/main/scala/akka/security/provider/AES256CounterInetRNG.scala b/akka-remote/src/main/scala/akka/security/provider/AES256CounterInetRNG.scala index 076d4fcd7f..4c7de74990 100644 --- a/akka-remote/src/main/scala/akka/security/provider/AES256CounterInetRNG.scala +++ b/akka-remote/src/main/scala/akka/security/provider/AES256CounterInetRNG.scala @@ -3,23 +3,18 @@ */ package akka.security.provider -import org.uncommons.maths.random.{ AESCounterRNG, DefaultSeedGenerator } +import org.uncommons.maths.random.{ AESCounterRNG } +import SeedSize.Seed256 /** * Internal API * This class is a wrapper around the 256-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/ * It uses the default seed generator which uses one of the following 3 random seed sources: - * Depending on availability: /dev/random, random.org and SecureRandom (provided by Java) + * Depending on availability: random.org, /dev/random, and SecureRandom (provided by Java) * The only method used by netty ssl is engineNextBytes(bytes) */ class AES256CounterInetRNG extends java.security.SecureRandomSpi { - /** - * From AESCounterRNG API docs: - * Valid values are 16 (128 bits), 24 (192 bits) and 32 (256 bits). - * Any other values will result in an exception from the AES implementation. - */ - private val AES_256_BIT = 32 // Magic number is magic - private val rng = new AESCounterRNG(AES_256_BIT) + private val rng = new AESCounterRNG(engineGenerateSeed(Seed256)) /** * This is managed internally by AESCounterRNG @@ -41,6 +36,6 @@ class AES256CounterInetRNG extends java.security.SecureRandomSpi { * @param numBytes the number of seed bytes to generate. * @return the seed bytes. */ - override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = DefaultSeedGenerator.getInstance.generateSeed(numBytes) + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = InternetSeedGenerator.getInstance.generateSeed(numBytes) } diff --git a/akka-remote/src/main/scala/akka/security/provider/AES256CounterSecureRNG.scala b/akka-remote/src/main/scala/akka/security/provider/AES256CounterSecureRNG.scala new file mode 100644 index 0000000000..8650cd75c4 --- /dev/null +++ b/akka-remote/src/main/scala/akka/security/provider/AES256CounterSecureRNG.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.security.provider + +import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } +import SeedSize.Seed256 + +/** + * Internal API + * This class is a wrapper around the 256-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/ + * The only method used by netty ssl is engineNextBytes(bytes) + * This RNG is good to use to prevent startup delay when you don't have Internet access to random.org + */ +class AES256CounterSecureRNG extends java.security.SecureRandomSpi { + /**Singleton instance. */ + private final val Instance: SecureRandomSeedGenerator = new SecureRandomSeedGenerator + + private val rng = new AESCounterRNG(engineGenerateSeed(Seed256)) + + /** + * This is managed internally by AESCounterRNG + */ + override protected def engineSetSeed(seed: Array[Byte]): Unit = () + + /** + * Generates a user-specified number of random bytes. + * + * @param bytes the array to be filled in with random bytes. + */ + override protected def engineNextBytes(bytes: Array[Byte]): Unit = rng.nextBytes(bytes) + + /** + * Unused method + * Returns the given number of seed bytes. This call may be used to + * seed other random number generators. + * + * @param numBytes the number of seed bytes to generate. + * @return the seed bytes. + */ + override protected def engineGenerateSeed(numBytes: Int): Array[Byte] = Instance.generateSeed(numBytes) +} + diff --git a/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala b/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala index 707ad0c399..8cbebe4190 100644 --- a/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala +++ b/akka-remote/src/main/scala/akka/security/provider/AkkaProvider.scala @@ -13,11 +13,13 @@ object AkkaProvider extends Provider("Akka", 1.0, "Akka provider 1.0 that implem def run = { //SecureRandom put("SecureRandom.AES128CounterSecureRNG", classOf[AES128CounterSecureRNG].getName) + put("SecureRandom.AES256CounterSecureRNG", classOf[AES256CounterSecureRNG].getName) put("SecureRandom.AES128CounterInetRNG", classOf[AES128CounterInetRNG].getName) put("SecureRandom.AES256CounterInetRNG", classOf[AES256CounterInetRNG].getName) //Implementation type: software or hardware put("SecureRandom.AES128CounterSecureRNG ImplementedIn", "Software") + put("SecureRandom.AES256CounterSecureRNG ImplementedIn", "Software") put("SecureRandom.AES128CounterInetRNG ImplementedIn", "Software") put("SecureRandom.AES256CounterInetRNG ImplementedIn", "Software") null //Magic null is magic diff --git a/akka-remote/src/main/scala/akka/security/provider/InternetSeedGenerator.scala b/akka-remote/src/main/scala/akka/security/provider/InternetSeedGenerator.scala new file mode 100644 index 0000000000..e28cbf4f17 --- /dev/null +++ b/akka-remote/src/main/scala/akka/security/provider/InternetSeedGenerator.scala @@ -0,0 +1,53 @@ +// ============================================================================ +// Copyright 2006-2010 Daniel W. Dyer +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============================================================================ +package akka.security.provider + +import org.uncommons.maths.random.{ SeedGenerator, SeedException, SecureRandomSeedGenerator, RandomDotOrgSeedGenerator, DevRandomSeedGenerator } + +/** + * Internal API + * Seed generator that maintains multiple strategies for seed + * generation and will delegate to the best one available for the + * current operating environment. + * @author Daniel Dyer + */ +object InternetSeedGenerator { + /** + * @return The singleton instance of this class. + */ + def getInstance: InternetSeedGenerator = Instance + + /**Singleton instance. */ + private final val Instance: InternetSeedGenerator = new InternetSeedGenerator + /**Delegate generators. */ + private final val Generators: Seq[SeedGenerator] = + Seq(new RandomDotOrgSeedGenerator, // first try the Internet seed generator + new SecureRandomSeedGenerator) // this is last because it always works +} + +final class InternetSeedGenerator extends SeedGenerator { + /** + * Generates a seed by trying each of the available strategies in + * turn until one succeeds. Tries the most suitable strategy first + * and eventually degrades to the least suitable (but guaranteed to + * work) strategy. + * @param length The length (in bytes) of the seed. + * @return A random seed of the requested length. + */ + def generateSeed(length: Int): Array[Byte] = InternetSeedGenerator.Generators.view.flatMap( + g ⇒ try Option(g.generateSeed(length)) catch { case _: SeedException ⇒ None }).headOption.getOrElse(throw new IllegalStateException("All available seed generation strategies failed.")) +} + diff --git a/akka-remote/src/main/scala/akka/security/provider/SeedSize.scala b/akka-remote/src/main/scala/akka/security/provider/SeedSize.scala new file mode 100644 index 0000000000..c8c7c0e661 --- /dev/null +++ b/akka-remote/src/main/scala/akka/security/provider/SeedSize.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.security.provider + +/** + * Internal API + * From AESCounterRNG API docs: + * Valid values are 16 (128 bits), 24 (192 bits) and 32 (256 bits). + * Any other values will result in an exception from the AES implementation. + * + * Internal API + */ +private[provider] object SeedSize { + val Seed128 = 16 + val Seed192 = 24 + val Seed256 = 32 +} + diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 5abf12d6cf..16d2542da5 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -87,6 +87,9 @@ class Ticket1978SHA1PRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class Ticket1978AES128CounterSecureRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES128CounterSecureRNG", "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")) +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Ticket1978AES256CounterSecureRNGSpec extends Ticket1978CommunicationSpec(getCipherConfig("AES256CounterSecureRNG", "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")) + /** * Both of the Inet variants require access to the Internet to access random.org. */ diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 7dad77d7cf..d080fef95a 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -43,6 +43,20 @@ object AkkaBuild extends Build { Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository", Unidoc.unidocExclude := Seq(samples.id, tutorials.id), Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id), + initialCommands in ThisBuild := + """|import akka.actor._ + |import akka.dispatch._ + |import com.typesafe.config.ConfigFactory + |import akka.util.duration._ + |import akka.util.Timeout + |val config = ConfigFactory.parseString("akka.stdout-loglevel=INFO,akka.loglevel=DEBUG") + |val remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=RemoteActorRefProvider").withFallback(config) + |var system: ActorSystem = null + |def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.shutdown()!") } + |implicit def ec = system.dispatcher + |implicit val timeout = Timeout(5 seconds) + |""".stripMargin, + initialCommands in Test in ThisBuild += "import akka.testkit._", // online version of docs sphinxDocs <<= baseDirectory / "akka-docs", sphinxTags in sphinxHtml += "online", @@ -76,6 +90,7 @@ object AkkaBuild extends Build { dependencies = Seq(actor), settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.testkit, + initialCommands += "import akka.testkit._", previousArtifact := akkaPreviousArtifact("akka-testkit") ) ) @@ -230,7 +245,8 @@ object AkkaBuild extends Build { base = file("akka-osgi"), dependencies = Seq(actor), settings = defaultSettings ++ OSGi.osgi ++ Seq( - libraryDependencies ++= Dependencies.osgi + libraryDependencies ++= Dependencies.osgi, + parallelExecution in Test := false ) ) @@ -239,7 +255,8 @@ object AkkaBuild extends Build { base = file("akka-osgi-aries"), dependencies = Seq(osgi % "compile;test->test"), settings = defaultSettings ++ OSGi.osgiAries ++ Seq( - libraryDependencies ++= Dependencies.osgiAries + libraryDependencies ++= Dependencies.osgiAries, + parallelExecution in Test := false ) ) @@ -395,6 +412,7 @@ object AkkaBuild extends Build { ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet, parallelExecution in Test := System.getProperty("akka.parallelExecution", "false").toBoolean, + logBuffered in Test := System.getProperty("akka.logBufferedTests", "false").toBoolean, excludeTestNames := useExcludeTestNames, excludeTestTags := useExcludeTestTags, diff --git a/repl b/repl deleted file mode 100644 index 29f505e292..0000000000 --- a/repl +++ /dev/null @@ -1,16 +0,0 @@ -import akka.actor._ -import akka.dispatch.{ Future, Promise } -import com.typesafe.config.ConfigFactory -import akka.testkit._ -val remoteConfig = try { - Class.forName("akka.remote.RemoteActorRefProvider") - "\nakka.actor.provider=akka.remote.RemoteActorRefProvider" - } catch { - case _: ClassNotFoundException => "" - } -val config=ConfigFactory.parseString("akka.daemonic=on" + remoteConfig) -val sys=ActorSystem("repl", config.withFallback(ConfigFactory.load())).asInstanceOf[ExtendedActorSystem] -implicit val ec=sys.dispatcher -import akka.util.duration._ -import akka.util.Timeout -implicit val timeout=Timeout(5 seconds)