diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorWithBoundedStashSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorWithBoundedStashSpec.scala index a836572ad2..b3b9a57839 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorWithBoundedStashSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorWithBoundedStashSpec.scala @@ -14,71 +14,82 @@ import scala.concurrent.Await import scala.concurrent.util.duration._ import akka.actor.ActorSystem.Settings import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.Assertions.intercept import org.scalatest.BeforeAndAfterEach object ActorWithBoundedStashSpec { - class StashingActor(implicit sys: ActorSystem) extends Actor with Stash { + class StashingActor extends Actor with Stash { def receive = { - case "hello" ⇒ stash() - case "world" ⇒ unstashAll() + case "hello1" ⇒ stash() + case "world" ⇒ unstashAll() } } - class StashingActorWithOverflow(implicit sys: ActorSystem) extends Actor with Stash { + class StashingActorWithOverflow extends Actor with Stash { var numStashed = 0 def receive = { - case "hello" ⇒ + case "hello2" ⇒ numStashed += 1 - try stash() catch { case e: StashOverflowException ⇒ if (numStashed == 21) sender ! "STASHOVERFLOW" } + try stash() catch { + case _: StashOverflowException ⇒ + if (numStashed == 21) { + sender ! "STASHOVERFLOW" + context stop self + } + } } } + // bounded deque-based mailbox with capacity 10 + class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 10 millis) + + val dispatcherId = "my-dispatcher" + val testConf: Config = ConfigFactory.parseString(""" - my-dispatcher { - mailbox-type = "akka.actor.ActorWithBoundedStashSpec$Bounded" + %s { + mailbox-type = "%s" stash-capacity = 20 } - """) - - // bounded deque-based mailbox with capacity 10 - class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 1 seconds) + """.format(dispatcherId, classOf[Bounded].getName)) } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach with ImplicitSender { +class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with BeforeAndAfterEach with DefaultTimeout with ImplicitSender { import ActorWithBoundedStashSpec._ - implicit val sys = system + override def atStartup: Unit = { + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello1"))) + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello2"))) + } - override def atStartup { system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) } + override def beforeEach(): Unit = + system.eventStream.subscribe(testActor, classOf[DeadLetter]) - def myProps(creator: ⇒ Actor): Props = Props(creator).withDispatcher("my-dispatcher") + override def afterEach(): Unit = + system.eventStream.unsubscribe(testActor, classOf[DeadLetter]) - "An Actor with Stash and BoundedDequeBasedMailbox" must { + "An Actor with Stash" must { "end up in DeadLetters in case of a capacity violation" in { - system.eventStream.subscribe(testActor, classOf[DeadLetter]) - - val stasher = system.actorOf(myProps(new StashingActor)) + val stasher = system.actorOf(Props[StashingActor].withDispatcher(dispatcherId)) // fill up stash - (1 to 11) foreach { _ ⇒ stasher ! "hello" } + (1 to 11) foreach { _ ⇒ stasher ! "hello1" } // cause unstashAll with capacity violation stasher ! "world" - expectMsg(DeadLetter("hello", testActor, stasher)) - system.eventStream.unsubscribe(testActor, classOf[DeadLetter]) + expectMsg(DeadLetter("hello1", testActor, stasher)) + system stop stasher + (1 to 10) foreach { _ ⇒ expectMsg(DeadLetter("hello1", testActor, stasher)) } } - } - - "An Actor with bounded Stash" must { "throw a StashOverflowException in case of a stash capacity violation" in { - val stasher = system.actorOf(myProps(new StashingActorWithOverflow)) + val stasher = system.actorOf(Props[StashingActorWithOverflow].withDispatcher(dispatcherId)) // fill up stash - (1 to 21) foreach { _ ⇒ stasher ! "hello" } + (1 to 21) foreach { _ ⇒ stasher ! "hello2" } expectMsg("STASHOVERFLOW") + (1 to 20) foreach { _ ⇒ expectMsg(DeadLetter("hello2", testActor, stasher)) } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 3c38d332a4..7dc121604f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -99,7 +99,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout filterException[ActorKilledException] { val supervisor = system.actorOf(Props(new Supervisor( OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Exception]))))) - val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x }) + val terminalProps = Props(new Actor { def receive = { case x ⇒ sender ! x } }) val terminal = Await.result((supervisor ? terminalProps).mapTo[ActorRef], timeout.duration) val monitor = startWatching(terminal) @@ -150,7 +150,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout val parent = system.actorOf(Props(new Actor { def receive = { case "NKOTB" ⇒ - val currentKid = context.watch(context.actorOf(Props(ctx ⇒ { case "NKOTB" ⇒ ctx stop ctx.self }), "kid")) + val currentKid = context.watch(context.actorOf(Props(new Actor { def receive = { case "NKOTB" ⇒ context stop self } }), "kid")) currentKid forward "NKOTB" context become { case Terminated(`currentKid`) ⇒ diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 084d2ddf8b..a71a9a09f8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -34,7 +34,7 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi "An LocalActorRefProvider" must { "find actor refs using actorFor" in { - val a = system.actorOf(Props(ctx ⇒ { case _ ⇒ })) + val a = system.actorOf(Props(new Actor { def receive = { case _ ⇒ } })) val b = system.actorFor(a.path) a must be === b } @@ -52,7 +52,7 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi for (i ← 0 until 100) { val address = "new-actor" + i implicit val timeout = Timeout(5 seconds) - val actors = for (j ← 1 to 4) yield Future(system.actorOf(Props(c ⇒ { case _ ⇒ }), address)) + val actors = for (j ← 1 to 4) yield Future(system.actorOf(Props(new Actor { def receive = { case _ ⇒ } }), address)) val set = Set() ++ actors.map(a ⇒ Await.ready(a, timeout.duration).value match { case Some(Success(a: ActorRef)) ⇒ 1 case Some(Failure(ex: InvalidActorNameException)) ⇒ 2 @@ -79,6 +79,8 @@ class LocalActorRefProviderSpec extends AkkaSpec(LocalActorRefProviderSpec.confi intercept[InvalidActorNameException](system.actorOf(Props.empty, "")).getMessage.contains("empty") must be(true) intercept[InvalidActorNameException](system.actorOf(Props.empty, "$hallo")).getMessage.contains("conform") must be(true) intercept[InvalidActorNameException](system.actorOf(Props.empty, "a%")).getMessage.contains("conform") must be(true) + intercept[InvalidActorNameException](system.actorOf(Props.empty, "%3")).getMessage.contains("conform") must be(true) + intercept[InvalidActorNameException](system.actorOf(Props.empty, "%1t")).getMessage.contains("conform") must be(true) intercept[InvalidActorNameException](system.actorOf(Props.empty, "a?")).getMessage.contains("conform") must be(true) intercept[InvalidActorNameException](system.actorOf(Props.empty, "üß")).getMessage.contains("conform") must be(true) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index f525cd61ec..71e210559c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -192,7 +192,9 @@ object SupervisorHierarchySpec { case x ⇒ (x, x) } override val supervisorStrategy = OneForOneStrategy()(unwrap andThen { - case _ if pongsToGo > 0 ⇒ Resume + case _: Failure if pongsToGo > 0 ⇒ + log :+= Event("pongOfDeath resuming " + sender) + Resume case (f: Failure, orig) ⇒ if (f.depth > 0) { setFlags(f.directive) @@ -225,6 +227,9 @@ object SupervisorHierarchySpec { context.watch(context.actorOf(props, name)) } } + if (context.children.size != state.kids.size) { + abort("invariant violated: " + state.kids.size + " != " + context.children.size) + } cause match { case f: Failure if f.failPost > 0 ⇒ f.failPost -= 1; throw f case PostRestartException(`self`, f: Failure, _) if f.failPost > 0 ⇒ f.failPost -= 1; throw f @@ -279,11 +284,14 @@ object SupervisorHierarchySpec { val kids = stateCache.get(self).kids(ref) val props = Props(new Hierarchy(kids, breadth, listener, myLevel + 1)).withDispatcher("hierarchy") context.watch(context.actorOf(props, name)) + } else { + log :+= Event(sender + " terminated while pongOfDeath") } case Abort ⇒ abort("terminating") case PingOfDeath ⇒ if (size > 1) { pongsToGo = context.children.size + log :+= Event("sending " + pongsToGo + " pingOfDeath") context.children foreach (_ ! PingOfDeath) } else { context stop self @@ -567,8 +575,12 @@ object SupervisorHierarchySpec { } case Event(StateTimeout, _) ⇒ errors :+= self -> ErrorLog("timeout while Stopping", Vector.empty) - context stop hierarchy - goto(Failed) + println(system.asInstanceOf[ActorSystemImpl].printTree) + getErrors(hierarchy, 10) + printErrors() + idleChildren foreach println + testActor ! "timeout in Stopping" + stop case Event(e: ErrorLog, _) ⇒ errors :+= sender -> e goto(Failed) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index c0b14896f8..a6b071d804 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -81,12 +81,15 @@ class DispatcherActorSpec extends AkkaSpec(DispatcherActorSpec.config) with Defa val latch = new CountDownLatch(100) val start = new CountDownLatch(1) val fastOne = system.actorOf( - Props(context ⇒ { case "sabotage" ⇒ works.set(false) }).withDispatcher(throughputDispatcher)) + Props(new Actor { def receive = { case "sabotage" ⇒ works.set(false) } }) + .withDispatcher(throughputDispatcher)) val slowOne = system.actorOf( - Props(context ⇒ { - case "hogexecutor" ⇒ context.sender ! "OK"; start.await - case "ping" ⇒ if (works.get) latch.countDown() + Props(new Actor { + def receive = { + case "hogexecutor" ⇒ sender ! "OK"; start.await + case "ping" ⇒ if (works.get) latch.countDown() + } }).withDispatcher(throughputDispatcher)) assert(Await.result(slowOne ? "hogexecutor", timeout.duration) === "OK") @@ -109,14 +112,18 @@ class DispatcherActorSpec extends AkkaSpec(DispatcherActorSpec.config) with Defa val ready = new CountDownLatch(1) val fastOne = system.actorOf( - Props(context ⇒ { - case "ping" ⇒ if (works.get) latch.countDown(); context.stop(context.self) + Props(new Actor { + def receive = { + case "ping" ⇒ if (works.get) latch.countDown(); context.stop(self) + } }).withDispatcher(throughputDispatcher)) val slowOne = system.actorOf( - Props(context ⇒ { - case "hogexecutor" ⇒ ready.countDown(); start.await - case "ping" ⇒ works.set(false); context.stop(context.self) + Props(new Actor { + def receive = { + case "hogexecutor" ⇒ ready.countDown(); start.await + case "ping" ⇒ works.set(false); context.stop(self) + } }).withDispatcher(throughputDispatcher)) slowOne ! "hogexecutor" diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index 6dfbf2aedd..c1e8f8a3f6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -36,7 +36,7 @@ class PinnedActorSpec extends AkkaSpec(PinnedActorSpec.config) with BeforeAndAft "support tell" in { var oneWay = new CountDownLatch(1) - val actor = system.actorOf(Props(self ⇒ { case "OneWay" ⇒ oneWay.countDown() }).withDispatcher("pinned-dispatcher")) + val actor = system.actorOf(Props(new Actor { def receive = { case "OneWay" ⇒ oneWay.countDown() } }).withDispatcher("pinned-dispatcher")) val result = actor ! "OneWay" assert(oneWay.await(1, TimeUnit.SECONDS)) system.stop(actor) diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala index 97f5e0d236..0e3d358322 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala @@ -23,14 +23,14 @@ class Future2ActorSpec extends AkkaSpec with DefaultTimeout { } "support convenient sending to multiple destinations with implicit sender" in { - implicit val someActor = system.actorOf(Props(ctx ⇒ Actor.emptyBehavior)) + implicit val someActor = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior })) Future(42) pipeTo testActor pipeTo testActor expectMsgAllOf(1 second, 42, 42) lastSender must be(someActor) } "support convenient sending with explicit sender" in { - val someActor = system.actorOf(Props(ctx ⇒ Actor.emptyBehavior)) + val someActor = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior })) Future(42).to(testActor, someActor) expectMsgAllOf(1 second, 42) lastSender must be(someActor) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 82cea61625..bc423998f0 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -20,7 +20,7 @@ import scala.runtime.NonLocalReturnControl import akka.pattern.ask import java.lang.{ IllegalStateException, ArithmeticException } import java.util.concurrent._ -import scala.reflect.ClassTag +import scala.reflect.{ ClassTag, classTag } import scala.util.{ Failure, Success, Try } object FutureSpec { @@ -260,7 +260,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } yield b + "-" + c Await.result(future1, timeout.duration) must be("10-14") - assert(checkType(future1, manifest[String])) + assert(checkType(future1, classTag[String])) intercept[ClassCastException] { Await.result(future2, timeout.duration) } system.stop(actor) } @@ -479,7 +479,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } })) - val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo manifest[Int]) + val oddFutures = List.fill(100)(oddActor ? 'GetNext mapTo classTag[Int]) assert(Await.result(Future.sequence(oddFutures), timeout.duration).sum === 10000) system.stop(oddActor) @@ -939,9 +939,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa f((future, message) ⇒ { future.value must be('defined) future.value.get must be('failure) - future.value.get match { - case Failure(f) ⇒ f.getMessage must be(message) - } + val Failure(f) = future.value.get + f.getMessage must be(message) }) } "throw exception with 'get'" in { f((future, message) ⇒ (evaluating { Await.result(future, timeout.duration) } must produce[java.lang.Exception]).getMessage must be(message)) } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index 31a314da2e..a67f65d870 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -9,9 +9,9 @@ import akka.testkit.AkkaSpec import scala.concurrent.util.duration._ import scala.concurrent.Await import akka.testkit.DefaultTimeout -import akka.actor.{ Props, ActorRef } import akka.util.Timeout import scala.util.Failure +import akka.actor.{ Actor, Props, ActorRef } class AskSpec extends AkkaSpec { @@ -50,7 +50,7 @@ class AskSpec extends AkkaSpec { "return broken promises on 0 timeout" in { implicit val timeout = Timeout(0 seconds) - val echo = system.actorOf(Props(ctx ⇒ { case x ⇒ ctx.sender ! x })) + val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } })) val f = echo ? "foo" val expectedMsg = "Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format echo intercept[IllegalArgumentException] { @@ -60,7 +60,7 @@ class AskSpec extends AkkaSpec { "return broken promises on < 0 timeout" in { implicit val timeout = Timeout(-1000 seconds) - val echo = system.actorOf(Props(ctx ⇒ { case x ⇒ ctx.sender ! x })) + val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } })) val f = echo ? "foo" val expectedMsg = "Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format echo intercept[IllegalArgumentException] { @@ -70,7 +70,7 @@ class AskSpec extends AkkaSpec { "return broken promises on infinite timeout" in { implicit val timeout = Timeout.never - val echo = system.actorOf(Props(ctx ⇒ { case x ⇒ ctx.sender ! x })) + val echo = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } })) val f = echo ? "foo" val expectedMsg = "Timeouts to `ask` must be finite. Question not sent to [%s]" format echo intercept[IllegalArgumentException] { diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala index 7bf5afa6f9..72370d98a4 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala @@ -10,7 +10,9 @@ import scala.concurrent.{ Promise, Future, Await } class CircuitBreakerMTSpec extends AkkaSpec { implicit val ec = system.dispatcher "A circuit breaker being called by many threads" must { - val breaker = new CircuitBreaker(system.scheduler, 5, 100.millis.dilated, 500.millis.dilated) + val callTimeout = 1.second.dilated + val resetTimeout = 2.seconds.dilated + val breaker = new CircuitBreaker(system.scheduler, 5, callTimeout, resetTimeout) def openBreaker(): Unit = Await.ready(Future.sequence((1 to 5).map(_ ⇒ breaker.withCircuitBreaker(Future(throw new RuntimeException("FAIL"))).failed)), 1.second.dilated) @@ -47,7 +49,7 @@ class CircuitBreakerMTSpec extends AkkaSpec { openBreaker() - Await.ready(halfOpenLatch, 2.seconds.dilated) + Await.ready(halfOpenLatch, resetTimeout + 1.seconds.dilated) val futures = for (i ← 1 to 100) yield breaker.withCircuitBreaker(Future { Thread.sleep(10); "succeed" @@ -66,7 +68,7 @@ class CircuitBreakerMTSpec extends AkkaSpec { breaker.onHalfOpen(halfOpenLatch.countDown()) openBreaker() Await.ready(halfOpenLatch, 5.seconds.dilated) - Await.ready(breaker.withCircuitBreaker(Future("succeed")), 1.second.dilated) + Await.ready(breaker.withCircuitBreaker(Future("succeed")), resetTimeout) val futures = (1 to 100) map { i ⇒ diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index ec5d825fec..d3fe3308ea 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -32,6 +32,9 @@ object RoutingSpec { router = round-robin nr-of-instances = 3 } + /router3 { + router = round-robin + } /myrouter { router = "akka.routing.RoutingSpec$MyRouter" foo = bar @@ -123,12 +126,12 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with val theActor = system.actorOf(Props(new TheActor), "theActor") theActor ! "doIt" - Await.ready(doneLatch, 1 seconds) + Await.ready(doneLatch, remaining) } "use configured nr-of-instances when FromConfig" in { val router = system.actorOf(Props[TestActor].withRouter(FromConfig), "router1") - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) + Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) watch(router) system.stop(router) expectMsgType[Terminated] @@ -136,7 +139,22 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with "use configured nr-of-instances when router is specified" in { val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2") - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) + Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) + system.stop(router) + } + + "use specified resizer when resizer not configured" in { + val latch = TestLatch(1) + val resizer = new Resizer { + def isTimeForResize(messageCounter: Long): Boolean = messageCounter == 0 + def resize(routeeProvider: RouteeProvider): Unit = { + routeeProvider.createRoutees(nrOfInstances = 3) + latch.countDown() + } + } + val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(resizer = Some(resizer))), "router3") + Await.ready(latch, remaining) + Await.result(router ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees].routees.size must be(3) system.stop(router) } @@ -223,7 +241,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with routedActor ! "hello" routedActor ! "end" - Await.ready(doneLatch, 5 seconds) + Await.ready(doneLatch, remaining) counter.get must be(1) } @@ -270,7 +288,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with routedActor ! Broadcast("end") //now wait some and do validations. - Await.ready(doneLatch, 5 seconds) + Await.ready(doneLatch, remaining) for (i ← 0 until connectionCount) { val counter = counters.get(i).get @@ -302,7 +320,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with routedActor ! Broadcast(1) routedActor ! Broadcast("end") - Await.ready(doneLatch, 5 seconds) + Await.ready(doneLatch, remaining) counter1.get must be(1) counter2.get must be(1) @@ -340,7 +358,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with routedActor ! Broadcast(1) routedActor ! Broadcast("end") - Await.ready(doneLatch, 5 seconds) + Await.ready(doneLatch, remaining) counter1.get must be(1) counter2.get must be(1) @@ -431,7 +449,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with routedActor ! 1 routedActor ! "end" - Await.ready(doneLatch, 5 seconds) + Await.ready(doneLatch, remaining) counter1.get must be(1) counter2.get must be(1) @@ -462,7 +480,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with routedActor ? 1 routedActor ! "end" - Await.ready(doneLatch, 5 seconds) + Await.ready(doneLatch, remaining) counter1.get must be(1) counter2.get must be(1) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 90cf83a3a9..01181d57aa 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -87,50 +87,23 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.config) { } "serialize Address" in { - val b = serialize(addr) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Address]) match { - case Left(exception) ⇒ fail(exception) - case Right(add) ⇒ assert(add === addr) - } + assert(deserialize(serialize(addr).get, classOf[Address]).get === addr) } "serialize Person" in { - - val b = serialize(person) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Person]) match { - case Left(exception) ⇒ fail(exception) - case Right(p) ⇒ assert(p === person) - } + assert(deserialize(serialize(person).get, classOf[Person]).get === person) } "serialize record with default serializer" in { - val r = Record(100, person) - val b = serialize(r) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - deserialize(b.asInstanceOf[Array[Byte]], classOf[Record]) match { - case Left(exception) ⇒ fail(exception) - case Right(p) ⇒ assert(p === r) - } + assert(deserialize(serialize(r).get, classOf[Record]).get === r) } "not serialize ActorCell" in { val a = system.actorOf(Props(new Actor { def receive = { case o: ObjectOutputStream ⇒ - try { - o.writeObject(this) - } catch { - case _: NotSerializableException ⇒ testActor ! "pass" - } + try o.writeObject(this) catch { case _: NotSerializableException ⇒ testActor ! "pass" } } })) a ! new ObjectOutputStream(new ByteArrayOutputStream()) diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 196c794f27..b1e36f7559 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -80,17 +80,15 @@ object ActorDSL extends dsl.Inbox with dsl.Creators { protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension { - val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props.empty, "dsl").asInstanceOf[RepointableActorRef] + val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props( + new Actor { + def receive = { case any ⇒ sender ! any } + }), "dsl").asInstanceOf[RepointableActorRef] { - val timeout = system.settings.CreationTimeout.duration - val deadline = Deadline.now + timeout - while (!boss.isStarted) { - if (deadline.hasTimeLeft) - if (system.isTerminated) throw new IllegalStateException("actor system is already shutdown") - else Thread.sleep(10) - else throw new TimeoutException("failed to create /system/dsl actor within " + timeout) - } + implicit val timeout = system.settings.CreationTimeout + if (Await.result(boss ? "OK", system.settings.CreationTimeout.duration) != "OK") + throw new IllegalStateException("Creation of boss actor did not succeed!") } lazy val config = system.settings.config.getConfig("akka.actor.dsl") diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index 74d7a5540c..cc21e0de16 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -19,7 +19,7 @@ object ActorPath { * Since Actors form a tree, it is addressable using an URL, therefor an Actor Name has to conform to: * http://www.ietf.org/rfc/rfc2396.txt */ - val ElementRegex = """[-\w:@&=+,.!~*'_;][-\w:@&=+,.!~*'$_;]*""".r + val ElementRegex = """(?:[-\w:@&=+,.!~*'_;]|%\p{XDigit}{2})(?:[-\w:@&=+,.!~*'$_;]|%\p{XDigit}{2})*""".r } /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index fa85a19ac2..3f96bd839c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -413,7 +413,7 @@ class LocalActorRefProvider( def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras private def guardianSupervisorStrategyConfigurator = - dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x ⇒ x) + dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).get /** * Overridable supervision strategy to be used by the “/user” guardian. @@ -438,8 +438,9 @@ class LocalActorRefProvider( new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy, isSystem = false)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { - case "temp" ⇒ tempContainer - case other ⇒ extraNames.get(other).getOrElse(super.getSingleChild(other)) + case "temp" ⇒ tempContainer + case "deadLetters" ⇒ deadLetters + case other ⇒ extraNames.get(other).getOrElse(super.getSingleChild(other)) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 7c227f3757..bce966b99e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -19,6 +19,7 @@ import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, R import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.cell.ChildrenContainer import scala.concurrent.util.FiniteDuration +import util.{ Failure, Success } object ActorSystem { @@ -540,10 +541,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classOf[Scheduler] -> scheduler, classOf[DynamicAccess] -> dynamicAccess) - dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments) match { - case Left(e) ⇒ throw e - case Right(p) ⇒ p - } + dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get } def deadLetters: ActorRef = provider.deadLetters @@ -678,13 +676,12 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, private def loadExtensions() { import scala.collection.JavaConversions._ settings.config.getStringList("akka.extensions") foreach { fqcn ⇒ - dynamicAccess.getObjectFor[AnyRef](fqcn).fold(_ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()), Right(_)) match { - case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()); - case Right(p: ExtensionId[_]) ⇒ registerExtension(p); - case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) - case Left(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn) + dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Seq()) } match { + case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()) + case Success(p: ExtensionId[_]) ⇒ registerExtension(p) + case Success(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) + case Failure(problem) ⇒ log.error(problem, "While trying to load extension [{}], skipping...", fqcn) } - } } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index abe4c18a70..9ceebaacb3 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -156,15 +156,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) case fqn ⇒ val args = Seq(classOf[Config] -> deployment) - dynamicAccess.createInstanceFor[RouterConfig](fqn, args) match { - case Right(router) ⇒ router - case Left(exception) ⇒ - throw new IllegalArgumentException( - ("Cannot instantiate router [%s], defined in [%s], " + - "make sure it extends [akka.routing.RouterConfig] and has constructor with " + - "[com.typesafe.config.Config] parameter") - .format(fqn, key), exception) - } + dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ + case exception ⇒ throw new IllegalArgumentException( + ("Cannot instantiate router [%s], defined in [%s], " + + "make sure it extends [akka.routing.RouterConfig] and has constructor with " + + "[com.typesafe.config.Config] parameter") + .format(fqn, key), exception) + }).get } Some(Deploy(key, deployment, router, NoScopeGiven)) diff --git a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala index bc5bef5d61..50f5d26177 100644 --- a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala +++ b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala @@ -6,6 +6,7 @@ package akka.actor import scala.util.control.NonFatal import java.lang.reflect.InvocationTargetException import scala.reflect.ClassTag +import scala.util.Try /** * The DynamicAccess implementation is the class which is used for @@ -16,7 +17,6 @@ import scala.reflect.ClassTag * unless they are extending Akka in ways which go beyond simple Extensions. */ abstract class DynamicAccess { - /** * Convenience method which given a `Class[_]` object and a constructor description * will create a new instance of that class. @@ -25,23 +25,13 @@ abstract class DynamicAccess { * val obj = DynamicAccess.createInstanceFor(clazz, Seq(classOf[Config] -> config, classOf[String] -> name)) * }}} */ - def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = { - val types = args.map(_._1).toArray - val values = args.map(_._2).toArray - withErrorHandling { - val constructor = clazz.getDeclaredConstructor(types: _*) - constructor.setAccessible(true) - val obj = constructor.newInstance(values: _*).asInstanceOf[T] - val t = implicitly[ClassTag[T]].runtimeClass - if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(clazz + " is not a subtype of " + t)) - } - } + def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] /** * Obtain a `Class[_]` object loaded with the right class loader (i.e. the one * returned by `classLoader`). */ - def getClassFor[T: ClassTag](fqcn: String): Either[Throwable, Class[_ <: T]] + def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] /** * Obtain an object conforming to the type T, which is expected to be @@ -50,35 +40,18 @@ abstract class DynamicAccess { * `args` argument. The exact usage of args depends on which type is requested, * see the relevant requesting code for details. */ - def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] + def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] /** * Obtain the Scala “object” instance for the given fully-qualified class name, if there is one. */ - def getObjectFor[T: ClassTag](fqcn: String): Either[Throwable, T] + def getObjectFor[T: ClassTag](fqcn: String): Try[T] /** * This is the class loader to be used in those special cases where the * other factory method are not applicable (e.g. when constructing a ClassLoaderBinaryInputStream). */ def classLoader: ClassLoader - - /** - * Caught exception is returned as Left(exception). - * Unwraps `InvocationTargetException` if its getTargetException is an `Exception`. - * Other `Throwable`, such as `Error` is thrown. - */ - @inline - final def withErrorHandling[T](body: ⇒ Either[Throwable, T]): Either[Throwable, T] = - try body catch { - case e: InvocationTargetException ⇒ - e.getTargetException match { - case NonFatal(t) ⇒ Left(t) - case t ⇒ throw t - } - case NonFatal(e) ⇒ Left(e) - } - } /** @@ -89,42 +62,41 @@ abstract class DynamicAccess { * by default. */ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess { - //FIXME switch to Scala Reflection for 2.10 - override def getClassFor[T: ClassTag](fqcn: String): Either[Throwable, Class[_ <: T]] = - try { + + override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] = + Try[Class[_ <: T]]({ val c = classLoader.loadClass(fqcn).asInstanceOf[Class[_ <: T]] val t = implicitly[ClassTag[T]].runtimeClass - if (t.isAssignableFrom(c)) Right(c) else Left(new ClassCastException(t + " is not assignable from " + c)) - } catch { - case NonFatal(e) ⇒ Left(e) - } - - override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Throwable, T] = - getClassFor(fqcn).fold(Left(_), { c ⇒ - val types = args.map(_._1).toArray - val values = args.map(_._2).toArray - withErrorHandling { - val constructor = c.getDeclaredConstructor(types: _*) - constructor.setAccessible(true) - val obj = constructor.newInstance(values: _*) - val t = implicitly[ClassTag[T]].runtimeClass - if (t.isInstance(obj)) Right(obj) else Left(new ClassCastException(fqcn + " is not a subtype of " + t)) - } + if (t.isAssignableFrom(c)) c else throw new ClassCastException(t + " is not assignable from " + c) }) - override def getObjectFor[T: ClassTag](fqcn: String): Either[Throwable, T] = { - getClassFor(fqcn).fold(Left(_), { c ⇒ - withErrorHandling { + override def createInstanceFor[T: ClassTag](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Try[T] = + Try { + val types = args.map(_._1).toArray + val values = args.map(_._2).toArray + val constructor = clazz.getDeclaredConstructor(types: _*) + constructor.setAccessible(true) + val obj = constructor.newInstance(values: _*) + val t = implicitly[ClassTag[T]].runtimeClass + if (t.isInstance(obj)) obj.asInstanceOf[T] else throw new ClassCastException(clazz.getName + " is not a subtype of " + t) + } recover { case i: InvocationTargetException if i.getTargetException ne null ⇒ throw i.getTargetException } + + override def createInstanceFor[T: ClassTag](fqcn: String, args: Seq[(Class[_], AnyRef)]): Try[T] = + getClassFor(fqcn) flatMap { c ⇒ createInstanceFor(c, args) } + + override def getObjectFor[T: ClassTag](fqcn: String): Try[T] = { + getClassFor(fqcn) flatMap { c ⇒ + Try { val module = c.getDeclaredField("MODULE$") module.setAccessible(true) val t = implicitly[ClassTag[T]].runtimeClass module.get(null) match { - case null ⇒ Left(new NullPointerException) - case x if !t.isInstance(x) ⇒ Left(new ClassCastException(fqcn + " is not a subtype of " + t)) - case x ⇒ Right(x.asInstanceOf[T]) + case null ⇒ throw new NullPointerException + case x if !t.isInstance(x) ⇒ throw new ClassCastException(fqcn + " is not a subtype of " + t) + case x: T ⇒ x } - } - }) + } recover { case i: InvocationTargetException if i.getTargetException ne null ⇒ throw i.getTargetException } + } } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/Extension.scala b/akka-actor/src/main/scala/akka/actor/Extension.scala index da0e7e6769..6fab4ceb07 100644 --- a/akka-actor/src/main/scala/akka/actor/Extension.scala +++ b/akka-actor/src/main/scala/akka/actor/Extension.scala @@ -98,9 +98,5 @@ abstract class ExtensionKey[T <: Extension](implicit m: ClassTag[T]) extends Ext def this(clazz: Class[T]) = this()(ClassTag(clazz)) override def lookup(): ExtensionId[T] = this - def createExtension(system: ExtendedActorSystem): T = - system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)) match { - case Left(ex) ⇒ throw ex - case Right(r) ⇒ r - } + def createExtension(system: ExtendedActorSystem): T = system.dynamicAccess.createInstanceFor[T](m.runtimeClass, Seq(classOf[ExtendedActorSystem] -> system)).get } diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 637877dd66..693ca75565 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -74,11 +74,6 @@ object Props { * using the supplied thunk. */ def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create) - - /** - * Returns a new Props whose creator will instantiate an Actor that has the behavior specified - */ - def apply(behavior: ActorContext ⇒ Actor.Receive): Props = apply(new Actor { def receive = behavior(context) }) } /** diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 7076a996df..05b618d03a 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -101,13 +101,11 @@ An (unbounded) deque-based mailbox can be configured as follows: * `MessageQueueAppendFailedException` is thrown. * * The stash is guaranteed to be empty after calling `unstashAll()`. - * - * @throws MessageQueueAppendFailedException in case of a capacity violation when - * prepending the stash to a bounded mailbox */ def unstashAll(): Unit = { try { - for (msg ← theStash.reverseIterator) mailbox.enqueueFirst(self, msg) + val i = theStash.reverseIterator + while (i.hasNext) mailbox.enqueueFirst(self, i.next()) } finally { theStash = Vector.empty[Envelope] } @@ -115,15 +113,22 @@ An (unbounded) deque-based mailbox can be configured as follows: /** * Overridden callback. Prepends all messages in the stash to the mailbox, - * clears the stash, stops all children and invokes the postStop() callback of the superclass. + * clears the stash, stops all children and invokes the postStop() callback. */ - override def preRestart(reason: Throwable, message: Option[Any]) { + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { try unstashAll() finally { context.children foreach context.stop postStop() } } + /** + * Overridden callback. Prepends all messages in the stash to the mailbox and clears the stash. + * Must be called when overriding this method, otherwise stashed messages won't be propagated to DeadLetters + * when actor stops. + */ + override def postStop(): Unit = unstashAll() + } /** diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index 53b4dfb99e..4e59695cff 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -78,16 +78,17 @@ private[akka] trait Children { this: ActorCell ⇒ swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) } - @tailrec final def initChild(ref: ActorRef): Option[ChildRestartStats] = - childrenRefs.getByName(ref.path.name) match { + @tailrec final def initChild(ref: ActorRef): Option[ChildRestartStats] = { + val cc = childrenRefs + cc.getByName(ref.path.name) match { case old @ Some(_: ChildRestartStats) ⇒ old.asInstanceOf[Option[ChildRestartStats]] case Some(ChildNameReserved) ⇒ val crs = ChildRestartStats(ref) val name = ref.path.name - val c = childrenRefs - if (swapChildrenRefs(c, c.add(name, crs))) Some(crs) else initChild(ref) + if (swapChildrenRefs(cc, cc.add(name, crs))) Some(crs) else initChild(ref) case None ⇒ None } + } @tailrec final protected def shallDie(ref: ActorRef): Boolean = { val c = childrenRefs @@ -169,13 +170,7 @@ private[akka] trait Children { this: ActorCell ⇒ private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = { if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(cell.system) - ser.serialize(props.creator) match { - case Left(t) ⇒ throw t - case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match { - case Left(t) ⇒ throw t - case _ ⇒ //All good - } - } + ser.deserialize(ser.serialize(props.creator).get, props.creator.getClass).get } /* * in case we are currently terminating, fail external attachChild requests diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 0c0f9bb9c0..97e85fe049 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -155,6 +155,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ private def finishTerminate() { val a = actor + // The following order is crucial for things to work properly. Only chnage this if you're very confident and lucky. try if (a ne null) a.postStop() finally try dispatcher.detach(this) finally try parent.sendSystemMessage(ChildTerminated(self)) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index f251ac4588..340195d1a6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -25,13 +25,7 @@ object Envelope { if (msg eq null) throw new InvalidMessageException("Message is null") if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) { val ser = SerializationExtension(system) - ser.serialize(msg) match { //Verify serializability - case Left(t) ⇒ throw t - case Right(bytes) ⇒ ser.deserialize(bytes, msg.getClass) match { //Verify deserializability - case Left(t) ⇒ throw t - case _ ⇒ //All good - } - } + ser.deserialize(ser.serialize(msg).get, msg.getClass).get } new Envelope(message, sender) } @@ -426,14 +420,13 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit case "bounded" ⇒ new BoundedMailbox(prerequisites.settings, config) case fqcn ⇒ val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config) - prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match { - case Right(instance) ⇒ instance - case Left(exception) ⇒ + prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ + case exception ⇒ throw new IllegalArgumentException( ("Cannot instantiate MailboxType [%s], defined in [%s], " + "make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters") .format(fqcn, config.getString("id")), exception) - } + }).get } } @@ -445,13 +438,12 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit val args = Seq( classOf[Config] -> config, classOf[DispatcherPrerequisites] -> prerequisites) - prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args) match { - case Right(instance) ⇒ instance - case Left(exception) ⇒ throw new IllegalArgumentException( + prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({ + case exception ⇒ throw new IllegalArgumentException( ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s], make sure it has an accessible constructor with a [%s,%s] signature""") .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) - } + }).get } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 1e6dbc8546..125c400bb6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -148,15 +148,14 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites) case fqn ⇒ val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) - prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args) match { - case Right(configurator) ⇒ configurator - case Left(exception) ⇒ + prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({ + case exception ⇒ throw new IllegalArgumentException( ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " + "make sure it has constructor with [com.typesafe.config.Config] and " + "[akka.dispatch.DispatcherPrerequisites] parameters") .format(fqn, cfg.getString("id")), exception) - } + }).get } } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index cca9ee5588..2a15860b97 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -101,17 +101,13 @@ trait LoggingBus extends ActorEventBus { loggerName ← defaultLoggers if loggerName != StandardOutLogger.getClass.getName } yield { - try { - system.dynamicAccess.getClassFor[Actor](loggerName) match { - case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) - case Left(exception) ⇒ throw exception - } - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + loggerName + - "] due to [" + e.toString + "]", e) - } + system.dynamicAccess.getClassFor[Actor](loggerName).map({ + case actorClass ⇒ addLogger(system, actorClass, level, logName) + }).recover({ + case e ⇒ throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + loggerName + + "] due to [" + e.toString + "]", e) + }).get } guard.withGuard { loggers = myloggers diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 249772ab77..80bec9944a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -543,6 +543,16 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = * Router actor. */ def withSupervisorStrategy(strategy: SupervisorStrategy): RoundRobinRouter = copy(supervisorStrategy = strategy) + + /** + * Uses the resizer of the given Routerconfig if this RouterConfig + * doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + */ + override def withFallback(other: RouterConfig): RouterConfig = { + if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) + else this + } } trait RoundRobinLike { this: RouterConfig ⇒ @@ -666,6 +676,16 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, * Router actor. */ def withSupervisorStrategy(strategy: SupervisorStrategy): RandomRouter = copy(supervisorStrategy = strategy) + + /** + * Uses the resizer of the given Routerconfig if this RouterConfig + * doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + */ + override def withFallback(other: RouterConfig): RouterConfig = { + if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) + else this + } } trait RandomLike { this: RouterConfig ⇒ @@ -796,6 +816,16 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin * Router actor. */ def withSupervisorStrategy(strategy: SupervisorStrategy): SmallestMailboxRouter = copy(supervisorStrategy = strategy) + + /** + * Uses the resizer of the given Routerconfig if this RouterConfig + * doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + */ + override def withFallback(other: RouterConfig): RouterConfig = { + if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) + else this + } } trait SmallestMailboxLike { this: RouterConfig ⇒ @@ -878,7 +908,9 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ currentScore: Long = Long.MaxValue, at: Int = 0, deep: Boolean = false): ActorRef = - if (at >= targets.size) { + if (targets.isEmpty) + routeeProvider.context.system.deadLetters + else if (at >= targets.size) { if (deep) { if (proposedTarget.isTerminated) targets(ThreadLocalRandom.current.nextInt(targets.size)) else proposedTarget } else getNext(targets, proposedTarget, currentScore, 0, deep = true) @@ -999,6 +1031,16 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N * Router actor. */ def withSupervisorStrategy(strategy: SupervisorStrategy): BroadcastRouter = copy(supervisorStrategy = strategy) + + /** + * Uses the resizer of the given Routerconfig if this RouterConfig + * doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + */ + override def withFallback(other: RouterConfig): RouterConfig = { + if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) + else this + } } trait BroadcastLike { this: RouterConfig ⇒ @@ -1118,6 +1160,16 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It * Router actor. */ def withSupervisorStrategy(strategy: SupervisorStrategy) = copy(supervisorStrategy = strategy) + + /** + * Uses the resizer of the given Routerconfig if this RouterConfig + * doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + */ + override def withFallback(other: RouterConfig): RouterConfig = { + if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) + else this + } } trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index ee5e87466b..b9d6298784 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -5,7 +5,6 @@ package akka.serialization import akka.AkkaException -import scala.util.DynamicVariable import com.typesafe.config.Config import akka.actor.{ Extension, ExtendedActorSystem, Address, DynamicAccess } import akka.event.Logging @@ -13,6 +12,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.util.control.NonFatal import scala.collection.mutable.ArrayBuffer import java.io.NotSerializableException +import util.{ Try, DynamicVariable } object Serialization { @@ -56,9 +56,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration * to either an Array of Bytes or an Exception if one was thrown. */ - def serialize(o: AnyRef): Either[Throwable, Array[Byte]] = - try Right(findSerializerFor(o).toBinary(o)) - catch { case NonFatal(e) ⇒ Left(e) } + def serialize(o: AnyRef): Try[Array[Byte]] = Try(findSerializerFor(o).toBinary(o)) /** * Deserializes the given array of bytes using the specified serializer id, @@ -67,18 +65,14 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { */ def deserialize(bytes: Array[Byte], serializerId: Int, - clazz: Option[Class[_]]): Either[Throwable, AnyRef] = - try Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz)) - catch { case NonFatal(e) ⇒ Left(e) } + clazz: Option[Class[_]]): Try[AnyRef] = Try(serializerByIdentity(serializerId).fromBinary(bytes, clazz)) /** * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. * You can specify an optional ClassLoader to load the object into. * Returns either the resulting object or an Exception if one was thrown. */ - def deserialize(bytes: Array[Byte], clazz: Class[_]): Either[Throwable, AnyRef] = - try Right(serializerFor(clazz).fromBinary(bytes, Some(clazz))) - catch { case NonFatal(e) ⇒ Left(e) } + def deserialize(bytes: Array[Byte], clazz: Class[_]): Try[AnyRef] = Try(serializerFor(clazz).fromBinary(bytes, Some(clazz))) /** * Returns the Serializer configured for the given object, returns the NullSerializer if it's null. @@ -128,28 +122,24 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * Tries to load the specified Serializer by the fully-qualified name; the actual * loading is performed by the system’s [[akka.actor.DynamicAccess]]. */ - def serializerOf(serializerFQN: String): Either[Throwable, Serializer] = - system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)).fold(_ ⇒ - system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()), Right(_)) + def serializerOf(serializerFQN: String): Try[Serializer] = + system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq(classOf[ExtendedActorSystem] -> system)) recoverWith { + case _ ⇒ system.dynamicAccess.createInstanceFor[Serializer](serializerFQN, Seq()) + } /** * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) * By default always contains the following mapping: "java" -> akka.serialization.JavaSerializer */ private val serializers: Map[String, Serializer] = - for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).fold(throw _, identity) + for ((k: String, v: String) ← settings.Serializers) yield k -> serializerOf(v).get /** * bindings is a Seq of tuple representing the mapping from Class to Serializer. * It is primarily ordered by the most specific classes first, and secondly in the configured order. */ - private[akka] val bindings: Seq[ClassSerializer] = { - val configuredBindings = for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield { - val c = system.dynamicAccess.getClassFor[Any](k).fold(throw _, identity[Class[_]]) - (c, serializers(v)) - } - sort(configuredBindings) - } + private[akka] val bindings: Seq[ClassSerializer] = + sort(for ((k: String, v: String) ← settings.SerializationBindings if v != "none") yield (system.dynamicAccess.getClassFor[Any](k).get, serializers(v))) /** * Sort so that subtypes always precede their supertypes, but without diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala index ada4497f29..a98ffc9fd0 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorEndpointPathTest.scala @@ -7,15 +7,15 @@ package akka.camel.internal.component import org.scalatest.mock.MockitoSugar import org.scalatest.matchers.MustMatchers import akka.camel.TestSupport.SharedCamelSystem -import akka.actor.Props import org.scalatest.WordSpec +import akka.actor.{ Actor, Props } class ActorEndpointPathTest extends WordSpec with SharedCamelSystem with MustMatchers with MockitoSugar { def find(path: String) = ActorEndpointPath.fromCamelPath(path).findActorIn(system) "findActorIn returns Some(actor ref) if actor exists" in { - val path = system.actorOf(Props(behavior = ctx ⇒ { case _ ⇒ {} }), "knownactor").path + val path = system.actorOf(Props(new Actor { def receive = { case _ ⇒ } }), "knownactor").path find(path.toString) must be('defined) } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 0433a46ba3..5248db6ad8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -46,9 +46,9 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { val failureDetector = { import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn } system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).fold( - e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString), - identity) + fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).recover({ + case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) + }).get } new Cluster(system, failureDetector) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index c7c9083af8..ee8d1e0374 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -854,6 +854,16 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) * When at least one reply has been received it stops itself after * an idle SeedNodeTimeout. * + * The seed nodes can be started in any order, but they will not be "active", + * until they have been able to join another seed node (seed1). + * They will retry the join procedure. + * So one possible startup scenario is: + * 1. seed2 started, but doesn't get any ack from seed1 or seed3 + * 2. seed3 started, doesn't get any ack from seed1 or seed3 (seed2 doesn't reply) + * 3. seed1 is started and joins itself + * 4. seed2 retries the join procedure and gets an ack from seed1, and then joins to seed1 + * 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2 + * */ private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging { import InternalClusterAction._ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala index a7a5c94512..e198694aab 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala @@ -25,7 +25,7 @@ object JoinInProgressMultiJvmSpec extends MultiNodeConfig { threshold = 4 acceptable-heartbeat-pause = 1 second } - }""") // increase the leader action task interval + }""") .withFallback(MultiNodeClusterSpec.clusterConfig))) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index b509341ee6..15e308cafb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -11,6 +11,7 @@ import akka.testkit._ import scala.concurrent.util.duration._ import akka.actor.Props import akka.actor.Actor +import akka.cluster.MemberStatus._ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -62,7 +63,7 @@ abstract class LeaderLeavingSpec awaitCond(!cluster.isRunning) // verify that the LEADER is REMOVED - awaitCond(clusterView.status == MemberStatus.Removed) + awaitCond(clusterView.status == Removed) } else { @@ -71,6 +72,11 @@ abstract class LeaderLeavingSpec cluster.subscribe(system.actorOf(Props(new Actor { def receive = { + case state: CurrentClusterState ⇒ + if (state.members.exists(m ⇒ m.address == oldLeaderAddress && m.status == Leaving)) + leavingLatch.countDown() + if (state.members.exists(m ⇒ m.address == oldLeaderAddress && m.status == Exiting)) + exitingLatch.countDown() case MemberLeft(m) if m.address == oldLeaderAddress ⇒ leavingLatch.countDown() case MemberExited(m) if m.address == oldLeaderAddress ⇒ exitingLatch.countDown() case _ ⇒ // ignore diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 649b57ee59..1d50fe0d37 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -12,6 +12,7 @@ import akka.testkit._ import scala.concurrent.util.duration._ import akka.actor.Props import akka.actor.Actor +import akka.cluster.MemberStatus._ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -22,7 +23,6 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster { - leader-actions-interval = 5 s # increase the leader action task interval unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set } """) @@ -56,9 +56,13 @@ abstract class MembershipChangeListenerExitingSpec runOn(third) { val exitingLatch = TestLatch() + val secondAddress = address(second) cluster.subscribe(system.actorOf(Props(new Actor { def receive = { - case MemberExited(m) if m.address == address(second) ⇒ + case state: CurrentClusterState ⇒ + if (state.members.exists(m ⇒ m.address == secondAddress && m.status == Exiting)) + exitingLatch.countDown() + case MemberExited(m) if m.address == secondAddress ⇒ exitingLatch.countDown() case _ ⇒ // ignore } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 9999f02078..7c14af7203 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -17,10 +17,7 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString("akka.clusterView.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP - .withFallback(MultiNodeClusterSpec.clusterConfig))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index 8e33497d00..b1a7663154 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -12,6 +12,7 @@ import akka.testkit._ import akka.actor.Address import akka.actor.Props import akka.actor.Actor +import akka.cluster.MemberStatus._ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -21,7 +22,6 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.clusterView.leader-actions-interval = 5 s akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) @@ -54,9 +54,13 @@ abstract class MembershipChangeListenerLeavingSpec runOn(third) { val latch = TestLatch() + val secondAddress = address(second) cluster.subscribe(system.actorOf(Props(new Actor { def receive = { - case MemberLeft(m) if m.address == address(second) ⇒ + case state: CurrentClusterState ⇒ + if (state.members.exists(m ⇒ m.address == secondAddress && m.status == Leaving)) + latch.countDown() + case MemberLeft(m) if m.address == secondAddress ⇒ latch.countDown() case _ ⇒ // ignore } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala deleted file mode 100644 index de21d714bb..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import scala.concurrent.util.duration._ - -object NodeJoinMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString("akka.clusterView.leader-actions-interval = 5 s") // increase the leader action task interval - .withFallback(MultiNodeClusterSpec.clusterConfig))) -} - -class NodeJoinMultiJvmNode1 extends NodeJoinSpec with FailureDetectorPuppetStrategy -class NodeJoinMultiJvmNode2 extends NodeJoinSpec with FailureDetectorPuppetStrategy - -abstract class NodeJoinSpec - extends MultiNodeSpec(NodeJoinMultiJvmSpec) - with MultiNodeClusterSpec { - - import NodeJoinMultiJvmSpec._ - - "A cluster node" must { - "join another cluster and get status JOINING - when sending a 'Join' command" taggedAs LongRunningTest in { - - runOn(first) { - startClusterNode() - } - - enterBarrier("first-started") - - runOn(second) { - cluster.join(first) - } - - awaitCond(clusterView.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining }) - - enterBarrier("after") - } - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 5073e17aa1..529866c433 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -11,6 +11,7 @@ import akka.testkit._ import scala.concurrent.util.duration._ import akka.actor.Props import akka.actor.Actor +import akka.cluster.MemberStatus._ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -48,6 +49,11 @@ abstract class NodeLeavingAndExitingSpec val exitingLatch = TestLatch() cluster.subscribe(system.actorOf(Props(new Actor { def receive = { + case state: CurrentClusterState ⇒ + if (state.members.exists(m ⇒ m.address == secondAddess && m.status == Leaving)) + leavingLatch.countDown() + if (state.members.exists(m ⇒ m.address == secondAddess && m.status == Exiting)) + exitingLatch.countDown() case MemberLeft(m) if m.address == secondAddess ⇒ leavingLatch.countDown() case MemberExited(m) if m.address == secondAddess ⇒ exitingLatch.countDown() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index 7ae98bf261..b291e48579 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -3,6 +3,7 @@ */ package akka.cluster.routing +import language.postfixOps import scala.concurrent.Await import scala.concurrent.util.duration._ import com.typesafe.config.ConfigFactory diff --git a/akka-docs/cluster/cluster-usage.rst b/akka-docs/cluster/cluster-usage.rst new file mode 100644 index 0000000000..7bf4e754db --- /dev/null +++ b/akka-docs/cluster/cluster-usage.rst @@ -0,0 +1,164 @@ + +.. _cluster_usage: + +############### + Cluster Usage +############### + +.. note:: This document describes how to use the features implemented so far of the + new clustering coming in Akka Coltrane and is not available in the latest stable release. + The API might change before it is released. + +For introduction to the Akka Cluster concepts please see :ref:`cluster`. + +Preparing Your Project for Clustering +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project: + +.. parsed-literal:: + + "com.typesafe.akka" % "akka-cluster_|scalaVersion|" % "2.1-SNAPSHOT" + +If you are using the latest nightly build you should pick a timestamped Akka version from ``_. + +A Simple Cluster Example +^^^^^^^^^^^^^^^^^^^^^^^^ + +The following small program together with its configuration starts an ``ActorSystem`` +with the Cluster extension enabled. It joins the cluster and logs some membership events. + +Try it out: + +1. Add the following ``application.conf`` in your project, place it in ``src/main/resources``: + + +.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/resources/application.conf + :language: none + +To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala` +settings and the ``akka.cluster.seed-nodes`` to your ``application.conf`` file. + +The seed nodes are configured contact points for initial, automatic, join of the cluster. + +Note that if you are going to start the nodes on different machines you need to specify the +ip-addresses or host names of the machines in ``application.conf`` instead of ``127.0.0.1`` + +2. Add the following main program to your project, place it in ``src/main/scala``: + +.. literalinclude:: ../../akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala + :language: scala + + +3. Start the first seed node. Open a sbt session in one terminal window and run:: + + run-main sample.cluster.ClusterApp 2551 + +2551 corresponds to the port of the first seed-nodes element in the configuration. +In the log output you see that the cluster node has been started and changed status to 'Up'. + +4. Start the second seed node. Open a sbt session in another terminal window and run:: + + run-main sample.cluster.ClusterApp 2552 + + +2552 corresponds to the port of the second seed-nodes element in the configuration. +In the log output you see that the cluster node has been started and joins the other seed node +and becomes a member of the cluster. It's status changed to 'Up'. + +Switch over to the first terminal window and see in the log output that the member joined. + +5. Start another node. Open a sbt session in yet another terminal window and run:: + + run-main sample.cluster.ClusterApp + +Now you don't need to specify the port number, and it will use a random available port. +It joins one of the configured seed nodes. Look at the log output in the different terminal +windows. + +Start even more nodes in the same way, if you like. + +6. Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows. +The other nodes will detect the failure after a while, which you can see in the log +output in the other terminals. + +Look at the source code of the program again. What it does is to create an actor +and register it as subscriber of certain cluster events. It gets notified with +an snapshot event, 'CurrentClusterState' that holds full state information of +the cluster. After that it receives events for changes that happen in the cluster. + +Automatic vs. Manual Joining +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You may decide if joining to the cluster should be done automatically or manually. +By default it is automatic and you need to define the seed nodes in configuration +so that a new node has an initial contact point. When a new node is started it +sends a message to all seed nodes and then sends join command to the one that +answers first. If no one of the seed nodes replied (might not be started yet) +it retries this procedure until successful or shutdown. + +There is one thing to be aware of regarding the seed node configured as the +first element in the ``seed-nodes`` configuration list. +The seed nodes can be started in any order and it is not necessary to have all +seed nodes running, but the first seed node must be started when initially +starting a cluster, otherwise the other seed-nodes will not become initialized +and no other node can join the cluster. Once more than two seed nodes have been +started it is no problem to shut down the first seed node. If it goes down it +must be manually joined to the cluster again. +Automatic joining of the first seed node is not possible, it would only join +itself. It is only the first seed node that has this restriction. + +You can disable automatic joining with configuration: + + akka.cluster.auto-join = off + +Then you need to join manually, using JMX or the provided script. +You can join to any node in the cluster. It doesn't have to be configured as +seed node. If you are not using auto-join there is no need to configure +seed nodes at all. + +Joining can also be performed programatically with ``Cluster(system).join``. + + +Automatic vs. Manual Downing +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +When a member is considered by the failure detector to be unreachable the +leader is not allowed to perform its duties, such as changing status of +new joining members to 'Up'. The status of the unreachable member must be +changed to 'Down'. This can be performed automatically or manually. By +default it must be done manually, using using JMX or the provided script. + +It can also be performed programatically with ``Cluster(system).down``. + +You can enable automatic downing with configuration: + + akka.cluster.auto-down = on + +Be aware of that using auto-down implies that two separate clusters will +automatically be formed in case of network partition. That might be +desired by some applications but not by others. + +Configuration +^^^^^^^^^^^^^ + +There are several configuration properties for the cluster. We refer to the following +reference file for more information: + + +.. literalinclude:: ../../akka-cluster/src/main/resources/reference.conf + :language: none + +It is recommended that you change the ``tick-duration`` to 33 ms or less +of the default scheduler when using cluster, if you don't need to have it +configured to a longer duration for other reasons. If you don't do this +a dedicated scheduler will be used for periodic tasks of the cluster, which +introduce the extra overhead of another thread. + +:: + + # shorter tick-duration of default scheduler when using cluster + akka.scheduler.tick-duration.tick-duration = 33ms + + + diff --git a/akka-docs/cluster/index.rst b/akka-docs/cluster/index.rst index 35c4b2250a..dac3a558d9 100644 --- a/akka-docs/cluster/index.rst +++ b/akka-docs/cluster/index.rst @@ -5,3 +5,4 @@ Cluster :maxdepth: 2 cluster + cluster-usage diff --git a/akka-docs/general/supervision.rst b/akka-docs/general/supervision.rst index 12053ed6ad..88e23875a0 100644 --- a/akka-docs/general/supervision.rst +++ b/akka-docs/general/supervision.rst @@ -206,7 +206,7 @@ but processed afterwards. Normally stopping a child (i.e. not in response to a failure) will not automatically terminate the other children in an all-for-one strategy, that can easily be done by watching their lifecycle: if the :class:`Terminated` message -is not handled by the supervisor, it will throw a :class:`DeathPathException` +is not handled by the supervisor, it will throw a :class:`DeathPactException` which (depending on its supervisor) will restart it, and the default :meth:`preRestart` action will terminate all children. Of course this can be handled explicitly as well. diff --git a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java index a64fa48615..2b335dee99 100644 --- a/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/docs/actor/UntypedActorDocTestBase.java @@ -317,7 +317,7 @@ public class UntypedActorDocTestBase { Procedure angry = new Procedure() { @Override public void apply(Object message) { - if (message.equals("foo")) { + if (message.equals("bar")) { getSender().tell("I am already angry?"); } else if (message.equals("foo")) { getContext().become(happy); diff --git a/akka-docs/java/futures.rst b/akka-docs/java/futures.rst index 641f7e7eda..b3ef1b5be5 100644 --- a/akka-docs/java/futures.rst +++ b/akka-docs/java/futures.rst @@ -6,9 +6,10 @@ Futures (Java) Introduction ------------ -In Akka, a `Future `_ is a data structure used -to retrieve the result of some concurrent operation. This operation is usually performed by an ``Actor`` or -by the ``Dispatcher`` directly. This result can be accessed synchronously (blocking) or asynchronously (non-blocking). +In the Scala Standard Library, a `Future `_ is a data structure +used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) +or asynchronously (non-blocking). To be able to use this from Java, Akka provides a java friendly interface +in ``akka.dispatch.Futures``. Execution Contexts ------------------ @@ -27,7 +28,7 @@ Use with Actors There are generally two ways of getting a reply from an ``UntypedActor``: the first is by a sent message (``actorRef.tell(msg)``), which only works if the original sender was an ``UntypedActor``) and the second is through a ``Future``. -Using the ``ActorRef``\'s ``ask`` method to send a message will return a Future. +Using the ``ActorRef``\'s ``ask`` method to send a message will return a ``Future``. To wait for and retrieve the actual result the simplest method is: .. includecode:: code/docs/future/FutureDocTestBase.java @@ -68,7 +69,7 @@ Or failures: Functional Futures ------------------ -Akka's ``Future`` has several monadic methods that are very similar to the ones used by ``Scala``'s collections. +Scala's ``Future`` has several monadic methods that are very similar to the ones used by ``Scala``'s collections. These allow you to create 'pipelines' or 'streams' that the result will travel through. Future is a Monad @@ -81,12 +82,12 @@ The return value of the ``map`` method is another ``Future`` that will contain t .. includecode:: code/docs/future/FutureDocTestBase.java :include: imports2,map -In this example we are joining two strings together within a Future. Instead of waiting for f1 to complete, +In this example we are joining two strings together within a ``Future``. Instead of waiting for f1 to complete, we apply our function that calculates the length of the string using the ``map`` method. -Now we have a second Future, f2, that will eventually contain an ``Integer``. -When our original ``Future``, f1, completes, it will also apply our function and complete the second Future +Now we have a second ``Future``, f2, that will eventually contain an ``Integer``. +When our original ``Future``, f1, completes, it will also apply our function and complete the second ``Future`` with its result. When we finally ``get`` the result, it will contain the number 10. -Our original Future still contains the string "HelloWorld" and is unaffected by the ``map``. +Our original ``Future`` still contains the string "HelloWorld" and is unaffected by the ``map``. Something to note when using these methods: if the ``Future`` is still being processed when one of these methods are called, it will be the completing thread that actually does the work. @@ -115,7 +116,7 @@ to have this done concurrently, and for that we use ``flatMap``: .. includecode:: code/docs/future/FutureDocTestBase.java :include: flat-map -Now our second Future is executed concurrently as well. This technique can also be used to combine the results +Now our second ``Future`` is executed concurrently as well. This technique can also be used to combine the results of several Futures into a single calculation, which will be better explained in the following sections. If you need to do conditional propagation, you can use ``filter``: @@ -157,7 +158,7 @@ That's all it takes! If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be empty String. -In some cases you don't have a start-value and you're able to use the value of the first completing Future +In some cases you don't have a start-value and you're able to use the value of the first completing ``Future`` in the sequence as the start-value, you can use ``reduce``, it works like this: .. includecode:: code/docs/future/FutureDocTestBase.java @@ -172,7 +173,7 @@ Callbacks --------- Sometimes you just want to listen to a ``Future`` being completed, and react to that not by creating a new Future, but by side-effecting. -For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first. +For this Scala supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first. .. includecode:: code/docs/future/FutureDocTestBase.java :include: onSuccess @@ -188,8 +189,8 @@ Ordering Since callbacks are executed in any order and potentially in parallel, it can be tricky at the times when you need sequential ordering of operations. -But there's a solution! And it's name is ``andThen``, and it creates a new Future with -the specified callback, a Future that will have the same result as the Future it's called on, +But there's a solution! And it's name is ``andThen``, and it creates a new ``Future`` with +the specified callback, a ``Future`` that will have the same result as the ``Future`` it's called on, which allows for ordering like in the following sample: .. includecode:: code/docs/future/FutureDocTestBase.java diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 16858bc896..845e2825e3 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -75,7 +75,8 @@ How Routing is Designed within Akka Routers behave like single actors, but they should also not hinder scalability. This apparent contradiction is solved by making routers be represented by a -special :class:`RoutedActorRef`, which dispatches incoming messages destined +special :class:`RoutedActorRef` (implementation detail, what the user gets is +an :class:`ActorRef` as usual) which dispatches incoming messages destined for the routees without actually invoking the router actor’s behavior (and thus avoiding its mailbox; the single router actor’s task is to manage all aspects related to the lifecycle of the routees). This means that the code which decides diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index a983a578c9..f1cf49f7d1 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -81,7 +81,8 @@ a top level actor, that is supervised by the system (internal guardian actor). The name parameter is optional, but you should preferably name your actors, since that is used in log messages and for identifying actors. The name must not be empty -or start with ``$``. If the given name is already in use by another child to the +or start with ``$``, but it may contain URL encoded characters (eg. ``%20`` for a blank space). +If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. Actors are automatically started asynchronously when created. diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index 57590376fe..f9a5cb91bd 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -181,6 +181,18 @@ v2.1:: } }, ec); +API changes of DynamicAccess +============================ + +All methods with scala.Either[Throwable, X] have been changed to used scala.util.Try[X]. + +DynamicAccess.withErrorHandling has been removed since scala.util.Try now fulfills that role. + +API changes of Serialization +============================ + +All methods with scala.Either[Throwable, X] have been changed to used scala.util.Try[X]. + Empty Props =========== @@ -201,7 +213,18 @@ v2.0 Java:: v2.1 Java:: - ActorRef router2 = system.actorOf(new Props().withRouter(RoundRobinRouter.create(routees))); + ActorRef router2 = system.actorOf(new Props().withRouter(RoundRobinRouter.create(routees))); + +Props: Function-based creation +============================== + +v2.0 Scala:: + + Props(context => { case someMessage => context.sender ! someMessage }) + +v2.1 Scala:: + + Props(new Actor { def receive = { case someMessage => sender ! someMessage } }) Failing Send ============ @@ -256,6 +279,13 @@ If you don't want these in the log you need to add this to your configuration:: akka.remote.log-remote-lifecycle-events = off +Stash postStop +============== + +Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sure that +stashed messages are put into the dead letters when the actor stops, make sure you call +super.postStop if you override it. + Custom Router or Resizer ======================== diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 548190c6fd..16c6510c89 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -75,7 +75,8 @@ a top level actor, that is supervised by the system (internal guardian actor). The name parameter is optional, but you should preferably name your actors, since that is used in log messages and for identifying actors. The name must not be empty -or start with ``$``. If the given name is already in use by another child to the +or start with ``$``, but it may contain URL encoded characters (eg. ``%20`` for a blank space). +If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. Actors are automatically started asynchronously when created. @@ -108,6 +109,11 @@ Here is an example: meaning of an actor restart, which is described here: :ref:`supervision-restart`. +.. warning:: + + Also avoid passing mutable state into the constructor of the Actor, since + the call-by-name block can be executed by another thread. + Props ----- @@ -380,7 +386,7 @@ futures, because this is likely to be a common combination. Please note that all of the above is completely non-blocking and asynchronous: ``ask`` produces a :class:`Future`, three of which are composed into a new future using the for-comprehension and then ``pipeTo`` installs an ``onComplete``-handler on the -future to effect the submission of the aggregated :class:`Result` to another +future to affect the submission of the aggregated :class:`Result` to another actor. Using ``ask`` will send a message to the receiving Actor as with ``tell``, and diff --git a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala index 690b0ad838..0ce5f87728 100644 --- a/akka-docs/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/ActorDocSpec.scala @@ -372,8 +372,8 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val f: Future[Result] = for { x ← ask(actorA, Request).mapTo[Int] // call pattern directly - s ← actorB ask Request mapTo manifest[String] // call by implicit conversion - d ← actorC ? Request mapTo manifest[Double] // call by symbolic name + s ← (actorB ask Request).mapTo[String] // call by implicit conversion + d ← (actorC ? Request).mapTo[Double] // call by symbolic name } yield Result(x, s, d) f pipeTo actorD // .. or .. diff --git a/akka-docs/scala/code/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/docs/future/FutureDocSpec.scala index 74cc28ce06..421f5ef8fc 100644 --- a/akka-docs/scala/code/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/docs/future/FutureDocSpec.scala @@ -117,7 +117,7 @@ class FutureDocSpec extends AkkaSpec { val f1 = Future { "Hello" + "World" } - val f2 = Promise.successful(3).future + val f2 = Future.successful(3) val f3 = f1 map { x ⇒ f2 map { y ⇒ x.length * y @@ -132,7 +132,7 @@ class FutureDocSpec extends AkkaSpec { val f1 = Future { "Hello" + "World" } - val f2 = Promise.successful(3).future + val f2 = Future.successful(3) val f3 = f1 flatMap { x ⇒ f2 map { y ⇒ x.length * y @@ -145,7 +145,7 @@ class FutureDocSpec extends AkkaSpec { "demonstrate usage of filter" in { //#filter - val future1 = Promise.successful(4).future + val future1 = Future.successful(4) val future2 = future1.filter(_ % 2 == 0) val result = Await.result(future2, 1 second) result must be(4) @@ -290,8 +290,8 @@ class FutureDocSpec extends AkkaSpec { val msg1 = -1 //#try-recover val future = akka.pattern.ask(actor, msg1) recoverWith { - case e: ArithmeticException ⇒ Promise.successful(0).future - case foo: IllegalArgumentException ⇒ Promise.failed[Int](new IllegalStateException("All br0ken!")).future + case e: ArithmeticException ⇒ Future.successful(0) + case foo: IllegalArgumentException ⇒ Future.failed[Int](new IllegalStateException("All br0ken!")) } //#try-recover Await.result(future, 1 second) must be(0) @@ -343,7 +343,7 @@ class FutureDocSpec extends AkkaSpec { Await.result(future, 1 second) must be("foo") } { - val future = Promise.failed[String](new IllegalStateException("OHNOES")).future + val future = Future.failed[String](new IllegalStateException("OHNOES")) //#onFailure future onFailure { case ise: IllegalStateException if ise.getMessage == "OHNOES" ⇒ @@ -367,12 +367,12 @@ class FutureDocSpec extends AkkaSpec { } } - "demonstrate usage of Promise.success & Promise.failed" in { + "demonstrate usage of Future.successful & Future.failed" in { //#successful - val future = Promise.successful("Yay!").future + val future = Future.successful("Yay!") //#successful //#failed - val otherFuture = Promise.failed[String](new IllegalArgumentException("Bang!")).future + val otherFuture = Future.failed[String](new IllegalArgumentException("Bang!")) //#failed Await.result(future, 1 second) must be("Yay!") intercept[IllegalArgumentException] { Await.result(otherFuture, 1 second) } diff --git a/akka-docs/scala/code/docs/routing/RouterDocSpec.scala b/akka-docs/scala/code/docs/routing/RouterDocSpec.scala index c71228d06c..bea1bf16f4 100644 --- a/akka-docs/scala/code/docs/routing/RouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/RouterDocSpec.scala @@ -4,9 +4,9 @@ package docs.routing import RouterDocSpec.MyActor -import akka.actor.{ Props, Actor } import akka.testkit.AkkaSpec import akka.routing.RoundRobinRouter +import akka.actor.{ ActorRef, Props, Actor } object RouterDocSpec { class MyActor extends Actor { @@ -21,7 +21,7 @@ class RouterDocSpec extends AkkaSpec { import RouterDocSpec._ //#dispatchers - val router = system.actorOf(Props[MyActor] + val router: ActorRef = system.actorOf(Props[MyActor] .withRouter(RoundRobinRouter(5, routerDispatcher = "router")) // “head” will run on "router" dispatcher .withDispatcher("workers")) // MyActor workers will run on "workers" dispatcher //#dispatchers diff --git a/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala b/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala index 8d8865dd44..6e77d7a843 100644 --- a/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala +++ b/akka-docs/scala/code/docs/zeromq/ZeromqDocSpec.scala @@ -49,12 +49,12 @@ object ZeromqDocSpec { val timestamp = System.currentTimeMillis // use akka SerializationExtension to convert to bytes - val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).fold(throw _, identity) + val heapPayload = ser.serialize(Heap(timestamp, currentHeap.getUsed, currentHeap.getMax)).get // the first frame is the topic, second is the message pubSocket ! ZMQMessage(Seq(Frame("health.heap"), Frame(heapPayload))) // use akka SerializationExtension to convert to bytes - val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).fold(throw _, identity) + val loadPayload = ser.serialize(Load(timestamp, os.getSystemLoadAverage)).get // the first frame is the topic, second is the message pubSocket ! ZMQMessage(Seq(Frame("health.load"), Frame(loadPayload))) } @@ -71,18 +71,12 @@ object ZeromqDocSpec { def receive = { // the first frame is the topic, second is the message case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒ - ser.deserialize(m.payload(1), classOf[Heap]) match { - case Right(Heap(timestamp, used, max)) ⇒ - log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp))) - case Left(e) ⇒ throw e - } + val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get + log.info("Used heap {} bytes, at {}", used, timestampFormat.format(new Date(timestamp))) case m: ZMQMessage if m.firstFrameAsString == "health.load" ⇒ - ser.deserialize(m.payload(1), classOf[Load]) match { - case Right(Load(timestamp, loadAverage)) ⇒ - log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp))) - case Left(e) ⇒ throw e - } + val Load(timestamp, loadAverage) = ser.deserialize(m.payload(1), classOf[Load]).get + log.info("Load average {}, at {}", loadAverage, timestampFormat.format(new Date(timestamp))) } } //#logger @@ -97,13 +91,10 @@ object ZeromqDocSpec { def receive = { // the first frame is the topic, second is the message case m: ZMQMessage if m.firstFrameAsString == "health.heap" ⇒ - ser.deserialize(m.payload(1), classOf[Heap]) match { - case Right(Heap(timestamp, used, max)) ⇒ - if ((used.toDouble / max) > 0.9) count += 1 - else count = 0 - if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max)) - case Left(e) ⇒ throw e - } + val Heap(timestamp, used, max) = ser.deserialize(m.payload(1), classOf[Heap]).get + if ((used.toDouble / max) > 0.9) count += 1 + else count = 0 + if (count > 10) log.warning("Need more memory, using {} %", (100.0 * used / max)) } } //#alerter diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index 5a31482e81..1b4df4154b 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -7,9 +7,9 @@ Futures (Scala) Introduction ------------ -In Akka, a `Future `_ is a data structure used to -retrieve the result of some concurrent operation. This operation is usually performed by an ``Actor`` -or by the ``Dispatcher`` directly. This result can be accessed synchronously (blocking) or asynchronously (non-blocking). +In the Scala Standard Library, a `Future `_ is a data structure +used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) +or asynchronously (non-blocking). Execution Contexts ------------------ @@ -28,7 +28,7 @@ Use With Actors There are generally two ways of getting a reply from an ``Actor``: the first is by a sent message (``actor ! msg``), which only works if the original sender was an ``Actor``) and the second is through a ``Future``. -Using an ``Actor``\'s ``?`` method to send a message will return a Future. To wait for and retrieve the actual result the simplest method is: +Using an ``Actor``\'s ``?`` method to send a message will return a ``Future``. To wait for and retrieve the actual result the simplest method is: .. includecode:: code/docs/future/FutureDocSpec.scala :include: ask-blocking @@ -61,7 +61,7 @@ with the return value of the block used to complete the ``Future`` (in this case Unlike a ``Future`` that is returned from an ``Actor``, this ``Future`` is properly typed, and we also avoid the overhead of managing an ``Actor``. -You can also create already completed Futures using the ``Promise`` companion, which can be either successes: +You can also create already completed Futures using the ``Future`` companion, which can be either successes: .. includecode:: code/docs/future/FutureDocSpec.scala :include: successful @@ -74,7 +74,7 @@ Or failures: Functional Futures ------------------ -Akka's ``Future`` has several monadic methods that are very similar to the ones used by Scala's collections. +Scala's ``Future`` has several monadic methods that are very similar to the ones used by Scala's collections. These allow you to create 'pipelines' or 'streams' that the result will travel through. Future is a Monad @@ -148,7 +148,7 @@ Here we have 2 actors processing a single message each. Once the 2 results are a (note that we don't block to get these results!), they are being added together and sent to a third ``Actor``, which replies with a string, which we assign to 'result'. -This is fine when dealing with a known amount of Actors, but can grow unwieldy if we have more then a handful. +This is fine when dealing with a known amount of Actors, but can grow unwieldy if we have more than a handful. The ``sequence`` and ``traverse`` helper methods can make it easier to handle more complex use cases. Both of these methods are ways of turning, for a subclass ``T`` of ``Traversable``, ``T[Future[A]]`` into a ``Future[T[A]]``. For example: @@ -185,20 +185,20 @@ That's all it takes! If the sequence passed to ``fold`` is empty, it will return the start-value, in the case above, that will be 0. -In some cases you don't have a start-value and you're able to use the value of the first completing Future in the sequence +In some cases you don't have a start-value and you're able to use the value of the first completing ``Future`` in the sequence as the start-value, you can use ``reduce``, it works like this: .. includecode:: code/docs/future/FutureDocSpec.scala :include: reduce -Same as with ``fold``, the execution will be done asynchronously when the last of the Future is completed, +Same as with ``fold``, the execution will be done asynchronously when the last of the ``Future`` is completed, you can also parallelize it by chunking your futures into sub-sequences and reduce them, and then reduce the reduced results again. Callbacks --------- -Sometimes you just want to listen to a ``Future`` being completed, and react to that not by creating a new Future, but by side-effecting. -For this Akka supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first. +Sometimes you just want to listen to a ``Future`` being completed, and react to that not by creating a new ``Future``, but by side-effecting. +For this Scala supports ``onComplete``, ``onSuccess`` and ``onFailure``, of which the latter two are specializations of the first. .. includecode:: code/docs/future/FutureDocSpec.scala :include: onSuccess diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index a0bbe37089..353e82d277 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -75,7 +75,8 @@ How Routing is Designed within Akka Routers behave like single actors, but they should also not hinder scalability. This apparent contradiction is solved by making routers be represented by a -special :class:`RoutedActorRef`, which dispatches incoming messages destined +special :class:`RoutedActorRef` (implementation detail, what the user gets is +an :class:`ActorRef` as usual) which dispatches incoming messages destined for the routees without actually invoking the router actor’s behavior (and thus avoiding its mailbox; the single router actor’s task is to manage all aspects related to the lifecycle of the routees). This means that the code which decides diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 41db33d629..2e22a0d819 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -134,6 +134,9 @@ private[akka] object ClientFSM { * coordinator and react to the [[akka.remote.testconductor.Conductor]]’s * requests for failure injection. * + * Note that you can't perform requests concurrently, e.g. enter barrier + * from one thread and ask for node address from another thread. + * * INTERNAL API. */ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala index f49dc53e2b..58d41cc003 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala @@ -13,7 +13,7 @@ import akka.testkit._ object LookupRemoteActorMultiJvmSpec extends MultiNodeConfig { - class SomeActor extends Actor with Serializable { + class SomeActor extends Actor { def receive = { case "identify" ⇒ sender ! self } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala index eca91495d6..094e6692f7 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala @@ -15,7 +15,7 @@ import akka.testkit._ object NewRemoteActorMultiJvmSpec extends MultiNodeConfig { - class SomeActor extends Actor with Serializable { + class SomeActor extends Actor { def receive = { case "identify" ⇒ sender ! self } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala index 6eb601ab8f..3d1286fd10 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RandomRoutedRemoteActorSpec.scala @@ -22,7 +22,7 @@ import scala.concurrent.util.duration._ object RandomRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { - class SomeActor extends Actor with Serializable { + class SomeActor extends Actor { def receive = { case "hit" ⇒ sender ! self } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala index c521964e7f..c6925c0984 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/RoundRobinRoutedRemoteActorSpec.scala @@ -17,17 +17,26 @@ import akka.remote.testkit.MultiNodeSpec import akka.routing.Broadcast import akka.routing.RoundRobinRouter import akka.routing.RoutedActorRef +import akka.routing.Resizer +import akka.routing.RouteeProvider import akka.testkit._ import scala.concurrent.util.duration._ object RoundRobinRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { - class SomeActor extends Actor with Serializable { + class SomeActor extends Actor { def receive = { case "hit" ⇒ sender ! self } } + class TestResizer extends Resizer { + def isTimeForResize(messageCounter: Long): Boolean = messageCounter <= 10 + def resize(routeeProvider: RouteeProvider): Unit = { + routeeProvider.createRoutees(nrOfInstances = 1) + } + } + val first = role("first") val second = role("second") val third = role("third") @@ -39,6 +48,9 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { /service-hello.router = "round-robin" /service-hello.nr-of-instances = 3 /service-hello.target.nodes = ["@first@", "@second@", "@third@"] + + /service-hello2.router = "round-robin" + /service-hello2.target.nodes = ["@first@", "@second@", "@third@"] """) } @@ -92,4 +104,44 @@ class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemo } } } + + "A new remote actor configured with a RoundRobin router and Resizer" must { + "be locally instantiated on a remote node after several resize rounds" taggedAs LongRunningTest in { + + runOn(first, second, third) { + enterBarrier("start", "broadcast-end", "end", "done") + } + + runOn(fourth) { + enterBarrier("start") + val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter( + resizer = Some(new TestResizer))), "service-hello2") + actor.isInstanceOf[RoutedActorRef] must be(true) + + val iterationCount = 9 + + val repliesFrom: Set[ActorRef] = + (for { + i ← 0 until iterationCount + } yield { + actor ! "hit" + receiveOne(5 seconds) match { case ref: ActorRef ⇒ ref } + }).toSet + + enterBarrier("broadcast-end") + actor ! Broadcast(PoisonPill) + + enterBarrier("end") + // at least more than one actor per node + repliesFrom.size must be > (3) + val repliesFromAddresses = repliesFrom.map(_.path.address) + repliesFromAddresses must be === (Set(node(first), node(second), node(third)).map(_.address)) + + // shut down the actor before we let the other node(s) shut down so we don't try to send + // "Terminate" to a shut down node + system.stop(actor) + enterBarrier("done") + } + } + } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala index 903731a505..40f25dce73 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/router/ScatterGatherRoutedRemoteActorSpec.scala @@ -22,7 +22,7 @@ import akka.actor.Address object ScatterGatherRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig { - class SomeActor extends Actor with Serializable { + class SomeActor extends Actor { def receive = { case "hit" ⇒ sender ! self } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index b2413da3f9..b459a48c98 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -20,16 +20,10 @@ private[akka] object MessageSerializer { * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message */ def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = { - val clazz = - if (messageProtocol.hasMessageManifest) { - system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8) - .fold(throw _, Some(_)) - } else None - SerializationExtension(system) - .deserialize(messageProtocol.getMessage.toByteArray, messageProtocol.getSerializerId, clazz) match { - case Left(e) ⇒ throw e - case Right(r) ⇒ r - } + SerializationExtension(system).deserialize( + messageProtocol.getMessage.toByteArray, + messageProtocol.getSerializerId, + if (messageProtocol.hasMessageManifest) Some(system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8).get) else None).get } /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5377013a42..b118279ae1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -72,10 +72,9 @@ class RemoteActorRefProvider( classOf[ExtendedActorSystem] -> system, classOf[RemoteActorRefProvider] -> this) - system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match { - case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) - case Right(remote) ⇒ remote - } + system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args).recover({ + case problem ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) + }).get } _log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")") diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 183ba66beb..1b21b89ccb 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -15,8 +15,9 @@ import akka.remote.RemoteScope import akka.actor.AddressFromURIString import akka.actor.SupervisorStrategy import akka.actor.Address - import scala.collection.JavaConverters._ +import java.util.concurrent.atomic.AtomicInteger +import java.lang.IllegalStateException /** * [[akka.routing.RouterConfig]] implementation for remote deployment on defined @@ -45,8 +46,10 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) ext override def resizer: Option[Resizer] = local.resizer override def withFallback(other: RouterConfig): RouterConfig = other match { + case RemoteRouterConfig(local: RemoteRouterConfig, nodes) ⇒ throw new IllegalStateException( + "RemoteRouterConfig is not allowed to wrap a RemoteRouterConfig") case RemoteRouterConfig(local, nodes) ⇒ copy(local = this.local.withFallback(local)) - case _ ⇒ this + case _ ⇒ copy(local = this.local.withFallback(other)) } } @@ -64,6 +67,8 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _ro // need this iterator as instance variable since Resizer may call createRoutees several times private val nodeAddressIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator + // need this counter as instance variable since Resizer may call createRoutees several times + private val childNameCounter = new AtomicInteger override def registerRouteesFor(paths: Iterable[String]): Unit = throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]" @@ -72,7 +77,7 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _ro override def createRoutees(nrOfInstances: Int): Unit = { val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 val refs = IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { - val name = "c" + i + val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(nodeAddressIter.next)) impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, systemService = false, Some(deploy), lookupDeploy = false, async = false) diff --git a/akka-remote/src/main/scala/akka/serialization/DaemonMsgCreateSerializer.scala b/akka-remote/src/main/scala/akka/serialization/DaemonMsgCreateSerializer.scala index 8f25021253..19aabd398f 100644 --- a/akka-remote/src/main/scala/akka/serialization/DaemonMsgCreateSerializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/DaemonMsgCreateSerializer.scala @@ -13,6 +13,7 @@ import akka.remote.RemoteProtocol.{ DaemonMsgCreateProtocol, DeployProtocol, Pro import akka.routing.{ NoRouter, RouterConfig } import akka.actor.FromClassCreator import scala.reflect.ClassTag +import util.{ Failure, Success } /** * Serializes akka's internal DaemonMsgCreate using protobuf @@ -88,14 +89,10 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e def props = { val creator = - if (proto.getProps.hasFromClassCreator) { - system.dynamicAccess.getClassFor[Actor](proto.getProps.getFromClassCreator) match { - case Right(clazz) ⇒ FromClassCreator(clazz) - case Left(e) ⇒ throw e - } - } else { + if (proto.getProps.hasFromClassCreator) + FromClassCreator(system.dynamicAccess.getClassFor[Actor](proto.getProps.getFromClassCreator).get) + else deserialize(proto.getProps.getCreator, classOf[() ⇒ Actor]) - } val routerConfig = if (proto.getProps.hasRouterConfig) deserialize(proto.getProps.getRouterConfig, classOf[RouterConfig]) @@ -115,26 +112,22 @@ private[akka] class DaemonMsgCreateSerializer(val system: ExtendedActorSystem) e supervisor = deserializeActorRef(system, proto.getSupervisor)) } - protected def serialize(any: AnyRef): ByteString = - serialization.serialize(any) match { - case Right(bytes) ⇒ ByteString.copyFrom(bytes) - case Left(e) ⇒ throw e - } + protected def serialize(any: AnyRef): ByteString = ByteString.copyFrom(serialization.serialize(any).get) protected def deserialize[T: ClassTag](data: ByteString, clazz: Class[T]): T = { val bytes = data.toByteArray serialization.deserialize(bytes, clazz) match { - case Right(x: T) ⇒ x - case Right(other) ⇒ throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other)) - case Left(e) ⇒ + case Success(x: T) ⇒ x + case Success(other) ⇒ throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other)) + case Failure(e) ⇒ // Fallback to the java serializer, because some interfaces don't implement java.io.Serializable, // but the impl instance does. This could be optimized by adding java serializers in reference.conf: // com.typesafe.config.Config // akka.routing.RouterConfig // akka.actor.Scope serialization.deserialize(bytes, classOf[java.io.Serializable]) match { - case Right(x: T) ⇒ x - case _ ⇒ throw e // the first exception + case Success(x: T) ⇒ x + case _ ⇒ throw e // the first exception } } } diff --git a/akka-remote/src/test/scala/akka/serialization/DaemonMsgCreateSerializerSpec.scala b/akka-remote/src/test/scala/akka/serialization/DaemonMsgCreateSerializerSpec.scala index 0c92ec7ee1..38a50ea886 100644 --- a/akka-remote/src/test/scala/akka/serialization/DaemonMsgCreateSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/serialization/DaemonMsgCreateSerializerSpec.scala @@ -79,15 +79,7 @@ class DaemonMsgCreateSerializerSpec extends AkkaSpec { } def verifySerialization(msg: DaemonMsgCreate): Unit = { - val bytes = ser.serialize(msg) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgCreate]) match { - case Left(exception) ⇒ fail(exception) - case Right(m: DaemonMsgCreate) ⇒ assertDaemonMsgCreate(msg, m) - case other ⇒ throw new MatchError(other) - } + assertDaemonMsgCreate(msg, ser.deserialize(ser.serialize(msg).get, classOf[DaemonMsgCreate]).get.asInstanceOf[DaemonMsgCreate]) } def assertDaemonMsgCreate(expected: DaemonMsgCreate, got: DaemonMsgCreate): Unit = { diff --git a/akka-samples/akka-sample-cluster/src/main/resources/application.conf b/akka-samples/akka-sample-cluster/src/main/resources/application.conf index 779b3825cd..ee403ff23d 100644 --- a/akka-samples/akka-sample-cluster/src/main/resources/application.conf +++ b/akka-samples/akka-sample-cluster/src/main/resources/application.conf @@ -13,6 +13,8 @@ akka { extensions = ["akka.cluster.Cluster$"] cluster { - seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551", "akka://ClusterSystem@127.0.0.1:2552"] + seed-nodes = [ + "akka://ClusterSystem@127.0.0.1:2551", + "akka://ClusterSystem@127.0.0.1:2552"] } } \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala index 521176d142..0fd396784d 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala @@ -8,24 +8,26 @@ object ClusterApp { def main(args: Array[String]): Unit = { + // Override the configuration of the port + // when specified as program argument if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) // Create an Akka system val system = ActorSystem("ClusterSystem") - val clusterListener = system.actorOf(Props(new Actor { + val clusterListener = system.actorOf(Props(new Actor with ActorLogging { def receive = { case state: CurrentClusterState ⇒ - println("Current members: " + state.members) + log.info("Current members: {}", state.members) case MemberJoined(member) ⇒ - println("Member joined: " + member) + log.info("Member joined: {}", member) case MemberUp(member) ⇒ - println("Member is Up: " + member) + log.info("Member is Up: {}", member) case MemberUnreachable(member) ⇒ - println("Member detected as unreachable: " + member) + log.info("Member detected as unreachable: {}", member) case _ ⇒ // ignore } - })) + }), name = "clusterListener") Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent]) } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 04d344b707..f47fa1acb6 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -237,11 +237,15 @@ class CallingThreadDispatcher( try { if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle) mbox.actor.invoke(handle) + if (Thread.interrupted()) { // clear interrupted flag before we continue + intex = new InterruptedException("Interrupted during message processing") + log.error(intex, "Interrupted during message processing") + } true } catch { case ie: InterruptedException ⇒ log.error(ie, "Interrupted during message processing") - Thread.currentThread().interrupt() + Thread.interrupted() // clear interrupted flag before continuing intex = ie true case NonFatal(e) ⇒ @@ -262,7 +266,7 @@ class CallingThreadDispatcher( runQueue(mbox, queue, intex) } else { if (intex ne null) { - Thread.interrupted // clear interrupted flag before throwing according to java convention + Thread.interrupted() // clear interrupted flag before throwing according to java convention throw intex } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 33559cd56c..57aedba34f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -136,14 +136,13 @@ object TestActorRef { def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) def apply[T <: Actor](name: String)(implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({ - system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()) match { - case Right(value) ⇒ value - case Left(exception) ⇒ throw ActorInitializationException(null, + system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[T](t.runtimeClass, Seq()).recover({ + case exception ⇒ throw ActorInitializationException(null, "Could not instantiate Actor" + "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + "\nOR try to change: 'actorOf(Props[MyActor]' to 'actorOf(Props(new MyActor)'.", exception) - } + }).get }), name) /** diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 19f63c1567..e52fe741ea 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -430,7 +430,7 @@ trait TestKitBase { /** * Same as `expectMsgAllClassOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAllClassOf[T](obj: Class[_ <: T]*): Seq[T] = expectMsgAllClassOf_internal(remaining, obj: _*) + def expectMsgAllClassOf[T](obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllClassOf(remaining, obj: _*) /** * Receive a number of messages from the test actor matching the given @@ -440,9 +440,9 @@ trait TestKitBase { * Wait time is bounded by the given duration, with an AssertionFailure * being thrown in case of timeout. */ - def expectMsgAllClassOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = expectMsgAllClassOf_internal(max.dilated, obj: _*) + def expectMsgAllClassOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllClassOf(max.dilated, obj: _*) - private def expectMsgAllClassOf_internal[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = { + private def internalExpectMsgAllClassOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = { val recv = receiveN_internal(obj.size, max) obj foreach (x ⇒ assert(recv exists (_.getClass eq BoxedType(x)), "not found " + x)) recv foreach (x ⇒ assert(obj exists (c ⇒ BoxedType(c) eq x.getClass), "found non-matching object " + x)) @@ -452,7 +452,7 @@ trait TestKitBase { /** * Same as `expectMsgAllConformingOf(remaining, obj...)`, but correctly treating the timeFactor. */ - def expectMsgAllConformingOf[T](obj: Class[_ <: T]*): Seq[T] = expectMsgAllClassOf_internal(remaining, obj: _*) + def expectMsgAllConformingOf[T](obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllConformingOf(remaining, obj: _*) /** * Receive a number of messages from the test actor matching the given @@ -465,9 +465,9 @@ trait TestKitBase { * Beware that one object may satisfy all given class constraints, which * may be counter-intuitive. */ - def expectMsgAllConformingOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = expectMsgAllConformingOf(max.dilated, obj: _*) + def expectMsgAllConformingOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = internalExpectMsgAllConformingOf(max.dilated, obj: _*) - private def expectMsgAllConformingOf_internal[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = { + private def internalExpectMsgAllConformingOf[T](max: Duration, obj: Class[_ <: T]*): Seq[T] = { val recv = receiveN_internal(obj.size, max) obj foreach (x ⇒ assert(recv exists (BoxedType(x) isInstance _), "not found " + x)) recv foreach (x ⇒ assert(obj exists (c ⇒ BoxedType(c) isInstance x), "found non-matching object " + x)) diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 67478b35e3..a1d32e019e 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -7,15 +7,14 @@ import language.{ postfixOps, reflectiveCalls } import org.scalatest.{ WordSpec, BeforeAndAfterAll, Tag } import org.scalatest.matchers.MustMatchers -import akka.actor.{ Actor, ActorRef, Props, ActorSystem, PoisonPill, DeadLetter } +import akka.actor.{ Actor, Props, ActorSystem, PoisonPill, DeadLetter, ActorSystemImpl } import akka.event.{ Logging, LoggingAdapter } import scala.concurrent.util.duration._ -import scala.concurrent.Await +import scala.concurrent.{ Await, Future } import com.typesafe.config.{ Config, ConfigFactory } import java.util.concurrent.TimeoutException -import akka.dispatch.{ MessageDispatcher, Dispatchers } +import akka.dispatch.Dispatchers import akka.pattern.ask -import akka.actor.ActorSystemImpl object TimingTest extends Tag("timing") object LongRunningTest extends Tag("long-running") @@ -90,9 +89,8 @@ abstract class AkkaSpec(_system: ActorSystem) protected def atTermination() {} - def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) { - system.actorOf(Props(ctx ⇒ { case "go" ⇒ try body finally ctx.stop(ctx.self) }).withDispatcher(dispatcherId)) ! "go" - } + def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit): Unit = + Future(body)(system.dispatchers.lookup(dispatcherId)) } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 7df30be4c7..9d88c7b111 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -117,7 +117,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA "used with TestActorRef" in { val a = TestActorRef(Props(new Actor { - val nested = TestActorRef(Props(self ⇒ { case _ ⇒ })) + val nested = TestActorRef(Props(new Actor { def receive = { case _ ⇒ } })) def receive = { case _ ⇒ sender ! nested } })) a must not be (null) @@ -128,7 +128,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA "used with ActorRef" in { val a = TestActorRef(Props(new Actor { - val nested = context.actorOf(Props(self ⇒ { case _ ⇒ })) + val nested = context.actorOf(Props(new Actor { def receive = { case _ ⇒ } })) def receive = { case _ ⇒ sender ! nested } })) a must not be (null) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 0d1e139e81..10c39cdc05 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -68,11 +68,14 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout { } "be able to expect primitive types" in { - for (_ ← 1 to 4) testActor ! 42 + for (_ ← 1 to 7) testActor ! 42 expectMsgType[Int] must be(42) expectMsgAnyClassOf(classOf[Int]) must be(42) expectMsgAllClassOf(classOf[Int]) must be(Seq(42)) expectMsgAllConformingOf(classOf[Int]) must be(Seq(42)) + expectMsgAllConformingOf(5 seconds, classOf[Int]) must be(Seq(42)) + expectMsgAllClassOf(classOf[Int]) must be(Seq(42)) + expectMsgAllClassOf(5 seconds, classOf[Int]) must be(Seq(42)) } } diff --git a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala index ecaa47fe96..e4724cf8a3 100644 --- a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala @@ -68,8 +68,6 @@ object CoordinatedIncrement { class CoordinatedIncrementSpec extends AkkaSpec(CoordinatedIncrement.config) with BeforeAndAfterAll { import CoordinatedIncrement._ - implicit val timeout = Timeout(5.seconds.dilated) - val numCounters = 4 def actorOfs = { @@ -80,13 +78,14 @@ class CoordinatedIncrementSpec extends AkkaSpec(CoordinatedIncrement.config) wit } "Coordinated increment" should { + implicit val timeout = Timeout(100.millis.dilated) "increment all counters by one with successful transactions" in { val (counters, failer) = actorOfs val coordinated = Coordinated() counters(0) ! coordinated(Increment(counters.tail)) coordinated.await for (counter ← counters) { - Await.result((counter ? GetCount).mapTo[Int], timeout.duration) must be === 1 + Await.result((counter ? GetCount).mapTo[Int], remaining) must be === 1 } counters foreach (system.stop(_)) system.stop(failer) @@ -103,7 +102,7 @@ class CoordinatedIncrementSpec extends AkkaSpec(CoordinatedIncrement.config) wit counters(0) ! Coordinated(Increment(counters.tail :+ failer)) coordinated.await for (counter ← counters) { - Await.result(counter ? GetCount, timeout.duration) must be === 0 + Await.result(counter ? GetCount, remaining) must be === 0 } counters foreach (system.stop(_)) system.stop(failer) diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index fbef659a7b..6fc349b798 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -31,7 +31,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A private val noBytes = Array[Byte]() private val zmqContext = params collectFirst { case c: Context ⇒ c } getOrElse DefaultContext - private val deserializer = deserializerFromParams + private var deserializer = params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer private val socketType = { import SocketType.{ ZMQSocketType ⇒ ST } params.collectFirst { case t: ST ⇒ t }.getOrElse(throw new IllegalArgumentException("A socket type is required")) @@ -39,7 +39,6 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A private val socket: Socket = zmqContext.socket(socketType) private val poller: Poller = zmqContext.poller - private val log = Logging(context.system, this) private val pendingSends = new ListBuffer[Seq[Frame]] @@ -93,6 +92,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A case MulticastHops(value) ⇒ socket.setMulticastHops(value) case SendBufferSize(value) ⇒ socket.setSendBufferSize(value) case ReceiveBufferSize(value) ⇒ socket.setReceiveBufferSize(value) + case d: Deserializer ⇒ deserializer = d } private def handleSocketOptionQuery(msg: SocketOptionQuery): Unit = @@ -135,9 +135,6 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A params filter (_.isInstanceOf[PubSubOption]) foreach { self ! _ } } - private def deserializerFromParams: Deserializer = - params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer - private def setupSocket() = params foreach { case _: SocketConnectOption | _: PubSubOption | _: SocketMeta ⇒ // ignore, handled differently case m ⇒ self ! m diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 07fcd8f09b..db819d68d8 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -7,7 +7,7 @@ package akka import sbt._ import sbt.Keys._ import com.typesafe.sbtmultijvm.MultiJvmPlugin -import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions, multiNodeExecuteTests } +import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions, jvmOptions, scalatestOptions, multiNodeExecuteTests, multiNodeJavaName } import com.typesafe.sbtscalariform.ScalariformPlugin import com.typesafe.sbtscalariform.ScalariformPlugin.ScalariformKeys import com.typesafe.sbtosgi.OsgiPlugin.{ OsgiKeys, osgiSettings } @@ -24,7 +24,7 @@ object AkkaBuild extends Build { lazy val buildSettings = Seq( organization := "com.typesafe.akka", version := "2.1-SNAPSHOT", - scalaVersion := "2.10.0-M7" + scalaVersion := System.getProperty("akka.scalaVersion", "2.10.0-M7") ) lazy val akka = Project( @@ -467,7 +467,9 @@ object AkkaBuild extends Build { lazy val multiJvmSettings = MultiJvmPlugin.settings ++ inConfig(MultiJvm)(ScalariformPlugin.scalariformSettings) ++ Seq( compileInputs in MultiJvm <<= (compileInputs in MultiJvm) dependsOn (ScalariformKeys.format in MultiJvm), + compile in MultiJvm <<= (compile in MultiJvm) triggeredBy (compile in Test), ScalariformKeys.preferences in MultiJvm := formattingPreferences) ++ + Option(System.getProperty("akka.test.multi-node.java")).map(x => Seq(multiNodeJavaName in MultiJvm := x)).getOrElse(Seq.empty) ++ ((executeMultiJvmTests, multiNodeEnabled) match { case (true, true) => executeTests in Test <<= ((executeTests in Test), (multiNodeExecuteTests in MultiJvm)) map { @@ -484,6 +486,7 @@ object AkkaBuild extends Build { case (false, _) => Seq.empty }) + lazy val mimaSettings = mimaDefaultSettings ++ Seq( // MiMa previousArtifact := None @@ -581,7 +584,7 @@ object Dependencies { object Dependency { // Compile val camelCore = "org.apache.camel" % "camel-core" % "2.10.0" exclude("org.slf4j", "slf4j-api") // ApacheV2 - val config = "com.typesafe" % "config" % "0.5.0" // ApacheV2 + val config = "com.typesafe" % "config" % "0.5.2" // ApacheV2 val netty = "io.netty" % "netty" % "3.5.4.Final" // ApacheV2 val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD val scalaStm = "org.scala-tools" % "scala-stm" % "0.6" cross CrossVersion.full // Modified BSD (Scala) diff --git a/project/plugins.sbt b/project/plugins.sbt index 2f4e1d369b..69ae35efef 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,7 @@ resolvers += Classpaths.typesafeResolver -addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M4" cross CrossVersion.full) +addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.2.0-M5" cross CrossVersion.full) addSbtPlugin("com.typesafe.sbtscalariform" % "sbtscalariform" % "0.4.0")