diff --git a/akka-actor-tests/src/test/scala/akka/io/TestUtils.scala b/akka-actor-tests/src/test/scala/akka/TestUtils.scala similarity index 98% rename from akka-actor-tests/src/test/scala/akka/io/TestUtils.scala rename to akka-actor-tests/src/test/scala/akka/TestUtils.scala index 27c9dd21a9..ed19d33367 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TestUtils.scala +++ b/akka-actor-tests/src/test/scala/akka/TestUtils.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2013 Typesafe Inc. */ -package akka.io +package akka import scala.collection.immutable import java.net.InetSocketAddress diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index 2d7b7bc13d..c3c3b4b63d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -159,7 +159,7 @@ class ActorDSLSpec extends AkkaSpec { become { case "die" ⇒ throw new Exception } - whenFailing { (cause, msg) ⇒ testActor ! (cause, msg) } + whenFailing { case m @ (cause, msg) ⇒ testActor ! m } whenRestarted { cause ⇒ testActor ! cause } }) //#failing-actor diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index ae6532acea..0de13e7239 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -6,9 +6,6 @@ package akka.actor import language.postfixOps -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers - import akka.testkit._ import akka.util.Timeout import scala.concurrent.duration._ @@ -17,6 +14,7 @@ import java.lang.IllegalStateException import scala.concurrent.Promise import akka.pattern.ask import akka.serialization.JavaSerializer +import akka.TestUtils.verifyActorTermination object ActorRefSpec { @@ -43,19 +41,17 @@ object ActorRefSpec { import context.system def receive = { case "work" ⇒ { - work + work() sender ! "workDone" context.stop(self) } case ReplyTo(replyTo) ⇒ { - work + work() replyTo ! "complexReply" } } - private def work { - Thread.sleep(1.second.dilated.toMillis) - } + private def work(): Unit = Thread.sleep(1.second.dilated.toMillis) } class SenderActor(replyActor: ActorRef, latch: TestLatch) extends Actor { @@ -143,7 +139,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { new Actor { def receive = { case _ ⇒ } } } - def contextStackMustBeEmpty = ActorCell.contextStack.get.headOption must be === None + def contextStackMustBeEmpty(): Unit = ActorCell.contextStack.get.headOption must be === None EventFilter[ActorInitializationException](occurrences = 1) intercept { intercept[akka.actor.ActorInitializationException] { @@ -154,7 +150,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { }))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 1) intercept { @@ -163,7 +159,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(promiseIntercept(new FailingOuterActor(actorOf(Props(new InnerActor))))(result)))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 1) intercept { @@ -172,7 +168,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result))))))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 1) intercept { @@ -181,7 +177,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(promiseIntercept(new FailingInheritingOuterActor(actorOf(Props(new InnerActor))))(result)))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 2) intercept { @@ -190,7 +186,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result))))))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 2) intercept { @@ -199,7 +195,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result))))))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 2) intercept { @@ -208,7 +204,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result))))))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 1) intercept { @@ -219,7 +215,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { })))))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 2) intercept { @@ -228,7 +224,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result))))))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 1) intercept { @@ -237,7 +233,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result))))))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 1) intercept { @@ -246,7 +242,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ new InnerActor; new InnerActor })(result))))))) } - contextStackMustBeEmpty + contextStackMustBeEmpty() } EventFilter[ActorInitializationException](occurrences = 1) intercept { @@ -255,7 +251,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result))))))) }).getMessage must be === "Ur state be b0rked" - contextStackMustBeEmpty + contextStackMustBeEmpty() } } @@ -318,17 +314,21 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val out = new ObjectOutputStream(baos) val sysImpl = system.asInstanceOf[ActorSystemImpl] - val addr = sysImpl.provider.rootPath.address - val serialized = SerializedActorRef(RootActorPath(addr, "/non-existing")) + val ref = system.actorOf(Props[ReplyActor], "non-existing") + val serialized = SerializedActorRef(ref) out.writeObject(serialized) out.flush out.close + ref ! PoisonPill + + verifyActorTermination(ref) + JavaSerializer.currentSystem.withValue(sysImpl) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream) + in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, ref.path, system.eventStream) } } @@ -403,7 +403,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { Await.result(ffive, timeout.duration) must be("five") Await.result(fnull, timeout.duration) must be("null") - awaitCond(ref.isTerminated, 2000 millis) + verifyActorTermination(ref) } "restart when Kill:ed" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala index 2432cc113d..5a9b6a8e98 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Bench.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Bench.scala @@ -104,7 +104,7 @@ object Chameneos { } } - def run { + def run(): Unit = { // System.setProperty("akka.config", "akka.conf") Chameneos.start = System.currentTimeMillis val system = ActorSystem() @@ -114,5 +114,5 @@ object Chameneos { system.shutdown() } - def main(args: Array[String]): Unit = run + def main(args: Array[String]): Unit = run() } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index a89c6a931e..383165cf2a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -46,7 +46,7 @@ object FSMActorSpec { case incomplete if incomplete.length < code.length ⇒ stay using CodeState(incomplete, code) case codeTry if (codeTry == code) ⇒ { - doUnlock + doUnlock() goto(Open) using CodeState("", code) forMax timeout } case wrong ⇒ { @@ -60,7 +60,7 @@ object FSMActorSpec { when(Open) { case Event(StateTimeout, _) ⇒ { - doLock + doLock() goto(Locked) } } @@ -87,19 +87,15 @@ object FSMActorSpec { onTermination { case StopEvent(FSM.Shutdown, Locked, _) ⇒ // stop is called from lockstate with shutdown as reason... - terminatedLatch.open + terminatedLatch.open() } // initialize the lock - initialize + initialize() - private def doLock() { - lockedLatch.open - } + private def doLock(): Unit = lockedLatch.open() - private def doUnlock = { - unlockedLatch.open - } + private def doUnlock(): Unit = unlockedLatch.open() } case class CodeState(soFar: String, code: String) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 783efc8f1f..c15da0cd00 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -25,7 +25,7 @@ object FSMTransitionSpec { whenUnhandled { case Event("reply", _) ⇒ stay replying "reply" } - initialize + initialize() override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index c815518b80..c5d922ccf6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -142,7 +142,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit } "be canceled if cancel is performed before execution" in { - val task = collectCancellable(system.scheduler.scheduleOnce(10 seconds)()) + val task = collectCancellable(system.scheduler.scheduleOnce(10 seconds)(())) task.cancel() must be(true) task.isCancelled must be(true) task.cancel() must be(false) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index f2a937112a..b4e8840b0e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -698,8 +698,7 @@ object SupervisorHierarchySpec { stop } - initialize - + initialize() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index fcaf47d49a..0afed9cb14 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -68,8 +68,8 @@ object ActorModelSpec { def interceptor = context.dispatcher.asInstanceOf[MessageDispatcherInterceptor] - def ack { - if (!busy.switchOn()) { + def ack(): Unit = { + if (!busy.switchOn(())) { throw new Exception("isolation violated") } else { interceptor.getStats(self).msgsProcessed.incrementAndGet() @@ -81,21 +81,21 @@ object ActorModelSpec { } def receive = { - case AwaitLatch(latch) ⇒ { ack; latch.await(); busy.switchOff() } - case Meet(sign, wait) ⇒ { ack; sign.countDown(); wait.await(); busy.switchOff() } - case Wait(time) ⇒ { ack; Thread.sleep(time); busy.switchOff() } - case WaitAck(time, l) ⇒ { ack; Thread.sleep(time); l.countDown(); busy.switchOff() } - case Reply(msg) ⇒ { ack; sender ! msg; busy.switchOff() } - case TryReply(msg) ⇒ { ack; sender.tell(msg, null); busy.switchOff() } - case Forward(to, msg) ⇒ { ack; to.forward(msg); busy.switchOff() } - case CountDown(latch) ⇒ { ack; latch.countDown(); busy.switchOff() } - case Increment(count) ⇒ { ack; count.incrementAndGet(); busy.switchOff() } - case CountDownNStop(l) ⇒ { ack; l.countDown(); context.stop(self); busy.switchOff() } - case Restart ⇒ { ack; busy.switchOff(); throw new Exception("Restart requested") } - case Interrupt ⇒ { ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") } - case InterruptNicely(msg) ⇒ { ack; sender ! msg; busy.switchOff(); Thread.currentThread().interrupt() } - case ThrowException(e: Throwable) ⇒ { ack; busy.switchOff(); throw e } - case DoubleStop ⇒ { ack; context.stop(self); context.stop(self); busy.switchOff } + case AwaitLatch(latch) ⇒ { ack(); latch.await(); busy.switchOff(()) } + case Meet(sign, wait) ⇒ { ack(); sign.countDown(); wait.await(); busy.switchOff(()) } + case Wait(time) ⇒ { ack(); Thread.sleep(time); busy.switchOff(()) } + case WaitAck(time, l) ⇒ { ack(); Thread.sleep(time); l.countDown(); busy.switchOff(()) } + case Reply(msg) ⇒ { ack(); sender ! msg; busy.switchOff(()) } + case TryReply(msg) ⇒ { ack(); sender.tell(msg, null); busy.switchOff(()) } + case Forward(to, msg) ⇒ { ack(); to.forward(msg); busy.switchOff(()) } + case CountDown(latch) ⇒ { ack(); latch.countDown(); busy.switchOff(()) } + case Increment(count) ⇒ { ack(); count.incrementAndGet(); busy.switchOff(()) } + case CountDownNStop(l) ⇒ { ack(); l.countDown(); context.stop(self); busy.switchOff(()) } + case Restart ⇒ { ack(); busy.switchOff(()); throw new Exception("Restart requested") } + case Interrupt ⇒ { ack(); sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(()); throw new InterruptedException("Ping!") } + case InterruptNicely(msg) ⇒ { ack(); sender ! msg; busy.switchOff(()); Thread.currentThread().interrupt() } + case ThrowException(e: Throwable) ⇒ { ack(); busy.switchOff(()); throw e } + case DoubleStop ⇒ { ack(); context.stop(self); context.stop(self); busy.switchOff } } } @@ -124,12 +124,12 @@ object ActorModelSpec { } } - abstract override def suspend(actor: ActorCell) { + protected[akka] abstract override def suspend(actor: ActorCell) { getStats(actor.self).suspensions.incrementAndGet() super.suspend(actor) } - abstract override def resume(actor: ActorCell) { + protected[akka] abstract override def resume(actor: ActorCell) { super.resume(actor) getStats(actor.self).resumes.incrementAndGet() } @@ -330,16 +330,12 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa } def spawn(f: ⇒ Unit) { - val thread = new Thread { - override def run { - try { - f - } catch { - case e ⇒ system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread")) + (new Thread { + override def run(): Unit = + try f catch { + case e: Throwable ⇒ system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread")) } - } - } - thread.start() + }).start() } "not process messages for a suspended actor" in { @@ -380,7 +376,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa try { assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num) } catch { - case e ⇒ + case e: Throwable ⇒ dispatcher match { case dispatcher: BalancingDispatcher ⇒ val team = dispatcher.team diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala index 830a19eb29..026e6c178c 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala @@ -55,7 +55,7 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { } callingThreadLock.compareAndSet(1, 0) // Disable the lock } - Await.result(p.future, timeout.duration) must be === () + Await.result(p.future, timeout.duration) must be === (()) } "be able to avoid starvation when Batching is used and Await/blocking is called" in { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 9c732d7279..196468d263 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -573,11 +573,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } "should not deadlock with nested await (ticket 1313)" in { - val simple = Future() map (_ ⇒ Await.result((Future(()) map (_ ⇒ ())), timeout.duration)) + val simple = Future(()) map (_ ⇒ Await.result((Future(()) map (_ ⇒ ())), timeout.duration)) FutureSpec.ready(simple, timeout.duration) must be('completed) val l1, l2 = new TestLatch - val complex = Future() map { _ ⇒ + val complex = Future(()) map { _ ⇒ val nested = Future(()) nested foreach (_ ⇒ l1.open()) FutureSpec.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed @@ -589,7 +589,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "re-use the same thread for nested futures with batching ExecutionContext" in { val failCount = new java.util.concurrent.atomic.AtomicInteger - val f = Future() flatMap { _ ⇒ + val f = Future(()) flatMap { _ ⇒ val originalThread = Thread.currentThread // run some nested futures val nested = diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala index f153c30649..aa4a82db0f 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggerSpec.scala @@ -9,7 +9,8 @@ import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.{ ActorRef, Actor, ActorSystem } import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers -import akka.event.Logging.{ LogEvent, LoggerInitialized, InitializeLogger } +import akka.serialization.SerializationExtension +import akka.event.Logging.{ Warning, LogEvent, LoggerInitialized, InitializeLogger } object LoggerSpec { @@ -37,6 +38,21 @@ object LoggerSpec { } """).withFallback(AkkaSpec.testConf) + val ticket3165Config = ConfigFactory.parseString(""" + akka { + stdout-loglevel = "WARNING" + loglevel = "DEBUG" + loggers = ["akka.event.LoggerSpec$TestLogger1"] + actor { + serialize-messages = on + serialization-bindings { + "akka.event.Logging$LogEvent" = bytes + "java.io.Serializable" = java + } + } + } + """).withFallback(AkkaSpec.testConf) + case class SetTarget(ref: ActorRef, qualifier: Int) class TestLogger1 extends TestLogger(1) @@ -127,4 +143,16 @@ class LoggerSpec extends WordSpec with MustMatchers { } } } + + "Ticket 3165 - serialize-messages and dual-entry serialization of LogEvent" must { + "not cause StackOverflowError" in { + implicit val s = ActorSystem("foo", ticket3165Config) + try { + SerializationExtension(s).serialize(Warning("foo", classOf[String])) + } finally { + s.shutdown() + s.awaitTermination(5.seconds.dilated) + } + } + } } diff --git a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala index ea4e1a05b5..ad5b6a208b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala @@ -6,6 +6,7 @@ package akka.io import akka.testkit.{ TestProbe, AkkaSpec } import Tcp._ +import akka.TestUtils import TestUtils._ class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4") diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index fceb271ab4..50add9d95b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -16,6 +16,7 @@ import scala.util.control.NonFatal import org.scalatest.matchers._ import akka.io.Tcp._ import akka.io.SelectionHandler._ +import akka.TestUtils import TestUtils._ import akka.actor.{ ActorRef, PoisonPill, Terminated } import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } @@ -637,7 +638,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") def interestsDesc(interests: Int): String = interestsNames.filter(i ⇒ (i._1 & interests) != 0).map(_._2).mkString(", ") } - def withUnacceptedConnection( + private[io] def withUnacceptedConnection( setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (), connectionActorCons: (ActorRef, ActorRef) ⇒ TestActorRef[TcpOutgoingConnection] = createConnectionActor())(body: UnacceptedSetup ⇒ Any): Unit = diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index 9f35951ad9..cac2ab5a88 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -7,6 +7,7 @@ package akka.io import akka.testkit.AkkaSpec import akka.util.ByteString import Tcp._ +import akka.TestUtils import TestUtils._ import akka.testkit.EventFilter import java.io.IOException diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala index 4ed3bd9950..358b1d614a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala @@ -10,6 +10,7 @@ import akka.actor.ActorRef import scala.collection.immutable import akka.io.Inet.SocketOption import Tcp._ +import akka.TestUtils import TestUtils._ trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒ diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 4c0652664f..30103b7344 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -13,6 +13,7 @@ import Tcp._ import akka.testkit.EventFilter import akka.io.SelectionHandler._ import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming } +import akka.TestUtils class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala index e387255150..4651700ed1 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala @@ -4,6 +4,7 @@ package akka.io import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } +import akka.TestUtils import TestUtils._ import akka.util.ByteString import java.net.InetSocketAddress diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala index 1e2b960219..9bcf1d4400 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala @@ -5,6 +5,7 @@ package akka.io import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } import akka.io.UdpFF._ +import akka.TestUtils import TestUtils._ import akka.util.ByteString import java.net.InetSocketAddress diff --git a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala index 028463c8bc..fda9850fe8 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/PatternSpec.scala @@ -44,7 +44,7 @@ class PatternSpec extends AkkaSpec { "complete Future with AskTimeoutException when actor not terminated within timeout" in { val target = system.actorOf(Props[TargetActor]) val latch = TestLatch() - target ! (latch, remaining) + target ! ((latch, remaining)) intercept[AskTimeoutException] { Await.result(gracefulStop(target, 500 millis), remaining) } latch.open() } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/OrderbookTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/OrderbookTest.scala index 764e03ddce..4237973b7d 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/domain/OrderbookTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/domain/OrderbookTest.scala @@ -11,7 +11,7 @@ class OrderbookTest extends JUnitSuite { var tradeObserverMock: TradeObserver = null @Before - def setUp = { + def setUp(): Unit = { tradeObserverMock = mock(classOf[TradeObserver]) orderbook = new Orderbook("ERI") with TradeObserver { def trade(bid: Bid, ask: Ask) = tradeObserverMock.trade(bid, ask) @@ -19,7 +19,7 @@ class OrderbookTest extends JUnitSuite { } @Test - def shouldTradeSamePrice = { + def shouldTradeSamePrice(): Unit = { val bid = new Bid("ERI", 100, 1000) val ask = new Ask("ERI", 100, 1000) orderbook.addOrder(bid) @@ -33,7 +33,7 @@ class OrderbookTest extends JUnitSuite { } @Test - def shouldTradeTwoLevels = { + def shouldTradeTwoLevels(): Unit = { val bid1 = new Bid("ERI", 101, 1000) val bid2 = new Bid("ERI", 100, 1000) val bid3 = new Bid("ERI", 99, 1000) @@ -62,7 +62,7 @@ class OrderbookTest extends JUnitSuite { } @Test - def shouldSplitBid = { + def shouldSplitBid(): Unit = { val bid = new Bid("ERI", 100, 300) val ask = new Ask("ERI", 100, 1000) orderbook.addOrder(bid) @@ -77,7 +77,7 @@ class OrderbookTest extends JUnitSuite { } @Test - def shouldSplitAsk = { + def shouldSplitAsk(): Unit = { val bid = new Bid("ERI", 100, 1000) val ask = new Ask("ERI", 100, 600) orderbook.addOrder(bid) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index f2b913d962..f1150688f2 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -379,19 +379,19 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with val busy = TestLatch(1) val received0 = TestLatch(1) - router ! (busy, received0) + router ! ((busy, received0)) Await.ready(received0, TestLatch.DefaultTimeout) val received1 = TestLatch(1) - router ! (1, received1) + router ! ((1, received1)) Await.ready(received1, TestLatch.DefaultTimeout) val received2 = TestLatch(1) - router ! (2, received2) + router ! ((2, received2)) Await.ready(received2, TestLatch.DefaultTimeout) val received3 = TestLatch(1) - router ! (3, received3) + router ! ((3, received3)) Await.ready(received3, TestLatch.DefaultTimeout) busy.countDown() diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 5b4ee232c0..ad351e4fff 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -329,8 +329,8 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR val ser = SerializationExtension(system) "Cross-version serialization compatibility" must { - def verify(obj: Any, asExpected: String): Unit = - String.valueOf(encodeHex(ser.serialize(obj, obj.getClass).get)) must be(asExpected) + def verify(obj: SystemMessage, asExpected: String): Unit = + String.valueOf(ser.serialize((obj, obj.getClass)).map(encodeHex).get) must be === asExpected "be preserved for the Create SystemMessage" in { verify(Create(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e437265617465bcdf9f7f2675038d02000078707671007e0003") @@ -388,9 +388,9 @@ class OverriddenSystemMessageSerializationSpec extends AkkaSpec(SerializationTes } } -trait TestSerializable +protected[akka] trait TestSerializable -class TestSerializer extends Serializer { +protected[akka] class TestSerializer extends Serializer { def includeManifest: Boolean = false def identifier = 9999 @@ -403,12 +403,12 @@ class TestSerializer extends Serializer { } @SerialVersionUID(1) -case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable { +protected[akka] case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable { override def fillInStackTrace = null } @SerialVersionUID(1) -case class FakeActorRef(name: String) extends InternalActorRef with ActorRefScope { +protected[akka] case class FakeActorRef(name: String) extends InternalActorRef with ActorRefScope { override def path = RootActorPath(Address("proto", "SomeSystem"), name) override def forward(message: Any)(implicit context: ActorContext) = ??? override def isTerminated = ??? diff --git a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala index 6f94770dcc..c801ee474b 100644 --- a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala @@ -89,7 +89,7 @@ class ByteStringSpec extends WordSpec with MustMatchers with Checkers { val (bsAIt, bsBIt) = (a.iterator, b.iterator) val (vecAIt, vecBIt) = (Vector(a: _*).iterator.buffered, Vector(b: _*).iterator.buffered) (body(bsAIt, bsBIt) == body(vecAIt, vecBIt)) && - (!strict || (bsAIt.toSeq, bsBIt.toSeq) == (vecAIt.toSeq, vecBIt.toSeq)) + (!strict || (bsAIt.toSeq -> bsBIt.toSeq) == (vecAIt.toSeq -> vecBIt.toSeq)) } def likeVecBld(body: Builder[Byte, _] ⇒ Unit): Boolean = { diff --git a/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala b/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala index 10fca828a1..9e7db5da30 100644 --- a/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/SwitchSpec.scala @@ -17,17 +17,17 @@ class SwitchSpec extends WordSpec with MustMatchers { s.isOff must be(true) s.isOn must be(false) - s.switchOn("hello") must be(true) + s.switchOn(()) must be(true) s.isOn must be(true) s.isOff must be(false) - s.switchOn("hello") must be(false) + s.switchOn(()) must be(false) s.isOn must be(true) s.isOff must be(false) - s.switchOff("hello") must be(true) + s.switchOff(()) must be(true) s.isOff must be(true) s.isOn must be(false) - s.switchOff("hello") must be(false) + s.switchOff(()) must be(false) s.isOff must be(true) s.isOn must be(false) } @@ -44,34 +44,34 @@ class SwitchSpec extends WordSpec with MustMatchers { val s = new Switch(false) s.ifOffYield("yes") must be(Some("yes")) s.ifOnYield("no") must be(None) - s.ifOff("yes") must be(true) - s.ifOn("no") must be(false) + s.ifOff(()) must be(true) + s.ifOn(()) must be(false) - s.switchOn() + s.switchOn(()) s.ifOnYield("yes") must be(Some("yes")) s.ifOffYield("no") must be(None) - s.ifOn("yes") must be(true) - s.ifOff("no") must be(false) + s.ifOn(()) must be(true) + s.ifOff(()) must be(false) } "run action with locking" in { val s = new Switch(false) s.whileOffYield("yes") must be(Some("yes")) s.whileOnYield("no") must be(None) - s.whileOff("yes") must be(true) - s.whileOn("no") must be(false) + s.whileOff(()) must be(true) + s.whileOn(()) must be(false) - s.switchOn() + s.switchOn(()) s.whileOnYield("yes") must be(Some("yes")) s.whileOffYield("no") must be(None) - s.whileOn("yes") must be(true) - s.whileOff("no") must be(false) + s.whileOn(()) must be(true) + s.whileOff(()) must be(false) } "run first or second action depending on state" in { val s = new Switch(false) s.fold("on")("off") must be("off") - s.switchOn() + s.switchOn(()) s.fold("on")("off") must be("on") } @@ -80,14 +80,14 @@ class SwitchSpec extends WordSpec with MustMatchers { s.locked { Thread.sleep(500) - s.switchOn() + s.switchOn(()) s.isOn must be(true) } val latch = new CountDownLatch(1) new Thread { override def run(): Unit = { - s.switchOff() + s.switchOff(()) latch.countDown() } }.start() diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index fe0f66f5f5..adf945d6b8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -6,14 +6,12 @@ package akka.actor import akka.dispatch._ import akka.dispatch.sysmsg._ -import akka.util._ import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.serialization.{ Serialization, JavaSerializer } import akka.event.EventStream import scala.annotation.tailrec import java.util.concurrent.ConcurrentHashMap import akka.event.LoggingAdapter -import scala.collection.JavaConverters /** * Immutable and serializable handle to an actor, which may or may not reside @@ -381,7 +379,7 @@ private[akka] class LocalActorRef private[akka] ( override def restart(cause: Throwable): Unit = actorCell.restart(cause) @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = SerializedActorRef(path) + protected def writeReplace(): AnyRef = SerializedActorRef(this) } /** @@ -392,6 +390,10 @@ private[akka] class LocalActorRef private[akka] ( private[akka] case class SerializedActorRef private (path: String) { import akka.serialization.JavaSerializer.currentSystem + def this(actorRef: ActorRef) = { + this(Serialization.serializedActorPath(actorRef)) + } + @throws(classOf[java.io.ObjectStreamException]) def readResolve(): AnyRef = currentSystem.value match { case null ⇒ @@ -407,11 +409,8 @@ private[akka] case class SerializedActorRef private (path: String) { * INTERNAL API */ private[akka] object SerializedActorRef { - def apply(path: ActorPath): SerializedActorRef = { - Serialization.currentTransportAddress.value match { - case null ⇒ new SerializedActorRef(path.toSerializationFormat) - case addr ⇒ new SerializedActorRef(path.toSerializationFormatWithAddress(addr)) - } + def apply(actorRef: ActorRef): SerializedActorRef = { + new SerializedActorRef(actorRef) } } @@ -437,7 +436,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { override def restart(cause: Throwable): Unit = () @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = SerializedActorRef(path) + protected def writeReplace(): AnyRef = SerializedActorRef(this) } /** @@ -470,7 +469,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, override val path: ActorPath, val eventStream: EventStream) extends MinimalActorRef { - override def isTerminated(): Boolean = true + override def isTerminated: Boolean = true override def sendSystemMessage(message: SystemMessage): Unit = { if (Mailbox.debug) println(s"ELAR $path having enqueued $message") diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 11f36cfc22..e4aa58f235 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -329,7 +329,7 @@ private[akka] object SystemGuardian { * * Depending on this class is not supported, only the [[ActorRefProvider]] interface is supported. */ -class LocalActorRefProvider private[akka] ( +private[akka] class LocalActorRefProvider private[akka] ( _systemName: String, override val settings: ActorSystem.Settings, val eventStream: EventStream, diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index ad70be0a0f..99f75dbaea 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -451,7 +451,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { * Verify existence of initial state and setup timers. This should be the * last call within the constructor. */ - final def initialize: Unit = makeTransition(currentState) + final def initialize(): Unit = makeTransition(currentState) /** * Return current state name (i.e. object of type S) diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 123a576462..9ce7169be3 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -157,7 +157,7 @@ private[akka] class RepointableActorRef( def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message) @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = SerializedActorRef(path) + protected def writeReplace(): AnyRef = SerializedActorRef(this) } private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 2aac6e8e89..bd721d4051 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -613,7 +613,7 @@ private[akka] class ContinuousCancellable extends AtomicReference[HWTimeout](Con case some ⇒ if (!compareAndSet(some, newTimeout)) swap(newTimeout) } - def isCancelled(): Boolean = get().isCancelled() + override def isCancelled: Boolean = get().isCancelled() def cancel(): Boolean = getAndSet(ContinuousCancellable.cancelled).cancel() } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 083d65f907..56892f1b11 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -248,7 +248,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T) extends Actor { val me = withContext[T](createInstance) - override def supervisorStrategy(): SupervisorStrategy = me match { + override def supervisorStrategy: SupervisorStrategy = me match { case l: Supervisor ⇒ l.supervisorStrategy case _ ⇒ super.supervisorStrategy } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 282a66e796..5d7cc9a30a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -74,7 +74,7 @@ private[akka] object MessageDispatcher { // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1) final val debug = false // Deliberately without type ascription to make it a compile-time constant lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _) - def printActors: Unit = + def printActors(): Unit = if (debug) { for { d ← actors.keys @@ -242,7 +242,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference */ - def suspend(actor: ActorCell): Unit = { + protected[akka] def suspend(actor: ActorCell): Unit = { val mbox = actor.mailbox if ((mbox.actor eq actor) && (mbox.dispatcher eq this)) mbox.suspend() @@ -251,7 +251,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext /* * After the call to this method, the dispatcher must begin any new message processing for the specified reference */ - def resume(actor: ActorCell): Unit = { + protected[akka] def resume(actor: ActorCell): Unit = { val mbox = actor.mailbox if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume()) registerForExecution(mbox, false, false) diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 559574a795..25e1646c1c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -406,10 +406,15 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = { val currentList = systemQueueGet - if (systemQueuePut(currentList, newContents)) currentList.reverse else systemDrain(newContents) + if (currentList.head == NoMessage) new EarliestFirstSystemMessageList(null) + else if (systemQueuePut(currentList, newContents)) currentList.reverse + else systemDrain(newContents) } - def hasSystemMessages: Boolean = systemQueueGet.nonEmpty + def hasSystemMessages: Boolean = systemQueueGet.head match { + case null | NoMessage ⇒ false + case _ ⇒ true + } } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 266958bf83..dc35609dee 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -570,7 +570,7 @@ object Logging { /** * Base type of LogEvents */ - sealed trait LogEvent { + sealed trait LogEvent extends NoSerializationVerificationNeeded { /** * The thread that created this log event */ diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index ec866ae75f..adfa7a2358 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -276,7 +276,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, override def postRestart(reason: Throwable): Unit = throw new IllegalStateException("Restarting not supported for connection actors.") - private[TcpConnection] case class PendingWrite( + private[io] case class PendingWrite( commander: ActorRef, ack: Any, remainingData: ByteString, diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c880496077..8e9b742f9d 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -75,10 +75,11 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo */ def applyRoute(sender: ActorRef, message: Any): immutable.Iterable[Destination] = message match { - case _: AutoReceivedMessage ⇒ Destination(sender, self) :: Nil - case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil } - case msg if route.isDefinedAt(sender, msg) ⇒ route(sender, message) - case _ ⇒ Nil + case _: AutoReceivedMessage ⇒ Destination(sender, self) :: Nil + case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil } + case _ ⇒ + val payload = (sender, message) + if (route isDefinedAt payload) route(payload) else Nil } /** diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index ef21355502..64f1031c3a 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -5,7 +5,7 @@ package akka.serialization import com.typesafe.config.Config -import akka.actor.{ Extension, ExtendedActorSystem, Address } +import akka.actor._ import akka.event.Logging import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ArrayBuffer @@ -21,10 +21,11 @@ object Serialization { type ClassSerializer = (Class[_], Serializer) /** - * This holds a reference to the current transport address to be inserted - * into local actor refs during serialization. + * This holds a reference to the current transport serialization information used for + * serializing local actor refs. + * INTERNAL API */ - val currentTransportAddress = new DynamicVariable[Address](null) + private[akka] val currentTransportInformation = new DynamicVariable[Information](null) class Settings(val config: Config) { val Serializers: Map[String, String] = configToMap("akka.actor.serializers") @@ -35,6 +36,35 @@ object Serialization { config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) ⇒ (k -> v.toString) } } } + + /** + * Serialization information needed for serializing local actor refs. + * INTERNAL API + */ + private[akka] case class Information(address: Address, system: ActorSystem) + + /** + * The serialized path of an actorRef, based on the current transport serialization information. + * If there is no external address available for the requested address then the systems default + * address will be used. + */ + def serializedActorPath(actorRef: ActorRef): String = { + val path = actorRef.path + val originalSystem: ExtendedActorSystem = actorRef match { + case a: ActorRefWithCell ⇒ a.underlying.system.asInstanceOf[ExtendedActorSystem] + case _ ⇒ null + } + Serialization.currentTransportInformation.value match { + case null ⇒ path.toSerializationFormat + case Information(address, system) ⇒ + if (originalSystem == null || originalSystem == system) + path.toSerializationFormatWithAddress(address) + else { + val provider = originalSystem.provider + path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress)) + } + } + } } /** diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index c639b33837..e68f1c42e7 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -57,10 +57,9 @@ object Agent { * Internal helper method */ private final def withinTransaction(run: Runnable): Unit = { - def dispatch = updater.execute(run) Txn.findCurrent match { - case Some(txn) ⇒ Txn.afterCommit(status ⇒ dispatch)(txn) - case _ ⇒ dispatch + case Some(txn) ⇒ Txn.afterCommit(_ ⇒ updater.execute(run))(txn) + case _ ⇒ updater.execute(run) } } diff --git a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala index 926720600c..a10d375327 100644 --- a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala +++ b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala @@ -4,6 +4,7 @@ import language.postfixOps import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ +import scala.util.control.NonFatal import akka.util.Timeout import akka.testkit._ import scala.concurrent.stm._ @@ -112,7 +113,7 @@ class AgentSpec extends AkkaSpec { agent send (_ * 2) throw new RuntimeException("Expected failure") } - } catch { case _ ⇒ } + } catch { case NonFatal(_) ⇒ } agent send countDown diff --git a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala index 9c88a5e81e..72925c78ea 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/CamelSupervisor.scala @@ -150,7 +150,7 @@ private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Acto try { val endpoint = camelContext.getEndpoint(endpointUri) val processor = new SendProcessor(endpoint) - camelObjects += producer -> (endpoint, processor) + camelObjects = camelObjects.updated(producer, endpoint -> processor) // if this throws, the supervisor stops the producer and de-registers it on termination processor.start() producer ! CamelProducerObjects(endpoint, processor) @@ -159,10 +159,10 @@ private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Acto case NonFatal(e) ⇒ throw new ActorActivationException(producer, e) } } else { - camelObjects.get(producer).foreach { case (endpoint, processor) ⇒ producer ! CamelProducerObjects(endpoint, processor) } + camelObjects.get(producer) foreach { case (endpoint, processor) ⇒ producer ! CamelProducerObjects(endpoint, processor) } } case DeRegister(producer) ⇒ - camelObjects.get(producer).foreach { + camelObjects.get(producer) foreach { case (_, processor) ⇒ try { camelObjects.get(producer).foreach(_._2.stop()) diff --git a/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala index 9e51164819..7d014f8a8c 100644 --- a/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConcurrentActivationTest.scala @@ -70,8 +70,8 @@ class ConcurrentActivationTest extends WordSpec with MustMatchers with NonShared } val (activatedConsumerNames, activatedProducerNames) = partitionNames(activations) val (deactivatedConsumerNames, deactivatedProducerNames) = partitionNames(deactivations) - assertContainsSameElements(activatedConsumerNames, deactivatedConsumerNames) - assertContainsSameElements(activatedProducerNames, deactivatedProducerNames) + assertContainsSameElements(activatedConsumerNames -> deactivatedConsumerNames) + assertContainsSameElements(activatedProducerNames -> deactivatedProducerNames) } finally { system.eventStream.publish(TestEvent.UnMute(eventFilter)) } @@ -97,7 +97,7 @@ class ConsumerBroadcast(promise: Promise[(Future[List[List[ActorRef]]], Future[L allDeactivationFutures = allDeactivationFutures :+ deactivationListFuture context.actorOf(Props(new Registrar(i, number, activationListPromise, deactivationListPromise)), "registrar-" + i) } - promise.success((Future.sequence(allActivationFutures)), Future.sequence(allDeactivationFutures)) + promise.success(Future.sequence(allActivationFutures) -> Future.sequence(allDeactivationFutures)) broadcaster = Some(context.actorOf(Props[Registrar] withRouter (BroadcastRouter(routees)), "registrarRouter")) case reg: Any ⇒ diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 6fd865bfd1..43349dc9ed 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -282,7 +282,7 @@ object ProducerFeatureTest { } override def postStop() { - for (msg ← lastMessage; aref ← lastSender) context.parent ! (aref, msg) + for (msg ← lastMessage; aref ← lastSender) context.parent ! ((aref, msg)) super.postStop() } } diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index f353669bd3..4d729c8790 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -147,7 +147,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with "asynchronous" when { - def verifyFailureIsSet { + def verifyFailureIsSet(): Unit = { producer.processExchangeAdapter(exchange, asyncCallback) asyncCallback.awaitCalled() verify(exchange).setFailure(any[FailureResult]) @@ -158,7 +158,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with "consumer actor doesnt exist" must { "set failure message on exchange" in { producer = given(actor = null, outCapable = true) - verifyFailureIsSet + verifyFailureIsSet() } } @@ -226,7 +226,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with "consumer actor doesnt exist" must { "set failure message on exchange" in { producer = given(actor = null, outCapable = false) - verifyFailureIsSet + verifyFailureIsSet() } } @@ -325,7 +325,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with } } -trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with BeforeAndAfterEach { self: TestKit with MustMatchers with Suite ⇒ +private[camel] trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with BeforeAndAfterEach { self: TestKit with MustMatchers with Suite ⇒ var camel: Camel = _ var exchange: CamelExchangeAdapter = _ var callback: AsyncCallback = _ @@ -427,9 +427,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo } def echoActor = system.actorOf(Props(new Actor { - def receive = { - case msg ⇒ sender ! "received " + msg - } + def receive = { case msg ⇒ sender ! "received " + msg } }), name = "echoActor") } diff --git a/akka-channels/src/main/scala/akka/channels/macros/Helpers.scala b/akka-channels/src/main/scala/akka/channels/macros/Helpers.scala index 5a18741902..8966496ea3 100644 --- a/akka-channels/src/main/scala/akka/channels/macros/Helpers.scala +++ b/akka-channels/src/main/scala/akka/channels/macros/Helpers.scala @@ -26,7 +26,7 @@ object Helpers { def imp[T: c.WeakTypeTag](c: Context): c.Expr[T] = { import c.universe._ - c.Expr[T](TypeApply(Ident("implicitly"), List(TypeTree().setType(weakTypeOf[T])))) + c.Expr[T](TypeApply(Ident(newTermName("implicitly")), List(TypeTree().setType(weakTypeOf[T])))) } def bool(c: Context, b: Boolean): c.Expr[Boolean] = c.Expr[Boolean](c.universe.Literal(c.universe.Constant(b))) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index bde47e3c34..a1f3b6941d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -73,7 +73,7 @@ private[akka] class ClusterActorRefProvider( */ override def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = { super.useActorOnNode(path, props, deploy, supervisor) - remoteDeploymentWatcher ! (actorFor(path), supervisor) + remoteDeploymentWatcher ! ((actorFor(path), supervisor)) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 6c4166f95f..aa79103317 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -229,9 +229,9 @@ private[cluster] case class ClusterHeartbeatSenderState private ( heartbeatRequest: Map[Address, Deadline] = Map.empty) { // FIXME can be disabled as optimization - assertInvariants + assertInvariants() - private def assertInvariants: Unit = { + private def assertInvariants(): Unit = { val currentAndEnding = current.intersect(ending.keySet) require(currentAndEnding.isEmpty, s"Same nodes in current and ending not allowed, got [${currentAndEnding}]") diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index a3c3c0d908..b29a61a623 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -63,9 +63,9 @@ private[cluster] case class Gossip( with Versioned[Gossip] { // FIXME can be disabled as optimization - assertInvariants + assertInvariants() - private def assertInvariants: Unit = { + private def assertInvariants(): Unit = { val unreachableAndLive = members.intersect(overview.unreachable) if (unreachableAndLive.nonEmpty) throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala index 8af61e7712..6e8a2a301f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancingRouter.scala @@ -317,7 +317,7 @@ abstract class MixMetricsSelectorBase(selectors: immutable.IndexedSeq[CapacityMe combined.foldLeft(Map.empty[Address, (Double, Int)].withDefaultValue((0.0, 0))) { case (acc, (address, capacity)) ⇒ val (sum, count) = acc(address) - acc + (address -> (sum + capacity, count + 1)) + acc + (address -> ((sum + capacity, count + 1))) }.map { case (addr, (sum, count)) ⇒ (addr -> sum / count) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala index 1b6fabc63e..777165a80b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterMetricsSpec.scala @@ -31,7 +31,7 @@ class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec { import ClusterMetricsMultiJvmSpec._ - def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector] + private[cluster] def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector] "Cluster metrics" must { "periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " + diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 332e2792d5..7fca11ba1d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -51,7 +51,7 @@ import akka.testkit.TestEvent._ * 8. while nodes are removed remote death watch is also exercised * 9. while nodes are removed a few cluster aware routers are also working */ -object StressMultiJvmSpec extends MultiNodeConfig { +private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { // Note that this test uses default configuration, // not MultiNodeClusterSpec.clusterConfig @@ -521,14 +521,14 @@ object StressMultiJvmSpec extends MultiNodeConfig { log.info("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children", totalActors, levels, width) val tree = context.actorOf(Props(new TreeNode(levels, width)), "tree") - tree forward (idx, SimpleJob(id, payload)) + tree forward ((idx, SimpleJob(id, payload))) context.become(treeWorker(tree)) } def treeWorker(tree: ActorRef): Receive = { case SimpleJob(id, payload) ⇒ sender ! Ack(id) case TreeJob(id, payload, idx, _, _) ⇒ - tree forward (idx, SimpleJob(id, payload)) + tree forward ((idx, SimpleJob(id, payload))) } } @@ -539,7 +539,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { 0 until width map { i ⇒ context.actorOf(Props(createChild()), name = i.toString) } toVector def receive = { - case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward (idx, job) + case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward ((idx, job)) } } @@ -701,7 +701,7 @@ abstract class StressSpec lazy val statsObserver = system.actorOf(Props[StatsObserver], "statsObserver") - def awaitClusterResult: Unit = { + def awaitClusterResult(): Unit = { runOn(roles.head) { val r = clusterResultAggregator watch(r) @@ -734,7 +734,7 @@ abstract class StressSpec } } - awaitClusterResult + awaitClusterResult() enterBarrier("join-one-" + step) } @@ -754,7 +754,7 @@ abstract class StressSpec } } - awaitClusterResult + awaitClusterResult() enterBarrier("join-several-" + step) } @@ -804,7 +804,7 @@ abstract class StressSpec } enterBarrier("watch-verified-" + step) - awaitClusterResult + awaitClusterResult() enterBarrier("remove-one-" + step) } @@ -828,7 +828,7 @@ abstract class StressSpec awaitMembersUp(currentRoles.size, timeout = remaining) } } - awaitClusterResult + awaitClusterResult() enterBarrier("remove-several-" + step) } @@ -885,7 +885,7 @@ abstract class StressSpec (nextAS, nextAddresses) } } - awaitClusterResult + awaitClusterResult() step += 1 loop(counter + 1, nextAS, nextAddresses) @@ -936,7 +936,7 @@ abstract class StressSpec } } - awaitClusterResult + awaitClusterResult() } def awaitWorkResult: WorkResult = { @@ -983,7 +983,7 @@ abstract class StressSpec } - awaitClusterResult + awaitClusterResult() step += 1 } } @@ -1004,7 +1004,7 @@ abstract class StressSpec } } - awaitClusterResult + awaitClusterResult() nbrUsedRoles += size enterBarrier("after-" + step) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 7063212643..680bb87041 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -66,7 +66,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod lazy val victim = sortedRoles(1) var endBarrierNumber = 0 - def endBarrier: Unit = { + def endBarrier(): Unit = { endBarrierNumber += 1 enterBarrier("after_" + endBarrierNumber) } @@ -75,7 +75,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod "reach initial convergence" taggedAs LongRunningTest in { awaitClusterUp(roles: _*) - endBarrier + endBarrier() } "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in { @@ -125,7 +125,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod } } - endBarrier + endBarrier() } "mark the node as DOWN" taggedAs LongRunningTest in { @@ -139,7 +139,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod awaitAssert(clusterView.unreachableMembers must be(Set.empty), 15 seconds) } - endBarrier + endBarrier() } "allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in { @@ -158,7 +158,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod awaitMembersUp(roles.size) - endBarrier + endBarrier() } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index fc16b503df..d01540ae75 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -35,8 +35,8 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val eUp = Member(Address("akka.tcp", "sys", "e", 2552), Up, eRoles) val eDown = Member(Address("akka.tcp", "sys", "e", 2552), Down, eRoles) - def converge(gossip: Gossip): (Gossip, Set[Address]) = - ((gossip, Set.empty[Address]) /: gossip.members) { (gs, m) ⇒ (gs._1.seen(m.address), gs._2 + m.address) } + private[cluster] def converge(gossip: Gossip): (Gossip, Set[Address]) = + ((gossip, Set.empty[Address]) /: gossip.members) { case ((gs, as), m) ⇒ (gs.seen(m.address), as + m.address) } "Domain events" must { diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala index b92ec1eb71..0b1a330a80 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala @@ -128,5 +128,5 @@ trait MetricsCollectorFactory { this: AkkaSpec ⇒ new JmxMetricsCollector(selfAddress, defaultDecayFactor) }.get - def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector] + private[cluster] def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector] } diff --git a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala index 3e1d2c6908..01003aff3d 100644 --- a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala +++ b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala @@ -276,7 +276,7 @@ class TimerBasedThrottler(var rate: Rate) extends Actor with Throttler with FSM[ case Active -> Idle ⇒ stopTimer() } - initialize + initialize() private def startTimer(rate: Rate) = setTimer("morePermits", Tick, rate.duration, true) private def stopTimer() = cancelTimer("morePermits") diff --git a/akka-contrib/src/test/scala/akka/contrib/jul/JavaLoggerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/jul/JavaLoggerSpec.scala index 33faf44320..5a0d82210e 100644 --- a/akka-contrib/src/test/scala/akka/contrib/jul/JavaLoggerSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/jul/JavaLoggerSpec.scala @@ -61,7 +61,7 @@ class JavaLoggerSpec extends AkkaSpec(JavaLoggerSpec.config) { } "log info without stackTrace" in { - producer ! ("{} is the magic number", 3) + producer ! (("{} is the magic number", 3)) val record = expectMsgType[logging.LogRecord] diff --git a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala index a9999b7083..fed2e14115 100644 --- a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala +++ b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala @@ -60,7 +60,7 @@ class DataflowSpec extends AkkaSpec with DefaultTimeout { val x = Future("Hello") val y = x map (_.length) - val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) + val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply) intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration)) } @@ -74,7 +74,7 @@ class DataflowSpec extends AkkaSpec with DefaultTimeout { val x = Future(3) val y = (actor ? "Hello").mapTo[Int] - val r = flow(x() + y(), 100) + val r = flow(x() + y()) intercept[ClassCastException](Await.result(r, timeout.duration)) } diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala b/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala index 525492ab44..994a3e3619 100644 --- a/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala +++ b/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala @@ -25,9 +25,9 @@ class DangerousActor extends Actor with ActorLogging { new CircuitBreaker(context.system.scheduler, maxFailures = 5, callTimeout = 10.seconds, - resetTimeout = 1.minute).onOpen(notifyMeOnOpen) + resetTimeout = 1.minute).onOpen(notifyMeOnOpen()) - def notifyMeOnOpen = + def notifyMeOnOpen(): Unit = log.warning("My CircuitBreaker is now open, and will not close for one minute") //#circuit-breaker-initialization diff --git a/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java b/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java index a0632ed03b..15ae81bda8 100644 --- a/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java +++ b/akka-docs/rst/java/code/docs/future/FutureDocTestBase.java @@ -564,7 +564,7 @@ public class FutureDocTestBase { return "foo"; } }, ec); - Future result = Futures.firstCompletedOf(Arrays.asList(future, delayed), ec); + Future result = Futures.firstCompletedOf(Arrays.>asList(future, delayed), ec); //#after Await.result(result, Duration.create(2, SECONDS)); } diff --git a/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java b/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java index 3eac0502d2..705f6ed5e4 100644 --- a/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java @@ -7,7 +7,6 @@ import org.junit.Test; import static org.junit.Assert.*; //#imports import akka.actor.*; -import akka.remote.RemoteActorRefProvider; import akka.serialization.*; //#imports @@ -58,20 +57,10 @@ public class SerializationDocTestBase { //#actorref-serializer // Serialize // (beneath toBinary) - final Address transportAddress = - Serialization.currentTransportAddress().value(); - String identifier; + String identifier = Serialization.serializedActorPath(theActorRef); - // If there is no transportAddress, - // it means that either this Serializer isn't called - // within a piece of code that sets it, - // so either you need to supply your own, - // or simply use the local path. - if (transportAddress == null) identifier = theActorRef.path().toSerializationFormat(); - else identifier = theActorRef.path().toSerializationFormatWithAddress(transportAddress); // Then just serialize the identifier however you like - // Deserialize // (beneath fromBinary) final ActorRef deserializedActorRef = theActorSystem.actorFor(identifier); @@ -118,16 +107,20 @@ public class SerializationDocTestBase { } //#external-address - - public void demonstrateExternalAddress() { - // this is not meant to be run, only to be compiled + static + //#external-address + public class ExternalAddressExample { + //#external-address final ActorSystem system = ActorSystem.create(); - final Address remoteAddr = new Address("", ""); - // #external-address - final Address addr = ExternalAddress.ID.get(system).getAddressFor(remoteAddr); - // #external-address + //#external-address + public String serializeTo(ActorRef ref, Address remote) { + return ref.path().toSerializationFormatWithAddress( + ExternalAddress.ID.get(system).getAddressFor(remote)); + } } + //#external-address + static //#external-address-default public class DefaultAddressExt implements Extension { diff --git a/akka-docs/rst/java/serialization.rst b/akka-docs/rst/java/serialization.rst index 7d72b6ef43..f58646ce4a 100644 --- a/akka-docs/rst/java/serialization.rst +++ b/akka-docs/rst/java/serialization.rst @@ -109,8 +109,11 @@ list which classes that should be serialized using it. Serializing ActorRefs --------------------- -All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer, -you might want to know how to serialize and deserialize them properly, here's the magic incantation: +All ActorRefs are serializable using JavaSerializer, but in case you are writing your +own serializer, you might want to know how to serialize and deserialize them properly. +In the general case, the local address to be used depends on the type of remote +address which shall be the recipient of the serialized information. Use +:meth:`Serialization.serializedActorPath(actorRef)` like this: .. includecode:: code/docs/serialization/SerializationDocTestBase.java :include: imports @@ -118,6 +121,22 @@ you might want to know how to serialize and deserialize them properly, here's th .. includecode:: code/docs/serialization/SerializationDocTestBase.java :include: actorref-serializer +This assumes that serialization happens in the context of sending a message +through the remote transport. There are other uses of serialization, though, +e.g. storing actor references outside of an actor application (database, +durable mailbox, etc.). In this case, it is important to keep in mind that the +address part of an actor’s path determines how that actor is communicated with. +Storing a local actor path might be the right choice if the retrieval happens +in the same logical context, but it is not enough when deserializing it on a +different network host: for that it would need to include the system’s remote +transport address. An actor system is not limited to having just one remote +transport per se, which makes this question a bit more interesting. To find out +the appropriate address to use when sending to ``remoteAddr`` you can use +:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` like this: + +.. includecode:: code/docs/serialization/SerializationDocTestBase.java + :include: external-address + .. note:: ``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the @@ -132,25 +151,6 @@ you might want to know how to serialize and deserialize them properly, here's th include the unique id. -This assumes that serialization happens in the context of sending a message -through the remote transport. There are other uses of serialization, though, -e.g. storing actor references outside of an actor application (database, -durable mailbox, etc.). In this case, it is important to keep in mind that the -address part of an actor’s path determines how that actor is communicated with. -Storing a local actor path might be the right choice if the retrieval happens -in the same logical context, but it is not enough when deserializing it on a -different network host: for that it would need to include the system’s remote -transport address. An actor system is not limited to having just one remote -transport per se, which makes this question a bit more interesting. - -In the general case, the local address to be used depends on the type of remote -address which shall be the recipient of the serialized information. Use -:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` to query the system -for the appropriate address to use when sending to ``remoteAddr``: - -.. includecode:: code/docs/serialization/SerializationDocTestBase.java - :include: external-address - This requires that you know at least which type of address will be supported by the system which will deserialize the resulting actor reference; if you have no concrete address handy you can create a dummy one for the right protocol using diff --git a/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala index 4ce3ac07a6..59c62e93b9 100644 --- a/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/FSMDocSpec.scala @@ -81,7 +81,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { //#unhandled-elided //#fsm-body - initialize + initialize() } //#simple-fsm object DemoCode { diff --git a/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSample.scala b/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSample.scala index fd54bbcf53..c3d07f3dc8 100644 --- a/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSample.scala +++ b/akka-docs/rst/scala/code/docs/actor/FaultHandlingDocSample.scala @@ -204,7 +204,7 @@ class CounterService extends Actor { if (backlog.size >= MaxBacklog) throw new ServiceUnavailable( "CounterService not available, lack of initial value") - backlog = backlog :+ (sender, msg) + backlog :+= (sender -> msg) } } diff --git a/akka-docs/rst/scala/code/docs/camel/Introduction.scala b/akka-docs/rst/scala/code/docs/camel/Introduction.scala index 2adc7a3863..14ef99f30f 100644 --- a/akka-docs/rst/scala/code/docs/camel/Introduction.scala +++ b/akka-docs/rst/scala/code/docs/camel/Introduction.scala @@ -7,7 +7,7 @@ import language.postfixOps import akka.util.Timeout object Introduction { - def foo = { + def foo(): Unit = { //#Consumer-mina import akka.camel.{ CamelMessage, Consumer } @@ -27,7 +27,7 @@ object Introduction { val mina = system.actorOf(Props[MyEndpoint]) //#Consumer-mina } - def bar = { + def bar(): Unit = { //#Consumer import akka.camel.{ CamelMessage, Consumer } @@ -41,7 +41,7 @@ object Introduction { } //#Consumer } - def baz = { + def baz(): Unit = { //#Producer import akka.actor.Actor import akka.camel.{ Producer, Oneway } diff --git a/akka-docs/rst/scala/code/docs/dataflow/DataflowDocSpec.scala b/akka-docs/rst/scala/code/docs/dataflow/DataflowDocSpec.scala index 3f3cdf492c..ebec2c54a4 100644 --- a/akka-docs/rst/scala/code/docs/dataflow/DataflowDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/dataflow/DataflowDocSpec.scala @@ -50,7 +50,7 @@ class DataflowDocSpec extends WordSpec with MustMatchers { val v1, v2 = Promise[Int]() flow { // v1 will become the value of v2 + 10 when v2 gets a value - v1 << v2() + 10 + v1 << 10 + v2() v1() + v2() } onComplete println flow { v2 << 5 } // As you can see, no blocking above! diff --git a/akka-docs/rst/scala/code/docs/future/FutureDocSpec.scala b/akka-docs/rst/scala/code/docs/future/FutureDocSpec.scala index 3a04d9b109..91d416de75 100644 --- a/akka-docs/rst/scala/code/docs/future/FutureDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/future/FutureDocSpec.scala @@ -340,12 +340,12 @@ class FutureDocSpec extends AkkaSpec { def loadPage(s: String) = s val url = "foo bar" def log(cause: Throwable) = () - def watchSomeTV = () + def watchSomeTV(): Unit = () //#and-then val result = Future { loadPage(url) } andThen { case Failure(exception) ⇒ log(exception) } andThen { - case _ ⇒ watchSomeTV + case _ ⇒ watchSomeTV() } result foreach println //#and-then diff --git a/akka-docs/rst/scala/code/docs/routing/RouterViaProgramDocSpec.scala b/akka-docs/rst/scala/code/docs/routing/RouterViaProgramDocSpec.scala index 109bed0e36..342186f0c9 100644 --- a/akka-docs/rst/scala/code/docs/routing/RouterViaProgramDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/routing/RouterViaProgramDocSpec.scala @@ -16,7 +16,7 @@ object RouterViaProgramDocSpec { class ExampleActor1 extends Actor { def receive = { - case m @ Message1(nbr) ⇒ sender ! (self, m) + case m @ Message1(nbr) ⇒ sender ! ((self, m)) } } diff --git a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala index 0dacdeff3b..da3809b211 100644 --- a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala @@ -4,7 +4,6 @@ package docs.serialization { - import org.scalatest.matchers.MustMatchers import akka.testkit._ //#imports import akka.actor.{ ActorRef, ActorSystem } @@ -16,7 +15,6 @@ package docs.serialization { import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.Address - import akka.remote.RemoteActorRefProvider //#my-own-serializer class MyOwnSerializer extends Serializer { @@ -164,16 +162,8 @@ package docs.serialization { //#actorref-serializer // Serialize // (beneath toBinary) + val identifier: String = Serialization.serializedActorPath(theActorRef) - // If there is no transportAddress, - // it means that either this Serializer isn't called - // within a piece of code that sets it, - // so either you need to supply your own, - // or simply use the local path. - val identifier: String = Serialization.currentTransportAddress.value match { - case null ⇒ theActorRef.path.toSerializationFormat - case address ⇒ theActorRef.path.toSerializationFormatWithAddress(address) - } // Then just serialize the identifier however you like // Deserialize diff --git a/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala b/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala index 9931b49903..f812b07bd9 100644 --- a/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/testkit/TestkitDocSpec.scala @@ -186,7 +186,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val probe1 = TestProbe() val probe2 = TestProbe() val actor = system.actorOf(Props[MyDoubleEcho]) - actor ! (probe1.ref, probe2.ref) + actor ! ((probe1.ref, probe2.ref)) actor ! "hello" probe1.expectMsg(500 millis, "hello") probe2.expectMsg(500 millis, "hello") diff --git a/akka-docs/rst/scala/serialization.rst b/akka-docs/rst/scala/serialization.rst index 1e59226b7d..285dd52e49 100644 --- a/akka-docs/rst/scala/serialization.rst +++ b/akka-docs/rst/scala/serialization.rst @@ -101,12 +101,31 @@ list which classes that should be serialized using it. Serializing ActorRefs --------------------- -All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer, -you might want to know how to serialize and deserialize them properly, here's the magic incantation: +All ActorRefs are serializable using JavaSerializer, but in case you are writing your +own serializer, you might want to know how to serialize and deserialize them properly. +In the general case, the local address to be used depends on the type of remote +address which shall be the recipient of the serialized information. Use +:meth:`Serialization.serializedActorPath(actorRef)` like this: .. includecode:: code/docs/serialization/SerializationDocSpec.scala :include: imports,actorref-serializer +This assumes that serialization happens in the context of sending a message +through the remote transport. There are other uses of serialization, though, +e.g. storing actor references outside of an actor application (database, +durable mailbox, etc.). In this case, it is important to keep in mind that the +address part of an actor’s path determines how that actor is communicated with. +Storing a local actor path might be the right choice if the retrieval happens +in the same logical context, but it is not enough when deserializing it on a +different network host: for that it would need to include the system’s remote +transport address. An actor system is not limited to having just one remote +transport per se, which makes this question a bit more interesting. To find out +the appropriate address to use when sending to ``remoteAddr`` you can use +:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` like this: + +.. includecode:: code/docs/serialization/SerializationDocSpec.scala + :include: external-address + .. note:: ``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the @@ -120,24 +139,6 @@ you might want to know how to serialize and deserialize them properly, here's th storage of the reference, you can use ``toStringWithAddress``, which doesn't include the unique id. -This assumes that serialization happens in the context of sending a message -through the remote transport. There are other uses of serialization, though, -e.g. storing actor references outside of an actor application (database, -durable mailbox, etc.). In this case, it is important to keep in mind that the -address part of an actor’s path determines how that actor is communicated with. -Storing a local actor path might be the right choice if the retrieval happens -in the same logical context, but it is not enough when deserializing it on a -different network host: for that it would need to include the system’s remote -transport address. An actor system is not limited to having just one remote -transport per se, which makes this question a bit more interesting. - -In the general case, the local address to be used depends on the type of remote -address which shall be the recipient of the serialized information. Use -:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` to query the system -for the appropriate address to use when sending to ``remoteAddr``: - -.. includecode:: code/docs/serialization/SerializationDocSpec.scala - :include: external-address This requires that you know at least which type of address will be supported by the system which will deserialize the resulting actor reference; if you have no diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala index 1ba5f23f1c..50e64b1fe2 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/filebased/FileBasedMailboxSpec.scala @@ -3,6 +3,7 @@ package akka.actor.mailbox.filebased import language.postfixOps import akka.actor.mailbox._ +import scala.concurrent.duration._ import org.apache.commons.io.FileUtils import akka.dispatch.Mailbox @@ -27,18 +28,13 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp "read the file-based section" in { settings.QueuePath must be("file-based") settings.CircuitBreakerMaxFailures must be(5) - - import scala.concurrent.duration._ - settings.CircuitBreakerCallTimeout must be(5 seconds) } } - def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue] + private[akka] def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue] - def clean() { - FileUtils.deleteDirectory(new java.io.File(settings.QueuePath)) - } + def clean(): Unit = FileUtils.deleteDirectory(new java.io.File(settings.QueuePath)) override def atStartup() { clean() diff --git a/scripts/generate_config_with_secure_cookie.sh b/akka-kernel/src/main/dist/scripts/generate_config_with_secure_cookie.sh similarity index 77% rename from scripts/generate_config_with_secure_cookie.sh rename to akka-kernel/src/main/dist/scripts/generate_config_with_secure_cookie.sh index a8ad406dc0..19b0ccde32 100755 --- a/scripts/generate_config_with_secure_cookie.sh +++ b/akka-kernel/src/main/dist/scripts/generate_config_with_secure_cookie.sh @@ -42,21 +42,11 @@ object Crypt { } print(""" -# This config imports the Akka reference configuration. -include "akka-reference.conf" - -# In this file you can override any option defined in the 'akka-reference.conf' file. -# Copy in all or parts of the 'akka-reference.conf' file and modify as you please. - akka { remote { - netty { - secure-cookie = """") -print(Crypt.generateSecureCookie) -print("""" - require-cookie = on - } + secure-cookie = "%s" + require-cookie = on } } -""") +""".format(Crypt.generateSecureCookie)) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index 512881a4fd..392b7f5925 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -336,7 +336,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex stay } - initialize + initialize() } /** @@ -574,7 +574,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor throw BarrierTimeout(d) } - initialize + initialize() def handleBarrier(data: Data): State = { log.debug("handleBarrier({})", data) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index 384d8cad38..0316e2608c 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -191,7 +191,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) case EnterBarrier(barrier, timeout) ⇒ barrier case GetAddress(node) ⇒ node.name } - stay using d.copy(runningOp = Some(token, sender)) + stay using d.copy(runningOp = Some(token -> sender)) case Event(ToServer(op), Data(channel, Some((token, _)))) ⇒ log.error("cannot write {} while waiting for {}", op, token) stay @@ -257,8 +257,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) channel.close() } - initialize - + initialize() } /** diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 9b870e0f46..ccb1e148de 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -15,9 +15,9 @@ import akka.remote.transport.AssociationHandle._ import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle } import akka.serialization.Serialization import akka.util.ByteString +import scala.util.control.NonFatal import akka.remote.transport.Transport.InvalidAssociationException import java.io.NotSerializableException -import scala.util.control.{ NoStackTrace, NonFatal } /** * INTERNAL API @@ -332,7 +332,7 @@ private[remote] class EndpointWriter( private def serializeMessage(msg: Any): MessageProtocol = handle match { case Some(h) ⇒ - Serialization.currentTransportAddress.withValue(h.localAddress) { + Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, context.system)) { (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])) } case None ⇒ throw new EndpointException("Internal error: No handle was present during serialization of" + diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 4ac2b95229..f9521bebda 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -271,8 +271,17 @@ private[akka] class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case ActorPathExtractor(address, elems) ⇒ if (hasAddress(address)) actorFor(rootGuardian, elems) - else new RemoteActorRef(transport, transport.localAddressForRemote(address), - new RootActorPath(address) / elems, Nobody, props = None, deploy = None) + else { + val rootPath = RootActorPath(address) / elems + try { + new RemoteActorRef(transport, transport.localAddressForRemote(address), + rootPath, Nobody, props = None, deploy = None) + } catch { + case NonFatal(e) ⇒ + log.error(e, "Error while looking up address {}", rootPath.address) + new EmptyLocalActorRef(this, rootPath, eventStream) + } + } case _ ⇒ local.actorFor(ref, path) } @@ -378,5 +387,5 @@ private[akka] class RemoteActorRef private[akka] ( def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause)) @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = SerializedActorRef(path) + private def writeReplace(): AnyRef = SerializedActorRef(this) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index c697068b07..6725383c49 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -22,7 +22,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx * * The remote transport is responsible for sending and receiving messages. * Each transport has an address, which it should provide in - * Serialization.currentTransportAddress (thread-local) while serializing + * Serialization.currentTransportInformation (thread-local) while serializing * actor references (which might also be part of messages). This address must * be available (i.e. fully initialized) by the time the first message is * received or when the start() method returns, whatever happens first. diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 1fe6fc21a7..9be6deb426 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -74,7 +74,7 @@ private[remote] object Remoting { null) } case None ⇒ throw new RemoteTransportException( - s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString}]", null) + s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString(", ")}]", null) } } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala index 056439c23e..acbbc26cf2 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala @@ -6,10 +6,8 @@ package akka.remote.serialization import akka.serialization.{ Serializer, Serialization } import com.google.protobuf.Message -import akka.actor.DynamicAccess +import akka.actor.{ ActorSystem, ActorRef } import akka.remote.RemoteProtocol.ActorRefProtocol -import akka.actor.ActorSystem -import akka.actor.ActorRef object ProtobufSerializer { @@ -18,11 +16,7 @@ object ProtobufSerializer { * protobuf representation. */ def serializeActorRef(ref: ActorRef): ActorRefProtocol = { - val identifier: String = Serialization.currentTransportAddress.value match { - case null ⇒ ref.path.toSerializationFormat - case address ⇒ ref.path.toSerializationFormatWithAddress(address) - } - ActorRefProtocol.newBuilder.setPath(identifier).build + ActorRefProtocol.newBuilder.setPath(Serialization.serializedActorPath(ref)).build } /** diff --git a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala index fb82986873..ba7a347661 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala @@ -109,9 +109,9 @@ class TestTransport( (_) ⇒ defaultShutdown, (_) ⇒ registry.logActivity(ShutdownAttempt(localAddress))) - override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior() + override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior(()) override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress) - override def shutdown(): Unit = shutdownBehavior() + override def shutdown(): Unit = shutdownBehavior(()) private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = { registry.getRemoteReadHandlerFor(params._1) match { diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 19b6b0b1d9..07c3b6c144 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -210,30 +210,29 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A case AssociateUnderlying(remoteAddress, statusPromise) ⇒ wrappedTransport.associate(remoteAddress) onComplete { // Slight modification of pipe, only success is sent, failure is propagated to a separate future - case Success(handle) ⇒ self ! (handle, statusPromise) + case Success(handle) ⇒ self ! ((handle, statusPromise)) case Failure(e) ⇒ statusPromise.failure(e) } // Finished outbound association and got back the handle - case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) ⇒ + case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) ⇒ //FIXME switch to a real message iso Tuple2 val wrappedHandle = wrapHandle(handle, associationListener, inbound = false) val naked = nakedAddress(handle.remoteAddress) val inMode = getInboundMode(naked) wrappedHandle.outboundThrottleMode.set(getOutboundMode(naked)) - wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor + wrappedHandle.readHandlerPromise.future map { _ -> inMode } pipeTo wrappedHandle.throttlerActor handleTable ::= naked -> wrappedHandle statusPromise.success(wrappedHandle) case SetThrottle(address, direction, mode) ⇒ val naked = nakedAddress(address) - throttlingModes += naked -> (mode, direction) + throttlingModes = throttlingModes.updated(naked, (mode, direction)) val ok = Future.successful(SetThrottleAck) - val allAcks = handleTable.map { + Future.sequence(handleTable map { case (`naked`, handle) ⇒ setMode(handle, mode, direction) case _ ⇒ ok - } - Future.sequence(allAcks).map(_ ⇒ SetThrottleAck) pipeTo sender + }).map(_ ⇒ SetThrottleAck) pipeTo sender case ForceDisassociate(address) ⇒ val naked = nakedAddress(address) - handleTable.foreach { + handleTable foreach { case (`naked`, handle) ⇒ handle.disassociate() case _ ⇒ } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala index 65e4fd702a..d117c65924 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala @@ -14,6 +14,7 @@ import java.io.{ IOException, FileNotFoundException, FileInputStream } import java.security._ import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext } import org.jboss.netty.handler.ssl.SslHandler +import scala.util.Try /** * INTERNAL API @@ -92,7 +93,7 @@ private[akka] object NettySSLSupport { trustManagerFactory.init({ val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) val fin = new FileInputStream(trustStorePath) - try trustStore.load(fin, trustStorePassword.toCharArray) finally fin.close() + try trustStore.load(fin, trustStorePassword.toCharArray) finally Try(fin.close()) trustStore }) trustManagerFactory.getTrustManagers @@ -140,10 +141,23 @@ private[akka] object NettySSLSupport { factory.init({ val keyStore = KeyStore.getInstance(KeyStore.getDefaultType) val fin = new FileInputStream(keyStorePath) - try keyStore.load(fin, keyStorePassword.toCharArray) finally fin.close() + try keyStore.load(fin, keyStorePassword.toCharArray) finally Try(fin.close()) keyStore }, keyStorePassword.toCharArray) - Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, rng); ctx } + + val trustManagers: Option[Array[TrustManager]] = settings.SSLTrustStore map { + path ⇒ + val pwd = settings.SSLTrustStorePassword.map(_.toCharArray).orNull + val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + trustManagerFactory.init({ + val trustStore = KeyStore.getInstance(KeyStore.getDefaultType) + val fin = new FileInputStream(path) + try trustStore.load(fin, pwd) finally Try(fin.close()) + trustStore + }) + trustManagerFactory.getTrustManagers + } + Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, trustManagers.orNull, rng); ctx } } catch { case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e) case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e) diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 37be9b5553..f49a2244f5 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -37,14 +37,24 @@ object RemotingSpec { class Echo2 extends Actor { def receive = { - case "ping" ⇒ sender ! (("pong", sender)) + case "ping" ⇒ sender ! (("pong", sender)) + case a: ActorRef ⇒ a ! (("ping", sender)) + case ("ping", a: ActorRef) ⇒ sender ! (("pong", a)) + case ("pong", a: ActorRef) ⇒ a ! (("pong", sender.path.toSerializationFormat)) } } - val cfg: Config = ConfigFactory parseString (""" + class Proxy(val one: ActorRef, val another: ActorRef) extends Actor { + def receive = { + case s if sender.path == one.path ⇒ another ! s + case s if sender.path == another.path ⇒ one ! s + } + } + + val cfg: Config = ConfigFactory parseString (s""" common-ssl-settings { - key-store = "%s" - trust-store = "%s" + key-store = "${getClass.getClassLoader.getResource("keystore").getPath}" + trust-store = "${getClass.getClassLoader.getResource("truststore").getPath}" key-store-password = "changeme" trust-store-password = "changeme" protocol = "TLSv1" @@ -83,10 +93,10 @@ object RemotingSpec { } } - netty.tcp = ${common-netty-settings} - netty.udp = ${common-netty-settings} - netty.ssl = ${common-netty-settings} - netty.ssl.security = ${common-ssl-settings} + netty.tcp = $${common-netty-settings} + netty.udp = $${common-netty-settings} + netty.ssl = $${common-netty-settings} + netty.ssl.security = $${common-ssl-settings} test { transport-class = "akka.remote.transport.TestTransport" @@ -104,9 +114,7 @@ object RemotingSpec { /looker/child/grandchild.remote = "akka.test://RemotingSpec@localhost:12345" } } - """.format( - getClass.getClassLoader.getResource("keystore").getPath, - getClass.getClassLoader.getResource("truststore").getPath)) + """) } @@ -122,14 +130,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D maximum-payload-bytes = 48000 bytes } """).withFallback(system.settings.config).resolve() - val otherSystem = ActorSystem("remote-sys", conf) + val remoteSystem = ActorSystem("remote-sys", conf) for ( (name, proto) ← Seq( "/gonk" -> "tcp", "/zagzag" -> "udp", "/roghtaar" -> "ssl.tcp") - ) deploy(system, Deploy(name, scope = RemoteScope(addr(otherSystem, proto)))) + ) deploy(system, Deploy(name, scope = RemoteScope(addr(remoteSystem, proto)))) def addr(sys: ActorSystem, proto: String) = sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get @@ -138,12 +146,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d) } - val remote = otherSystem.actorOf(Props[Echo2], "echo") + val remote = remoteSystem.actorOf(Props[Echo2], "echo") val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo") private def verifySend(msg: Any)(afterSend: ⇒ Unit) { - val bigBounceOther = otherSystem.actorOf(Props(new Actor { + val bigBounceOther = remoteSystem.actorOf(Props(new Actor { def receive = { case x: Int ⇒ sender ! byteStringOfSize(x) case x ⇒ sender ! x @@ -166,16 +174,26 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D system.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent]) system.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent]) system.stop(eventForwarder) - otherSystem.stop(bigBounceOther) + remoteSystem.stop(bigBounceOther) } } + override def atStartup() = { + system.eventStream.publish(TestEvent.Mute( + EventFilter.error(start = "AssociationError"), + EventFilter.warning(pattern = "received dead letter.*"))) + remoteSystem.eventStream.publish(TestEvent.Mute( + EventFilter[EndpointException](), + EventFilter.error(start = "AssociationError"), + EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)"))) + } + private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte)) val maxPayloadBytes = system.settings.config.getBytes("akka.remote.test.maximum-payload-bytes").toInt override def afterTermination() { - otherSystem.shutdown() + remoteSystem.shutdown() AssociationRegistry.clear() } @@ -203,16 +221,21 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "send dead letters on remote if actor does not exist" in { EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh" - }(otherSystem) + }(remoteSystem) } "not be exhausted by sending to broken connections" in { val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]"""). - withFallback(otherSystem.settings.config) - val moreSystems = Vector.fill(5)(ActorSystem(otherSystem.name, tcpOnlyConfig)) - moreSystems foreach (_.actorOf(Props[Echo2], name = "echo")) + withFallback(remoteSystem.settings.config) + val moreSystems = Vector.fill(5)(ActorSystem(remoteSystem.name, tcpOnlyConfig)) + moreSystems foreach { sys ⇒ + sys.eventStream.publish(TestEvent.Mute( + EventFilter[EndpointDisassociatedException](), + EventFilter.warning(pattern = "received dead letter.*"))) + sys.actorOf(Props[Echo2], name = "echo") + } val moreRefs = moreSystems map (sys ⇒ system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo")) - val aliveEcho = system.actorFor(RootActorPath(addr(otherSystem, "tcp")) / "user" / "echo") + val aliveEcho = system.actorFor(RootActorPath(addr(remoteSystem, "tcp")) / "user" / "echo") val n = 100 // first everything is up and running @@ -229,7 +252,6 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D moreSystems foreach { sys ⇒ sys.shutdown() sys.awaitTermination(5.seconds.dilated) - sys.isTerminated must be(true) } 1 to n foreach { x ⇒ @@ -259,7 +281,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } "not send to remote re-created actor with same name" in { - val echo = otherSystem.actorOf(Props[Echo1], "otherEcho1") + val echo = remoteSystem.actorOf(Props[Echo1], "otherEcho1") echo ! 71 expectMsg(71) echo ! PoisonPill @@ -267,7 +289,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D echo ! 72 expectNoMsg(1.second) - val echo2 = otherSystem.actorOf(Props[Echo1], "otherEcho1") + val echo2 = remoteSystem.actorOf(Props[Echo1], "otherEcho1") echo2 ! 73 expectMsg(73) // msg to old ActorRef (different uid) should not get through @@ -275,7 +297,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D echo ! 74 expectNoMsg(1.second) - otherSystem.actorFor("/user/otherEcho1") ! 75 + remoteSystem.actorFor("/user/otherEcho1") ! 75 expectMsg(75) system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76 @@ -289,11 +311,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D case s: String ⇒ sender ! context.actorFor(s) } }), "looker") - // child is configured to be deployed on remote-sys (otherSystem) - l ! (Props[Echo1], "child") + // child is configured to be deployed on remote-sys (remoteSystem) + l ! ((Props[Echo1], "child")) val child = expectMsgType[ActorRef] // grandchild is configured to be deployed on RemotingSpec (system) - child ! (Props[Echo1], "grandchild") + child ! ((Props[Echo1], "grandchild")) val grandchild = expectMsgType[ActorRef] grandchild.asInstanceOf[ActorRefScope].isLocal must be(true) grandchild ! 43 @@ -313,7 +335,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D child ! PoisonPill expectMsg("postStop") expectMsgType[Terminated].actor must be === child - l ! (Props[Echo1], "child") + l ! ((Props[Echo1], "child")) val child2 = expectMsgType[ActorRef] child2 ! 45 expectMsg(45) @@ -335,7 +357,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (TCP)" in { val r = system.actorOf(Props[Echo1], "gonk") r.path.toString must be === - s"akka.tcp://remote-sys@localhost:${port(otherSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk" + s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk" r ! 42 expectMsg(42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -351,7 +373,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (UDP)" in { val r = system.actorOf(Props[Echo1], "zagzag") r.path.toString must be === - s"akka.udp://remote-sys@localhost:${port(otherSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag" + s"akka.udp://remote-sys@localhost:${port(remoteSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag" r ! 42 expectMsg(10.seconds, 42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -367,7 +389,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (SSL)" in { val r = system.actorOf(Props[Echo1], "roghtaar") r.path.toString must be === - s"akka.ssl.tcp://remote-sys@localhost:${port(otherSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar" + s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar" r ! 42 expectMsg(10.seconds, 42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -415,15 +437,30 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } } + "be able to serialize a local actor ref from another actor system" in { + val config = ConfigFactory.parseString(""" + akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"] + akka.remote.test.local-address = "test://other-system@localhost:12347" + """).withFallback(remoteSystem.settings.config) + val otherSystem = ActorSystem("other-system", config) + try { + val otherGuy = otherSystem.actorOf(Props[Echo2], "other-guy") + // check that we use the specified transport address instead of the default + val otherGuyRemoteTcp = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "tcp")) + val remoteEchoHereTcp = system.actorFor(s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/user/echo") + val proxyTcp = system.actorOf(Props(new Proxy(remoteEchoHereTcp, testActor)), "proxy-tcp") + proxyTcp ! otherGuy + expectMsg(3.seconds, ("pong", otherGuyRemoteTcp)) + // now check that we fall back to default when we haven't got a corresponding transport + val otherGuyRemoteTest = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "test")) + val remoteEchoHereSsl = system.actorFor(s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/user/echo") + val proxySsl = system.actorOf(Props(new Proxy(remoteEchoHereSsl, testActor)), "proxy-ssl") + proxySsl ! otherGuy + expectMsg(3.seconds, ("pong", otherGuyRemoteTest)) + } finally { + otherSystem.shutdown() + otherSystem.awaitTermination(5.seconds.dilated) + } + } } - - override def beforeTermination() { - system.eventStream.publish(TestEvent.Mute( - EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)"))) - otherSystem.eventStream.publish(TestEvent.Mute( - EventFilter[EndpointException](), - EventFilter.error(start = "AssociationError"), - EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)"))) - } - } diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala index 7d78f053a4..860b073c39 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala @@ -54,7 +54,7 @@ object AkkaProtocolStressTest { losses += seq - maxSeq - 1 maxSeq = seq if (seq > limit * 0.9) { - controller ! (maxSeq, losses) + controller ! ((maxSeq, losses)) } } else { controller ! s"Received out of order message. Previous: ${maxSeq} Received: ${seq}" diff --git a/akka-remote/src/test/scala/akka/remote/transport/SwitchableLoggedBehaviorSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/SwitchableLoggedBehaviorSpec.scala index 0c130d9cd9..25a696f131 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SwitchableLoggedBehaviorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SwitchableLoggedBehaviorSpec.scala @@ -21,19 +21,19 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout { "execute default behavior" in { val behavior = defaultBehavior - Await.result(behavior(), timeout.duration) == 3 must be(true) + Await.result(behavior(()), timeout.duration) must be === 3 } "be able to push generic behavior" in { val behavior = defaultBehavior behavior.push((_) ⇒ Promise.successful(4).future) - Await.result(behavior(), timeout.duration) must be(4) + Await.result(behavior(()), timeout.duration) must be(4) behavior.push((_) ⇒ Promise.failed(TestException).future) - behavior().value match { - case Some(Failure(e)) if e eq TestException ⇒ - case _ ⇒ fail("Expected exception") + behavior(()).value match { + case Some(Failure(`TestException`)) ⇒ + case _ ⇒ fail("Expected exception") } } @@ -41,15 +41,15 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout { val behavior = defaultBehavior behavior.pushConstant(5) - Await.result(behavior(), timeout.duration) must be(5) - Await.result(behavior(), timeout.duration) must be(5) + Await.result(behavior(()), timeout.duration) must be(5) + Await.result(behavior(()), timeout.duration) must be(5) } "be able to push failure behavior" in { val behavior = defaultBehavior behavior.pushError(TestException) - behavior().value match { + behavior(()).value match { case Some(Failure(e)) if e eq TestException ⇒ case _ ⇒ fail("Expected exception") } @@ -59,16 +59,16 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout { val behavior = defaultBehavior behavior.pushConstant(5) - Await.result(behavior(), timeout.duration) must be(5) + Await.result(behavior(()), timeout.duration) must be(5) behavior.pushConstant(7) - Await.result(behavior(), timeout.duration) must be(7) + Await.result(behavior(()), timeout.duration) must be(7) behavior.pop() - Await.result(behavior(), timeout.duration) must be(5) + Await.result(behavior(()), timeout.duration) must be(5) behavior.pop() - Await.result(behavior(), timeout.duration) must be(3) + Await.result(behavior(()), timeout.duration) must be(3) } @@ -78,13 +78,13 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout { behavior.pop() behavior.pop() - Await.result(behavior(), timeout.duration) must be(3) + Await.result(behavior(()), timeout.duration) must be(3) } "enable delayed completition" in { val behavior = defaultBehavior val controlPromise = behavior.pushDelayed - val f = behavior() + val f = behavior(()) f.isCompleted must be(false) controlPromise.success(()) diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala index 57aaf1f3b3..adaafd20f2 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala @@ -111,13 +111,13 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig) //#test-statsService "show usage of the statsService from one node" in within(15 seconds) { runOn(second) { - assertServiceOk + assertServiceOk() } testConductor.enter("done-2") } - def assertServiceOk: Unit = { + def assertServiceOk(): Unit = { val service = system.actorFor(node(third) / "user" / "statsService") // eventually the service should be ok, // first attempts might fail because worker actors not started yet @@ -135,7 +135,7 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig) //#test-statsService "show usage of the statsService from all nodes" in within(15 seconds) { - assertServiceOk + assertServiceOk() testConductor.enter("done-3") } diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala index 8cd068123b..f2be44b973 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala @@ -94,13 +94,13 @@ abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConf "show usage of the statsService from one node" in within(15 seconds) { runOn(second) { - assertServiceOk + assertServiceOk() } testConductor.enter("done-2") } - def assertServiceOk: Unit = { + def assertServiceOk(): Unit = { val service = system.actorFor(node(third) / "user" / "statsService") // eventually the service should be ok, // first attempts might fail because worker actors not started yet @@ -117,7 +117,7 @@ abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConf //#test-statsService "show usage of the statsService from all nodes" in within(15 seconds) { - assertServiceOk + assertServiceOk() testConductor.enter("done-3") } diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala index ef7730d08e..e43538596f 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/TransformationSampleSpec.scala @@ -84,7 +84,7 @@ abstract class TransformationSampleSpec extends MultiNodeSpec(TransformationSamp testConductor.enter("backend1-started") runOn(frontend1) { - assertServiceOk + assertServiceOk() } testConductor.enter("frontend1-backend1-ok") @@ -105,7 +105,7 @@ abstract class TransformationSampleSpec extends MultiNodeSpec(TransformationSamp testConductor.enter("all-started") runOn(frontend1, frontend2) { - assertServiceOk + assertServiceOk() } testConductor.enter("all-ok") @@ -114,7 +114,7 @@ abstract class TransformationSampleSpec extends MultiNodeSpec(TransformationSamp } - def assertServiceOk: Unit = { + def assertServiceOk(): Unit = { val transformationFrontend = system.actorFor("akka://" + system.name + "/user/frontend") // eventually the service should be ok, // backends might not have registered initially diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/japi/TransformationSampleJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/japi/TransformationSampleJapiSpec.scala index c781ffe809..58406f6717 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/japi/TransformationSampleJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/transformation/japi/TransformationSampleJapiSpec.scala @@ -86,7 +86,7 @@ abstract class TransformationSampleJapiSpec extends MultiNodeSpec(Transformation testConductor.enter("backend1-started") runOn(frontend1) { - assertServiceOk + assertServiceOk() } testConductor.enter("frontend1-backend1-ok") @@ -106,7 +106,7 @@ abstract class TransformationSampleJapiSpec extends MultiNodeSpec(Transformation testConductor.enter("all-started") runOn(frontend1, frontend2) { - assertServiceOk + assertServiceOk() } testConductor.enter("all-ok") @@ -115,7 +115,7 @@ abstract class TransformationSampleJapiSpec extends MultiNodeSpec(Transformation } - def assertServiceOk: Unit = { + def assertServiceOk(): Unit = { val transformationFrontend = system.actorFor("akka://" + system.name + "/user/frontend") // eventually the service should be ok, // backends might not have registered initially diff --git a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala index 8ebaa4a2e9..b773f316d4 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/Buncher.scala @@ -76,7 +76,7 @@ abstract class GenericBuncher[A: ClassTag, B](val singleTimeout: FiniteDuration, stop } - initialize + initialize() } object Buncher { diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index ad91e540b3..e5c621d74a 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -133,9 +133,9 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { object DiningHakkers { val system = ActorSystem() - def main(args: Array[String]): Unit = run + def main(args: Array[String]): Unit = run() - def run { + def run(): Unit = { //Create 5 chopsticks val chopsticks = for (i ← 1 to 5) yield system.actorOf(Props[Chopstick], "Chopstick" + i) diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index f7bb9e6bd7..fc43289c3c 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -55,7 +55,7 @@ class Chopstick extends Actor with FSM[ChopstickState, TakenBy] { } // Initialze the chopstick - initialize + initialize() } /** @@ -155,7 +155,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } // Initialize the hakker - initialize + initialize() private def startThinking(duration: FiniteDuration): State = { goto(Thinking) using TakenChopsticks(None, None) forMax duration @@ -169,9 +169,9 @@ object DiningHakkersOnFsm { val system = ActorSystem() - def main(args: Array[String]): Unit = run + def main(args: Array[String]): Unit = run() - def run = { + def run(): Unit = { // Create 5 chopsticks val chopsticks = for (i ← 1 to 5) yield system.actorOf(Props[Chopstick], "Chopstick" + i) // Create 5 awesome fsm hakkers and assign them their left and right chopstick diff --git a/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/CreationApplication.scala b/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/CreationApplication.scala index b45e2e7ec4..db022a638d 100644 --- a/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/CreationApplication.scala +++ b/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/CreationApplication.scala @@ -20,9 +20,8 @@ class CreationApplication extends Bootable { val remoteActor = system.actorOf(Props[AdvancedCalculatorActor], "advancedCalculator") - def doSomething(op: MathOp) = { - localActor ! (remoteActor, op) - } + def doSomething(op: MathOp): Unit = + localActor ! ((remoteActor, op)) //#setup def startup() { diff --git a/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/LookupApplication.scala b/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/LookupApplication.scala index 6fa2d1b6df..159537042b 100644 --- a/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/LookupApplication.scala +++ b/akka-samples/akka-sample-remote/src/main/scala/sample/remote/calculator/LookupApplication.scala @@ -22,9 +22,8 @@ class LookupApplication extends Bootable { val remoteActor = system.actorFor( "akka.tcp://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator") - def doSomething(op: MathOp) = { - actor ! (remoteActor, op) - } + def doSomething(op: MathOp): Unit = + actor ! ((remoteActor, op)) //#setup def startup() { diff --git a/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jLoggerSpec.scala b/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jLoggerSpec.scala index efeba0beab..04a305e716 100644 --- a/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jLoggerSpec.scala +++ b/akka-slf4j/src/test/scala/akka/event/slf4j/Slf4jLoggerSpec.scala @@ -81,7 +81,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft } "log info with parameters" in { - producer ! ("test x={} y={}", 3, 17) + producer ! (("test x={} y={}", 3, 17)) awaitCond(outputString.contains("----"), 5 seconds) val s = outputString diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 361c6d4c29..13413eb462 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -48,7 +48,7 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { private var lastGC = 0l // we have to forget about long-gone threads sometime - private def gc { + private def gc(): Unit = { queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[MessageQueue]]] /: queues) { case (m, (k, v)) ⇒ val nv = v filter (_.get ne null) @@ -66,7 +66,7 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { val now = System.nanoTime if (now - lastGC > 1000000000l) { lastGC = now - gc + gc() } } @@ -165,16 +165,14 @@ class CallingThreadDispatcher( mbox foreach CallingThreadDispatcherQueues(actor.system).unregisterQueues } - override def suspend(actor: ActorCell) { + protected[akka] override def suspend(actor: ActorCell) { actor.mailbox match { - case m: CallingThreadMailbox ⇒ - m.suspendSwitch.switchOn; m.suspend() - case m ⇒ - m.systemEnqueue(actor.self, Suspend()) + case m: CallingThreadMailbox ⇒ { m.suspendSwitch.switchOn; m.suspend() } + case m ⇒ m.systemEnqueue(actor.self, Suspend()) } } - override def resume(actor: ActorCell) { + protected[akka] override def resume(actor: ActorCell) { actor.mailbox match { case mbox: CallingThreadMailbox ⇒ val queue = mbox.queue diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 043be02744..9c0a103487 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -47,7 +47,7 @@ class TestActorRef[T <: Actor]( import TestActorRef.InternalGetActor - override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell = + protected override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell = new ActorCell(system, ref, props, supervisor) { override def autoReceiveMessage(msg: Envelope) { msg.message match { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala b/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala index 989d689a53..8d0403dbfc 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala @@ -38,5 +38,5 @@ class TestBarrier(count: Int) { } } - def reset = barrier.reset + def reset(): Unit = barrier.reset() } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index cb48deb302..e6c1967ea4 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -788,3 +788,13 @@ private[testkit] abstract class CachingPartialFunction[A, B <: AnyRef] extends s final def isDefinedAt(x: A): Boolean = try { cache = `match`(x); true } catch { case NoMatch ⇒ cache = null.asInstanceOf[B]; false } final override def apply(x: A): B = cache } + +/** + * Wrapper for implicit conversion to add dilated function to Duration. + */ +class TestDuration(duration: FiniteDuration) { + def dilated(implicit system: ActorSystem): FiniteDuration = { + // this cast will succeed unless TestTimeFactor is non-finite (which would be a misconfiguration) + (duration * TestKitExtension(system).TestTimeFactor).asInstanceOf[FiniteDuration] + } +} diff --git a/akka-testkit/src/main/scala/akka/testkit/package.scala b/akka-testkit/src/main/scala/akka/testkit/package.scala index 7677793414..4f8167acd7 100644 --- a/akka-testkit/src/main/scala/akka/testkit/package.scala +++ b/akka-testkit/src/main/scala/akka/testkit/package.scala @@ -47,14 +47,4 @@ package object testkit { * Corresponding Java API is available in TestKit.dilated */ implicit def duration2TestDuration(duration: FiniteDuration) = new TestDuration(duration) - - /** - * Wrapper for implicit conversion to add dilated function to Duration. - */ - class TestDuration(duration: FiniteDuration) { - def dilated(implicit system: ActorSystem): FiniteDuration = { - // this cast will succeed unless TestTimeFactor is non-finite (which would be a misconfiguration) - (duration * TestKitExtension(system).TestTimeFactor).asInstanceOf[FiniteDuration] - } - } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index f0ac8469da..1c06a63043 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -109,13 +109,9 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA import TestActorRefSpec._ - override def beforeEach { - otherthread = null - } + override def beforeEach(): Unit = otherthread = null - private def assertThread { - otherthread must (be(null) or equal(thread)) - } + private def assertThread(): Unit = otherthread must (be(null) or equal(thread)) "A TestActorRef must be an ActorRef, hence it" must { @@ -167,7 +163,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA counter must be(0) - assertThread + assertThread() } "stop when sent a poison pill" in { @@ -185,7 +181,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA case WrappedTerminated(Terminated(`a`)) ⇒ true } a.isTerminated must be(true) - assertThread + assertThread() } } @@ -209,7 +205,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA boss ! "sendKill" counter must be(0) - assertThread + assertThread() } } diff --git a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala index 990fd48a2f..e7323536a5 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/SocketOption.scala @@ -77,7 +77,7 @@ class Context(numIoThreads: Int) extends SocketMeta { def poller: Poller = context.poller - def term: Unit = context.term + def term(): Unit = context.term() } /** diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 06d4a8c86d..eabeb7373c 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -62,7 +62,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { * @return the [[akka.actor.Props]] */ def newSocketProps(socketParameters: SocketOption*): Props = { - verifyZeroMQVersion + verifyZeroMQVersion() require(socketParameters exists { case s: SocketType.ZMQSocketType ⇒ true case _ ⇒ false @@ -239,7 +239,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { def newRepSocket(socketParameters: Array[SocketOption]): ActorRef = newSocket((SocketType.Rep +: socketParameters): _*) private val zeromqGuardian: ActorRef = { - verifyZeroMQVersion + verifyZeroMQVersion() system.actorOf(Props(new Actor { import SupervisorStrategy._ @@ -257,7 +257,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { }), "zeromq") } - private def verifyZeroMQVersion = { + private def verifyZeroMQVersion(): Unit = { require( JZMQ.getFullVersion > ZeroMQExtension.minVersion, "Unsupported ZeroMQ version: %s, akka needs at least: %s".format(JZMQ.getVersionString, ZeroMQExtension.minVersionString)) diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala index 8eb10ff79c..4697db8b23 100644 --- a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala +++ b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala @@ -15,14 +15,14 @@ class ConcurrentSocketActorSpec extends AkkaSpec { implicit val timeout: Timeout = Timeout(15 seconds) - def checkZeroMQInstallation = + def checkZeroMQInstallation() = try { zmq.version match { case ZeroMQVersion(x, y, _) if x >= 3 || (x >= 2 && y >= 1) ⇒ Unit case version ⇒ invalidZeroMQVersion(version) } } catch { - case e: LinkageError ⇒ zeroMQNotInstalled + case e: LinkageError ⇒ zeroMQNotInstalled() } def invalidZeroMQVersion(version: ZeroMQVersion) { @@ -30,19 +30,19 @@ class ConcurrentSocketActorSpec extends AkkaSpec { pending } - def zeroMQNotInstalled { + def zeroMQNotInstalled(): Unit = { info("WARNING: The tests are not run because ZeroMQ is not installed. Version >= 2.1.x required.") pending } val endpoint = "tcp://127.0.0.1:%s" format { val s = new java.net.ServerSocket(0); try s.getLocalPort finally s.close() } - // this must stay a def for checkZeroMQInstallation to work correctly + // this must stay a def for checkZeroMQInstallation() to work correctly def zmq = ZeroMQExtension(system) "ConcurrentSocketActor" should { "support pub-sub connections" in { - checkZeroMQInstallation + checkZeroMQInstallation() val subscriberProbe = TestProbe() val context = Context() val publisher = zmq.newSocket(SocketType.Pub, context, Bind(endpoint)) @@ -79,7 +79,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { } "support req-rep connections" in { - checkZeroMQInstallation + checkZeroMQInstallation() val requesterProbe = TestProbe() val replierProbe = TestProbe() val context = Context() @@ -106,7 +106,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { } "should support push-pull connections" in { - checkZeroMQInstallation + checkZeroMQInstallation() val pullerProbe = TestProbe() val context = Context() val pusher = zmq.newSocket(SocketType.Push, context, Bind(endpoint)) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 218dc0c188..fa9460168f 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -572,7 +572,7 @@ object AkkaBuild extends Build { lazy val defaultSettings = baseSettings ++ formatSettings ++ mimaSettings ++ lsSettings ++ Seq( // compile options - scalacOptions in Compile ++= Seq("-encoding", "UTF-8", "-target:jvm-1.6", "-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Ywarn-adapted-args"), + scalacOptions in Compile ++= Seq("-encoding", "UTF-8", "-target:jvm-1.6", "-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint"), javacOptions in Compile ++= Seq("-source", "1.6", "-target", "1.6", "-Xlint:unchecked", "-Xlint:deprecation"), // if changing this between binary and full, also change at the bottom of akka-sbt-plugin/sample/project/Build.scala diff --git a/scripts/samples/start b/scripts/samples/start deleted file mode 100755 index 21563159f0..0000000000 --- a/scripts/samples/start +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -SAMPLE="$(cd "$(cd "$(dirname "$0")"; pwd -P)"/..; pwd)" - -AKKA_HOME="$(cd "$SAMPLE"/../../../..; pwd)" - -[ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xms1536M -Xmx1536M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC" - -[ -n "$AKKA_CLASSPATH" ] || AKKA_CLASSPATH="$AKKA_HOME/lib/scala-library.jar:$AKKA_HOME/lib/akka/*" - -SAMPLE_CLASSPATH="$SAMPLE/config:$AKKA_CLASSPATH:$SAMPLE/lib/*" - -java $JAVA_OPTS -cp "$SAMPLE_CLASSPATH" -Dakka.home="$SAMPLE" akka.kernel.Main diff --git a/scripts/samples/start.bat b/scripts/samples/start.bat deleted file mode 100644 index a6a3ec5e33..0000000000 --- a/scripts/samples/start.bat +++ /dev/null @@ -1,8 +0,0 @@ -@echo off -set SAMPLE=%~dp0.. -set AKKA_HOME=%SAMPLE%\..\..\..\.. -set JAVA_OPTS=-Xms1024M -Xmx1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC -set AKKA_CLASSPATH=%AKKA_HOME%\lib\scala-library.jar;%AKKA_HOME%\lib\akka\* -set SAMPLE_CLASSPATH=%SAMPLE%\config;%AKKA_CLASSPATH%;%SAMPLE%\lib\* - -java %JAVA_OPTS% -cp "%SAMPLE_CLASSPATH%" -Dakka.home="%SAMPLE%" akka.kernel.Main