Merged with master
This commit is contained in:
commit
cb5c17dd7f
106 changed files with 528 additions and 476 deletions
|
|
@ -2,7 +2,7 @@
|
|||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.io
|
||||
package akka
|
||||
|
||||
import scala.collection.immutable
|
||||
import java.net.InetSocketAddress
|
||||
|
|
@ -159,7 +159,7 @@ class ActorDSLSpec extends AkkaSpec {
|
|||
become {
|
||||
case "die" ⇒ throw new Exception
|
||||
}
|
||||
whenFailing { (cause, msg) ⇒ testActor ! (cause, msg) }
|
||||
whenFailing { case m @ (cause, msg) ⇒ testActor ! m }
|
||||
whenRestarted { cause ⇒ testActor ! cause }
|
||||
})
|
||||
//#failing-actor
|
||||
|
|
|
|||
|
|
@ -6,9 +6,6 @@ package akka.actor
|
|||
|
||||
import language.postfixOps
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
import akka.testkit._
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -17,6 +14,7 @@ import java.lang.IllegalStateException
|
|||
import scala.concurrent.Promise
|
||||
import akka.pattern.ask
|
||||
import akka.serialization.JavaSerializer
|
||||
import akka.TestUtils.verifyActorTermination
|
||||
|
||||
object ActorRefSpec {
|
||||
|
||||
|
|
@ -43,19 +41,17 @@ object ActorRefSpec {
|
|||
import context.system
|
||||
def receive = {
|
||||
case "work" ⇒ {
|
||||
work
|
||||
work()
|
||||
sender ! "workDone"
|
||||
context.stop(self)
|
||||
}
|
||||
case ReplyTo(replyTo) ⇒ {
|
||||
work
|
||||
work()
|
||||
replyTo ! "complexReply"
|
||||
}
|
||||
}
|
||||
|
||||
private def work {
|
||||
Thread.sleep(1.second.dilated.toMillis)
|
||||
}
|
||||
private def work(): Unit = Thread.sleep(1.second.dilated.toMillis)
|
||||
}
|
||||
|
||||
class SenderActor(replyActor: ActorRef, latch: TestLatch) extends Actor {
|
||||
|
|
@ -143,7 +139,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
new Actor { def receive = { case _ ⇒ } }
|
||||
}
|
||||
|
||||
def contextStackMustBeEmpty = ActorCell.contextStack.get.headOption must be === None
|
||||
def contextStackMustBeEmpty(): Unit = ActorCell.contextStack.get.headOption must be === None
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
intercept[akka.actor.ActorInitializationException] {
|
||||
|
|
@ -154,7 +150,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
})))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
|
|
@ -163,7 +159,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(promiseIntercept(new FailingOuterActor(actorOf(Props(new InnerActor))))(result))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
|
|
@ -172,7 +168,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result)))))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
|
|
@ -181,7 +177,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(promiseIntercept(new FailingInheritingOuterActor(actorOf(Props(new InnerActor))))(result))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 2) intercept {
|
||||
|
|
@ -190,7 +186,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 2) intercept {
|
||||
|
|
@ -199,7 +195,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 2) intercept {
|
||||
|
|
@ -208,7 +204,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(new FailingInheritingOuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result)))))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
|
|
@ -219,7 +215,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
}))))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 2) intercept {
|
||||
|
|
@ -228,7 +224,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(new FailingOuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
|
|
@ -237,7 +233,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
|
|
@ -246,7 +242,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ new InnerActor; new InnerActor })(result)))))))
|
||||
}
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1) intercept {
|
||||
|
|
@ -255,7 +251,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({ throw new IllegalStateException("Ur state be b0rked"); new InnerActor })(result)))))))
|
||||
}).getMessage must be === "Ur state be b0rked"
|
||||
|
||||
contextStackMustBeEmpty
|
||||
contextStackMustBeEmpty()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -318,17 +314,21 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
val out = new ObjectOutputStream(baos)
|
||||
|
||||
val sysImpl = system.asInstanceOf[ActorSystemImpl]
|
||||
val addr = sysImpl.provider.rootPath.address
|
||||
val serialized = SerializedActorRef(RootActorPath(addr, "/non-existing"))
|
||||
val ref = system.actorOf(Props[ReplyActor], "non-existing")
|
||||
val serialized = SerializedActorRef(ref)
|
||||
|
||||
out.writeObject(serialized)
|
||||
|
||||
out.flush
|
||||
out.close
|
||||
|
||||
ref ! PoisonPill
|
||||
|
||||
verifyActorTermination(ref)
|
||||
|
||||
JavaSerializer.currentSystem.withValue(sysImpl) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream)
|
||||
in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, ref.path, system.eventStream)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -403,7 +403,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
Await.result(ffive, timeout.duration) must be("five")
|
||||
Await.result(fnull, timeout.duration) must be("null")
|
||||
|
||||
awaitCond(ref.isTerminated, 2000 millis)
|
||||
verifyActorTermination(ref)
|
||||
}
|
||||
|
||||
"restart when Kill:ed" in {
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ object Chameneos {
|
|||
}
|
||||
}
|
||||
|
||||
def run {
|
||||
def run(): Unit = {
|
||||
// System.setProperty("akka.config", "akka.conf")
|
||||
Chameneos.start = System.currentTimeMillis
|
||||
val system = ActorSystem()
|
||||
|
|
@ -114,5 +114,5 @@ object Chameneos {
|
|||
system.shutdown()
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = run
|
||||
def main(args: Array[String]): Unit = run()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ object FSMActorSpec {
|
|||
case incomplete if incomplete.length < code.length ⇒
|
||||
stay using CodeState(incomplete, code)
|
||||
case codeTry if (codeTry == code) ⇒ {
|
||||
doUnlock
|
||||
doUnlock()
|
||||
goto(Open) using CodeState("", code) forMax timeout
|
||||
}
|
||||
case wrong ⇒ {
|
||||
|
|
@ -60,7 +60,7 @@ object FSMActorSpec {
|
|||
|
||||
when(Open) {
|
||||
case Event(StateTimeout, _) ⇒ {
|
||||
doLock
|
||||
doLock()
|
||||
goto(Locked)
|
||||
}
|
||||
}
|
||||
|
|
@ -87,19 +87,15 @@ object FSMActorSpec {
|
|||
onTermination {
|
||||
case StopEvent(FSM.Shutdown, Locked, _) ⇒
|
||||
// stop is called from lockstate with shutdown as reason...
|
||||
terminatedLatch.open
|
||||
terminatedLatch.open()
|
||||
}
|
||||
|
||||
// initialize the lock
|
||||
initialize
|
||||
initialize()
|
||||
|
||||
private def doLock() {
|
||||
lockedLatch.open
|
||||
}
|
||||
private def doLock(): Unit = lockedLatch.open()
|
||||
|
||||
private def doUnlock = {
|
||||
unlockedLatch.open
|
||||
}
|
||||
private def doUnlock(): Unit = unlockedLatch.open()
|
||||
}
|
||||
|
||||
case class CodeState(soFar: String, code: String)
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ object FSMTransitionSpec {
|
|||
whenUnhandled {
|
||||
case Event("reply", _) ⇒ stay replying "reply"
|
||||
}
|
||||
initialize
|
||||
initialize()
|
||||
override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" }
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
|
|||
}
|
||||
|
||||
"be canceled if cancel is performed before execution" in {
|
||||
val task = collectCancellable(system.scheduler.scheduleOnce(10 seconds)())
|
||||
val task = collectCancellable(system.scheduler.scheduleOnce(10 seconds)(()))
|
||||
task.cancel() must be(true)
|
||||
task.isCancelled must be(true)
|
||||
task.cancel() must be(false)
|
||||
|
|
|
|||
|
|
@ -698,8 +698,7 @@ object SupervisorHierarchySpec {
|
|||
stop
|
||||
}
|
||||
|
||||
initialize
|
||||
|
||||
initialize()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,8 +68,8 @@ object ActorModelSpec {
|
|||
|
||||
def interceptor = context.dispatcher.asInstanceOf[MessageDispatcherInterceptor]
|
||||
|
||||
def ack {
|
||||
if (!busy.switchOn()) {
|
||||
def ack(): Unit = {
|
||||
if (!busy.switchOn(())) {
|
||||
throw new Exception("isolation violated")
|
||||
} else {
|
||||
interceptor.getStats(self).msgsProcessed.incrementAndGet()
|
||||
|
|
@ -81,21 +81,21 @@ object ActorModelSpec {
|
|||
}
|
||||
|
||||
def receive = {
|
||||
case AwaitLatch(latch) ⇒ { ack; latch.await(); busy.switchOff() }
|
||||
case Meet(sign, wait) ⇒ { ack; sign.countDown(); wait.await(); busy.switchOff() }
|
||||
case Wait(time) ⇒ { ack; Thread.sleep(time); busy.switchOff() }
|
||||
case WaitAck(time, l) ⇒ { ack; Thread.sleep(time); l.countDown(); busy.switchOff() }
|
||||
case Reply(msg) ⇒ { ack; sender ! msg; busy.switchOff() }
|
||||
case TryReply(msg) ⇒ { ack; sender.tell(msg, null); busy.switchOff() }
|
||||
case Forward(to, msg) ⇒ { ack; to.forward(msg); busy.switchOff() }
|
||||
case CountDown(latch) ⇒ { ack; latch.countDown(); busy.switchOff() }
|
||||
case Increment(count) ⇒ { ack; count.incrementAndGet(); busy.switchOff() }
|
||||
case CountDownNStop(l) ⇒ { ack; l.countDown(); context.stop(self); busy.switchOff() }
|
||||
case Restart ⇒ { ack; busy.switchOff(); throw new Exception("Restart requested") }
|
||||
case Interrupt ⇒ { ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") }
|
||||
case InterruptNicely(msg) ⇒ { ack; sender ! msg; busy.switchOff(); Thread.currentThread().interrupt() }
|
||||
case ThrowException(e: Throwable) ⇒ { ack; busy.switchOff(); throw e }
|
||||
case DoubleStop ⇒ { ack; context.stop(self); context.stop(self); busy.switchOff }
|
||||
case AwaitLatch(latch) ⇒ { ack(); latch.await(); busy.switchOff(()) }
|
||||
case Meet(sign, wait) ⇒ { ack(); sign.countDown(); wait.await(); busy.switchOff(()) }
|
||||
case Wait(time) ⇒ { ack(); Thread.sleep(time); busy.switchOff(()) }
|
||||
case WaitAck(time, l) ⇒ { ack(); Thread.sleep(time); l.countDown(); busy.switchOff(()) }
|
||||
case Reply(msg) ⇒ { ack(); sender ! msg; busy.switchOff(()) }
|
||||
case TryReply(msg) ⇒ { ack(); sender.tell(msg, null); busy.switchOff(()) }
|
||||
case Forward(to, msg) ⇒ { ack(); to.forward(msg); busy.switchOff(()) }
|
||||
case CountDown(latch) ⇒ { ack(); latch.countDown(); busy.switchOff(()) }
|
||||
case Increment(count) ⇒ { ack(); count.incrementAndGet(); busy.switchOff(()) }
|
||||
case CountDownNStop(l) ⇒ { ack(); l.countDown(); context.stop(self); busy.switchOff(()) }
|
||||
case Restart ⇒ { ack(); busy.switchOff(()); throw new Exception("Restart requested") }
|
||||
case Interrupt ⇒ { ack(); sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(()); throw new InterruptedException("Ping!") }
|
||||
case InterruptNicely(msg) ⇒ { ack(); sender ! msg; busy.switchOff(()); Thread.currentThread().interrupt() }
|
||||
case ThrowException(e: Throwable) ⇒ { ack(); busy.switchOff(()); throw e }
|
||||
case DoubleStop ⇒ { ack(); context.stop(self); context.stop(self); busy.switchOff }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -124,12 +124,12 @@ object ActorModelSpec {
|
|||
}
|
||||
}
|
||||
|
||||
abstract override def suspend(actor: ActorCell) {
|
||||
protected[akka] abstract override def suspend(actor: ActorCell) {
|
||||
getStats(actor.self).suspensions.incrementAndGet()
|
||||
super.suspend(actor)
|
||||
}
|
||||
|
||||
abstract override def resume(actor: ActorCell) {
|
||||
protected[akka] abstract override def resume(actor: ActorCell) {
|
||||
super.resume(actor)
|
||||
getStats(actor.self).resumes.incrementAndGet()
|
||||
}
|
||||
|
|
@ -330,16 +330,12 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
|
|||
}
|
||||
|
||||
def spawn(f: ⇒ Unit) {
|
||||
val thread = new Thread {
|
||||
override def run {
|
||||
try {
|
||||
f
|
||||
} catch {
|
||||
case e ⇒ system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread"))
|
||||
(new Thread {
|
||||
override def run(): Unit =
|
||||
try f catch {
|
||||
case e: Throwable ⇒ system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread"))
|
||||
}
|
||||
}
|
||||
}
|
||||
thread.start()
|
||||
}).start()
|
||||
}
|
||||
|
||||
"not process messages for a suspended actor" in {
|
||||
|
|
@ -380,7 +376,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
|
|||
try {
|
||||
assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num)
|
||||
} catch {
|
||||
case e ⇒
|
||||
case e: Throwable ⇒
|
||||
dispatcher match {
|
||||
case dispatcher: BalancingDispatcher ⇒
|
||||
val team = dispatcher.team
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
callingThreadLock.compareAndSet(1, 0) // Disable the lock
|
||||
}
|
||||
Await.result(p.future, timeout.duration) must be === ()
|
||||
Await.result(p.future, timeout.duration) must be === (())
|
||||
}
|
||||
|
||||
"be able to avoid starvation when Batching is used and Await/blocking is called" in {
|
||||
|
|
|
|||
|
|
@ -573,11 +573,11 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
}
|
||||
|
||||
"should not deadlock with nested await (ticket 1313)" in {
|
||||
val simple = Future() map (_ ⇒ Await.result((Future(()) map (_ ⇒ ())), timeout.duration))
|
||||
val simple = Future(()) map (_ ⇒ Await.result((Future(()) map (_ ⇒ ())), timeout.duration))
|
||||
FutureSpec.ready(simple, timeout.duration) must be('completed)
|
||||
|
||||
val l1, l2 = new TestLatch
|
||||
val complex = Future() map { _ ⇒
|
||||
val complex = Future(()) map { _ ⇒
|
||||
val nested = Future(())
|
||||
nested foreach (_ ⇒ l1.open())
|
||||
FutureSpec.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed
|
||||
|
|
@ -589,7 +589,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
|
|||
|
||||
"re-use the same thread for nested futures with batching ExecutionContext" in {
|
||||
val failCount = new java.util.concurrent.atomic.AtomicInteger
|
||||
val f = Future() flatMap { _ ⇒
|
||||
val f = Future(()) flatMap { _ ⇒
|
||||
val originalThread = Thread.currentThread
|
||||
// run some nested futures
|
||||
val nested =
|
||||
|
|
|
|||
|
|
@ -9,7 +9,8 @@ import com.typesafe.config.{ Config, ConfigFactory }
|
|||
import akka.actor.{ ActorRef, Actor, ActorSystem }
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.event.Logging.{ LogEvent, LoggerInitialized, InitializeLogger }
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.event.Logging.{ Warning, LogEvent, LoggerInitialized, InitializeLogger }
|
||||
|
||||
object LoggerSpec {
|
||||
|
||||
|
|
@ -37,6 +38,21 @@ object LoggerSpec {
|
|||
}
|
||||
""").withFallback(AkkaSpec.testConf)
|
||||
|
||||
val ticket3165Config = ConfigFactory.parseString("""
|
||||
akka {
|
||||
stdout-loglevel = "WARNING"
|
||||
loglevel = "DEBUG"
|
||||
loggers = ["akka.event.LoggerSpec$TestLogger1"]
|
||||
actor {
|
||||
serialize-messages = on
|
||||
serialization-bindings {
|
||||
"akka.event.Logging$LogEvent" = bytes
|
||||
"java.io.Serializable" = java
|
||||
}
|
||||
}
|
||||
}
|
||||
""").withFallback(AkkaSpec.testConf)
|
||||
|
||||
case class SetTarget(ref: ActorRef, qualifier: Int)
|
||||
|
||||
class TestLogger1 extends TestLogger(1)
|
||||
|
|
@ -127,4 +143,16 @@ class LoggerSpec extends WordSpec with MustMatchers {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
"Ticket 3165 - serialize-messages and dual-entry serialization of LogEvent" must {
|
||||
"not cause StackOverflowError" in {
|
||||
implicit val s = ActorSystem("foo", ticket3165Config)
|
||||
try {
|
||||
SerializationExtension(s).serialize(Warning("foo", classOf[String]))
|
||||
} finally {
|
||||
s.shutdown()
|
||||
s.awaitTermination(5.seconds.dilated)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.io
|
|||
|
||||
import akka.testkit.{ TestProbe, AkkaSpec }
|
||||
import Tcp._
|
||||
import akka.TestUtils
|
||||
import TestUtils._
|
||||
|
||||
class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4")
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import scala.util.control.NonFatal
|
|||
import org.scalatest.matchers._
|
||||
import akka.io.Tcp._
|
||||
import akka.io.SelectionHandler._
|
||||
import akka.TestUtils
|
||||
import TestUtils._
|
||||
import akka.actor.{ ActorRef, PoisonPill, Terminated }
|
||||
import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
|
||||
|
|
@ -637,7 +638,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
|||
def interestsDesc(interests: Int): String =
|
||||
interestsNames.filter(i ⇒ (i._1 & interests) != 0).map(_._2).mkString(", ")
|
||||
}
|
||||
def withUnacceptedConnection(
|
||||
private[io] def withUnacceptedConnection(
|
||||
setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (),
|
||||
connectionActorCons: (ActorRef, ActorRef) ⇒ TestActorRef[TcpOutgoingConnection] = createConnectionActor())(body: UnacceptedSetup ⇒ Any): Unit =
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.io
|
|||
import akka.testkit.AkkaSpec
|
||||
import akka.util.ByteString
|
||||
import Tcp._
|
||||
import akka.TestUtils
|
||||
import TestUtils._
|
||||
import akka.testkit.EventFilter
|
||||
import java.io.IOException
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.actor.ActorRef
|
|||
import scala.collection.immutable
|
||||
import akka.io.Inet.SocketOption
|
||||
import Tcp._
|
||||
import akka.TestUtils
|
||||
import TestUtils._
|
||||
|
||||
trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import Tcp._
|
|||
import akka.testkit.EventFilter
|
||||
import akka.io.SelectionHandler._
|
||||
import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming }
|
||||
import akka.TestUtils
|
||||
|
||||
class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.io
|
||||
|
||||
import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec }
|
||||
import akka.TestUtils
|
||||
import TestUtils._
|
||||
import akka.util.ByteString
|
||||
import java.net.InetSocketAddress
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ package akka.io
|
|||
|
||||
import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec }
|
||||
import akka.io.UdpFF._
|
||||
import akka.TestUtils
|
||||
import TestUtils._
|
||||
import akka.util.ByteString
|
||||
import java.net.InetSocketAddress
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ class PatternSpec extends AkkaSpec {
|
|||
"complete Future with AskTimeoutException when actor not terminated within timeout" in {
|
||||
val target = system.actorOf(Props[TargetActor])
|
||||
val latch = TestLatch()
|
||||
target ! (latch, remaining)
|
||||
target ! ((latch, remaining))
|
||||
intercept[AskTimeoutException] { Await.result(gracefulStop(target, 500 millis), remaining) }
|
||||
latch.open()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ class OrderbookTest extends JUnitSuite {
|
|||
var tradeObserverMock: TradeObserver = null
|
||||
|
||||
@Before
|
||||
def setUp = {
|
||||
def setUp(): Unit = {
|
||||
tradeObserverMock = mock(classOf[TradeObserver])
|
||||
orderbook = new Orderbook("ERI") with TradeObserver {
|
||||
def trade(bid: Bid, ask: Ask) = tradeObserverMock.trade(bid, ask)
|
||||
|
|
@ -19,7 +19,7 @@ class OrderbookTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test
|
||||
def shouldTradeSamePrice = {
|
||||
def shouldTradeSamePrice(): Unit = {
|
||||
val bid = new Bid("ERI", 100, 1000)
|
||||
val ask = new Ask("ERI", 100, 1000)
|
||||
orderbook.addOrder(bid)
|
||||
|
|
@ -33,7 +33,7 @@ class OrderbookTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test
|
||||
def shouldTradeTwoLevels = {
|
||||
def shouldTradeTwoLevels(): Unit = {
|
||||
val bid1 = new Bid("ERI", 101, 1000)
|
||||
val bid2 = new Bid("ERI", 100, 1000)
|
||||
val bid3 = new Bid("ERI", 99, 1000)
|
||||
|
|
@ -62,7 +62,7 @@ class OrderbookTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test
|
||||
def shouldSplitBid = {
|
||||
def shouldSplitBid(): Unit = {
|
||||
val bid = new Bid("ERI", 100, 300)
|
||||
val ask = new Ask("ERI", 100, 1000)
|
||||
orderbook.addOrder(bid)
|
||||
|
|
@ -77,7 +77,7 @@ class OrderbookTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test
|
||||
def shouldSplitAsk = {
|
||||
def shouldSplitAsk(): Unit = {
|
||||
val bid = new Bid("ERI", 100, 1000)
|
||||
val ask = new Ask("ERI", 100, 600)
|
||||
orderbook.addOrder(bid)
|
||||
|
|
|
|||
|
|
@ -379,19 +379,19 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
|
||||
val busy = TestLatch(1)
|
||||
val received0 = TestLatch(1)
|
||||
router ! (busy, received0)
|
||||
router ! ((busy, received0))
|
||||
Await.ready(received0, TestLatch.DefaultTimeout)
|
||||
|
||||
val received1 = TestLatch(1)
|
||||
router ! (1, received1)
|
||||
router ! ((1, received1))
|
||||
Await.ready(received1, TestLatch.DefaultTimeout)
|
||||
|
||||
val received2 = TestLatch(1)
|
||||
router ! (2, received2)
|
||||
router ! ((2, received2))
|
||||
Await.ready(received2, TestLatch.DefaultTimeout)
|
||||
|
||||
val received3 = TestLatch(1)
|
||||
router ! (3, received3)
|
||||
router ! ((3, received3))
|
||||
Await.ready(received3, TestLatch.DefaultTimeout)
|
||||
|
||||
busy.countDown()
|
||||
|
|
|
|||
|
|
@ -329,8 +329,8 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR
|
|||
val ser = SerializationExtension(system)
|
||||
|
||||
"Cross-version serialization compatibility" must {
|
||||
def verify(obj: Any, asExpected: String): Unit =
|
||||
String.valueOf(encodeHex(ser.serialize(obj, obj.getClass).get)) must be(asExpected)
|
||||
def verify(obj: SystemMessage, asExpected: String): Unit =
|
||||
String.valueOf(ser.serialize((obj, obj.getClass)).map(encodeHex).get) must be === asExpected
|
||||
|
||||
"be preserved for the Create SystemMessage" in {
|
||||
verify(Create(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e437265617465bcdf9f7f2675038d02000078707671007e0003")
|
||||
|
|
@ -388,9 +388,9 @@ class OverriddenSystemMessageSerializationSpec extends AkkaSpec(SerializationTes
|
|||
}
|
||||
}
|
||||
|
||||
trait TestSerializable
|
||||
protected[akka] trait TestSerializable
|
||||
|
||||
class TestSerializer extends Serializer {
|
||||
protected[akka] class TestSerializer extends Serializer {
|
||||
def includeManifest: Boolean = false
|
||||
|
||||
def identifier = 9999
|
||||
|
|
@ -403,12 +403,12 @@ class TestSerializer extends Serializer {
|
|||
}
|
||||
|
||||
@SerialVersionUID(1)
|
||||
case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable {
|
||||
protected[akka] case class FakeThrowable(msg: String) extends Throwable(msg) with Serializable {
|
||||
override def fillInStackTrace = null
|
||||
}
|
||||
|
||||
@SerialVersionUID(1)
|
||||
case class FakeActorRef(name: String) extends InternalActorRef with ActorRefScope {
|
||||
protected[akka] case class FakeActorRef(name: String) extends InternalActorRef with ActorRefScope {
|
||||
override def path = RootActorPath(Address("proto", "SomeSystem"), name)
|
||||
override def forward(message: Any)(implicit context: ActorContext) = ???
|
||||
override def isTerminated = ???
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ class ByteStringSpec extends WordSpec with MustMatchers with Checkers {
|
|||
val (bsAIt, bsBIt) = (a.iterator, b.iterator)
|
||||
val (vecAIt, vecBIt) = (Vector(a: _*).iterator.buffered, Vector(b: _*).iterator.buffered)
|
||||
(body(bsAIt, bsBIt) == body(vecAIt, vecBIt)) &&
|
||||
(!strict || (bsAIt.toSeq, bsBIt.toSeq) == (vecAIt.toSeq, vecBIt.toSeq))
|
||||
(!strict || (bsAIt.toSeq -> bsBIt.toSeq) == (vecAIt.toSeq -> vecBIt.toSeq))
|
||||
}
|
||||
|
||||
def likeVecBld(body: Builder[Byte, _] ⇒ Unit): Boolean = {
|
||||
|
|
|
|||
|
|
@ -17,17 +17,17 @@ class SwitchSpec extends WordSpec with MustMatchers {
|
|||
s.isOff must be(true)
|
||||
s.isOn must be(false)
|
||||
|
||||
s.switchOn("hello") must be(true)
|
||||
s.switchOn(()) must be(true)
|
||||
s.isOn must be(true)
|
||||
s.isOff must be(false)
|
||||
s.switchOn("hello") must be(false)
|
||||
s.switchOn(()) must be(false)
|
||||
s.isOn must be(true)
|
||||
s.isOff must be(false)
|
||||
|
||||
s.switchOff("hello") must be(true)
|
||||
s.switchOff(()) must be(true)
|
||||
s.isOff must be(true)
|
||||
s.isOn must be(false)
|
||||
s.switchOff("hello") must be(false)
|
||||
s.switchOff(()) must be(false)
|
||||
s.isOff must be(true)
|
||||
s.isOn must be(false)
|
||||
}
|
||||
|
|
@ -44,34 +44,34 @@ class SwitchSpec extends WordSpec with MustMatchers {
|
|||
val s = new Switch(false)
|
||||
s.ifOffYield("yes") must be(Some("yes"))
|
||||
s.ifOnYield("no") must be(None)
|
||||
s.ifOff("yes") must be(true)
|
||||
s.ifOn("no") must be(false)
|
||||
s.ifOff(()) must be(true)
|
||||
s.ifOn(()) must be(false)
|
||||
|
||||
s.switchOn()
|
||||
s.switchOn(())
|
||||
s.ifOnYield("yes") must be(Some("yes"))
|
||||
s.ifOffYield("no") must be(None)
|
||||
s.ifOn("yes") must be(true)
|
||||
s.ifOff("no") must be(false)
|
||||
s.ifOn(()) must be(true)
|
||||
s.ifOff(()) must be(false)
|
||||
}
|
||||
|
||||
"run action with locking" in {
|
||||
val s = new Switch(false)
|
||||
s.whileOffYield("yes") must be(Some("yes"))
|
||||
s.whileOnYield("no") must be(None)
|
||||
s.whileOff("yes") must be(true)
|
||||
s.whileOn("no") must be(false)
|
||||
s.whileOff(()) must be(true)
|
||||
s.whileOn(()) must be(false)
|
||||
|
||||
s.switchOn()
|
||||
s.switchOn(())
|
||||
s.whileOnYield("yes") must be(Some("yes"))
|
||||
s.whileOffYield("no") must be(None)
|
||||
s.whileOn("yes") must be(true)
|
||||
s.whileOff("no") must be(false)
|
||||
s.whileOn(()) must be(true)
|
||||
s.whileOff(()) must be(false)
|
||||
}
|
||||
|
||||
"run first or second action depending on state" in {
|
||||
val s = new Switch(false)
|
||||
s.fold("on")("off") must be("off")
|
||||
s.switchOn()
|
||||
s.switchOn(())
|
||||
s.fold("on")("off") must be("on")
|
||||
}
|
||||
|
||||
|
|
@ -80,14 +80,14 @@ class SwitchSpec extends WordSpec with MustMatchers {
|
|||
|
||||
s.locked {
|
||||
Thread.sleep(500)
|
||||
s.switchOn()
|
||||
s.switchOn(())
|
||||
s.isOn must be(true)
|
||||
}
|
||||
|
||||
val latch = new CountDownLatch(1)
|
||||
new Thread {
|
||||
override def run(): Unit = {
|
||||
s.switchOff()
|
||||
s.switchOff(())
|
||||
latch.countDown()
|
||||
}
|
||||
}.start()
|
||||
|
|
|
|||
|
|
@ -6,14 +6,12 @@ package akka.actor
|
|||
|
||||
import akka.dispatch._
|
||||
import akka.dispatch.sysmsg._
|
||||
import akka.util._
|
||||
import java.lang.{ UnsupportedOperationException, IllegalStateException }
|
||||
import akka.serialization.{ Serialization, JavaSerializer }
|
||||
import akka.event.EventStream
|
||||
import scala.annotation.tailrec
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.event.LoggingAdapter
|
||||
import scala.collection.JavaConverters
|
||||
|
||||
/**
|
||||
* Immutable and serializable handle to an actor, which may or may not reside
|
||||
|
|
@ -381,7 +379,7 @@ private[akka] class LocalActorRef private[akka] (
|
|||
override def restart(cause: Throwable): Unit = actorCell.restart(cause)
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
protected def writeReplace(): AnyRef = SerializedActorRef(path)
|
||||
protected def writeReplace(): AnyRef = SerializedActorRef(this)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -392,6 +390,10 @@ private[akka] class LocalActorRef private[akka] (
|
|||
private[akka] case class SerializedActorRef private (path: String) {
|
||||
import akka.serialization.JavaSerializer.currentSystem
|
||||
|
||||
def this(actorRef: ActorRef) = {
|
||||
this(Serialization.serializedActorPath(actorRef))
|
||||
}
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
def readResolve(): AnyRef = currentSystem.value match {
|
||||
case null ⇒
|
||||
|
|
@ -407,11 +409,8 @@ private[akka] case class SerializedActorRef private (path: String) {
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object SerializedActorRef {
|
||||
def apply(path: ActorPath): SerializedActorRef = {
|
||||
Serialization.currentTransportAddress.value match {
|
||||
case null ⇒ new SerializedActorRef(path.toSerializationFormat)
|
||||
case addr ⇒ new SerializedActorRef(path.toSerializationFormatWithAddress(addr))
|
||||
}
|
||||
def apply(actorRef: ActorRef): SerializedActorRef = {
|
||||
new SerializedActorRef(actorRef)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -437,7 +436,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
|
|||
override def restart(cause: Throwable): Unit = ()
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
protected def writeReplace(): AnyRef = SerializedActorRef(path)
|
||||
protected def writeReplace(): AnyRef = SerializedActorRef(this)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -470,7 +469,7 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
|
|||
override val path: ActorPath,
|
||||
val eventStream: EventStream) extends MinimalActorRef {
|
||||
|
||||
override def isTerminated(): Boolean = true
|
||||
override def isTerminated: Boolean = true
|
||||
|
||||
override def sendSystemMessage(message: SystemMessage): Unit = {
|
||||
if (Mailbox.debug) println(s"ELAR $path having enqueued $message")
|
||||
|
|
|
|||
|
|
@ -329,7 +329,7 @@ private[akka] object SystemGuardian {
|
|||
*
|
||||
* Depending on this class is not supported, only the [[ActorRefProvider]] interface is supported.
|
||||
*/
|
||||
class LocalActorRefProvider private[akka] (
|
||||
private[akka] class LocalActorRefProvider private[akka] (
|
||||
_systemName: String,
|
||||
override val settings: ActorSystem.Settings,
|
||||
val eventStream: EventStream,
|
||||
|
|
|
|||
|
|
@ -451,7 +451,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
* Verify existence of initial state and setup timers. This should be the
|
||||
* last call within the constructor.
|
||||
*/
|
||||
final def initialize: Unit = makeTransition(currentState)
|
||||
final def initialize(): Unit = makeTransition(currentState)
|
||||
|
||||
/**
|
||||
* Return current state name (i.e. object of type S)
|
||||
|
|
|
|||
|
|
@ -157,7 +157,7 @@ private[akka] class RepointableActorRef(
|
|||
def sendSystemMessage(message: SystemMessage) = underlying.sendSystemMessage(message)
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
protected def writeReplace(): AnyRef = SerializedActorRef(path)
|
||||
protected def writeReplace(): AnyRef = SerializedActorRef(this)
|
||||
}
|
||||
|
||||
private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
|
||||
|
|
|
|||
|
|
@ -613,7 +613,7 @@ private[akka] class ContinuousCancellable extends AtomicReference[HWTimeout](Con
|
|||
case some ⇒ if (!compareAndSet(some, newTimeout)) swap(newTimeout)
|
||||
}
|
||||
|
||||
def isCancelled(): Boolean = get().isCancelled()
|
||||
override def isCancelled: Boolean = get().isCancelled()
|
||||
def cancel(): Boolean = getAndSet(ContinuousCancellable.cancelled).cancel()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -248,7 +248,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T) extends Actor {
|
||||
val me = withContext[T](createInstance)
|
||||
|
||||
override def supervisorStrategy(): SupervisorStrategy = me match {
|
||||
override def supervisorStrategy: SupervisorStrategy = me match {
|
||||
case l: Supervisor ⇒ l.supervisorStrategy
|
||||
case _ ⇒ super.supervisorStrategy
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ private[akka] object MessageDispatcher {
|
|||
// since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
|
||||
final val debug = false // Deliberately without type ascription to make it a compile-time constant
|
||||
lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _)
|
||||
def printActors: Unit =
|
||||
def printActors(): Unit =
|
||||
if (debug) {
|
||||
for {
|
||||
d ← actors.keys
|
||||
|
|
@ -242,7 +242,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
|||
/**
|
||||
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
|
||||
*/
|
||||
def suspend(actor: ActorCell): Unit = {
|
||||
protected[akka] def suspend(actor: ActorCell): Unit = {
|
||||
val mbox = actor.mailbox
|
||||
if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
|
||||
mbox.suspend()
|
||||
|
|
@ -251,7 +251,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
|||
/*
|
||||
* After the call to this method, the dispatcher must begin any new message processing for the specified reference
|
||||
*/
|
||||
def resume(actor: ActorCell): Unit = {
|
||||
protected[akka] def resume(actor: ActorCell): Unit = {
|
||||
val mbox = actor.mailbox
|
||||
if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
|
||||
registerForExecution(mbox, false, false)
|
||||
|
|
|
|||
|
|
@ -406,10 +406,15 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
|
|||
@tailrec
|
||||
final def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = {
|
||||
val currentList = systemQueueGet
|
||||
if (systemQueuePut(currentList, newContents)) currentList.reverse else systemDrain(newContents)
|
||||
if (currentList.head == NoMessage) new EarliestFirstSystemMessageList(null)
|
||||
else if (systemQueuePut(currentList, newContents)) currentList.reverse
|
||||
else systemDrain(newContents)
|
||||
}
|
||||
|
||||
def hasSystemMessages: Boolean = systemQueueGet.nonEmpty
|
||||
def hasSystemMessages: Boolean = systemQueueGet.head match {
|
||||
case null | NoMessage ⇒ false
|
||||
case _ ⇒ true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -570,7 +570,7 @@ object Logging {
|
|||
/**
|
||||
* Base type of LogEvents
|
||||
*/
|
||||
sealed trait LogEvent {
|
||||
sealed trait LogEvent extends NoSerializationVerificationNeeded {
|
||||
/**
|
||||
* The thread that created this log event
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -276,7 +276,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
|
|||
override def postRestart(reason: Throwable): Unit =
|
||||
throw new IllegalStateException("Restarting not supported for connection actors.")
|
||||
|
||||
private[TcpConnection] case class PendingWrite(
|
||||
private[io] case class PendingWrite(
|
||||
commander: ActorRef,
|
||||
ack: Any,
|
||||
remainingData: ByteString,
|
||||
|
|
|
|||
|
|
@ -75,10 +75,11 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
|
|||
*/
|
||||
|
||||
def applyRoute(sender: ActorRef, message: Any): immutable.Iterable[Destination] = message match {
|
||||
case _: AutoReceivedMessage ⇒ Destination(sender, self) :: Nil
|
||||
case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil }
|
||||
case msg if route.isDefinedAt(sender, msg) ⇒ route(sender, message)
|
||||
case _ ⇒ Nil
|
||||
case _: AutoReceivedMessage ⇒ Destination(sender, self) :: Nil
|
||||
case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil }
|
||||
case _ ⇒
|
||||
val payload = (sender, message)
|
||||
if (route isDefinedAt payload) route(payload) else Nil
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.serialization
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor.{ Extension, ExtendedActorSystem, Address }
|
||||
import akka.actor._
|
||||
import akka.event.Logging
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
|
@ -21,10 +21,11 @@ object Serialization {
|
|||
type ClassSerializer = (Class[_], Serializer)
|
||||
|
||||
/**
|
||||
* This holds a reference to the current transport address to be inserted
|
||||
* into local actor refs during serialization.
|
||||
* This holds a reference to the current transport serialization information used for
|
||||
* serializing local actor refs.
|
||||
* INTERNAL API
|
||||
*/
|
||||
val currentTransportAddress = new DynamicVariable[Address](null)
|
||||
private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
|
||||
|
||||
class Settings(val config: Config) {
|
||||
val Serializers: Map[String, String] = configToMap("akka.actor.serializers")
|
||||
|
|
@ -35,6 +36,35 @@ object Serialization {
|
|||
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) ⇒ (k -> v.toString) }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Serialization information needed for serializing local actor refs.
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] case class Information(address: Address, system: ActorSystem)
|
||||
|
||||
/**
|
||||
* The serialized path of an actorRef, based on the current transport serialization information.
|
||||
* If there is no external address available for the requested address then the systems default
|
||||
* address will be used.
|
||||
*/
|
||||
def serializedActorPath(actorRef: ActorRef): String = {
|
||||
val path = actorRef.path
|
||||
val originalSystem: ExtendedActorSystem = actorRef match {
|
||||
case a: ActorRefWithCell ⇒ a.underlying.system.asInstanceOf[ExtendedActorSystem]
|
||||
case _ ⇒ null
|
||||
}
|
||||
Serialization.currentTransportInformation.value match {
|
||||
case null ⇒ path.toSerializationFormat
|
||||
case Information(address, system) ⇒
|
||||
if (originalSystem == null || originalSystem == system)
|
||||
path.toSerializationFormatWithAddress(address)
|
||||
else {
|
||||
val provider = originalSystem.provider
|
||||
path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -57,10 +57,9 @@ object Agent {
|
|||
* Internal helper method
|
||||
*/
|
||||
private final def withinTransaction(run: Runnable): Unit = {
|
||||
def dispatch = updater.execute(run)
|
||||
Txn.findCurrent match {
|
||||
case Some(txn) ⇒ Txn.afterCommit(status ⇒ dispatch)(txn)
|
||||
case _ ⇒ dispatch
|
||||
case Some(txn) ⇒ Txn.afterCommit(_ ⇒ updater.execute(run))(txn)
|
||||
case _ ⇒ updater.execute(run)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import language.postfixOps
|
|||
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NonFatal
|
||||
import akka.util.Timeout
|
||||
import akka.testkit._
|
||||
import scala.concurrent.stm._
|
||||
|
|
@ -112,7 +113,7 @@ class AgentSpec extends AkkaSpec {
|
|||
agent send (_ * 2)
|
||||
throw new RuntimeException("Expected failure")
|
||||
}
|
||||
} catch { case _ ⇒ }
|
||||
} catch { case NonFatal(_) ⇒ }
|
||||
|
||||
agent send countDown
|
||||
|
||||
|
|
|
|||
|
|
@ -150,7 +150,7 @@ private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Acto
|
|||
try {
|
||||
val endpoint = camelContext.getEndpoint(endpointUri)
|
||||
val processor = new SendProcessor(endpoint)
|
||||
camelObjects += producer -> (endpoint, processor)
|
||||
camelObjects = camelObjects.updated(producer, endpoint -> processor)
|
||||
// if this throws, the supervisor stops the producer and de-registers it on termination
|
||||
processor.start()
|
||||
producer ! CamelProducerObjects(endpoint, processor)
|
||||
|
|
@ -159,10 +159,10 @@ private[camel] class ProducerRegistrar(activationTracker: ActorRef) extends Acto
|
|||
case NonFatal(e) ⇒ throw new ActorActivationException(producer, e)
|
||||
}
|
||||
} else {
|
||||
camelObjects.get(producer).foreach { case (endpoint, processor) ⇒ producer ! CamelProducerObjects(endpoint, processor) }
|
||||
camelObjects.get(producer) foreach { case (endpoint, processor) ⇒ producer ! CamelProducerObjects(endpoint, processor) }
|
||||
}
|
||||
case DeRegister(producer) ⇒
|
||||
camelObjects.get(producer).foreach {
|
||||
camelObjects.get(producer) foreach {
|
||||
case (_, processor) ⇒
|
||||
try {
|
||||
camelObjects.get(producer).foreach(_._2.stop())
|
||||
|
|
|
|||
|
|
@ -70,8 +70,8 @@ class ConcurrentActivationTest extends WordSpec with MustMatchers with NonShared
|
|||
}
|
||||
val (activatedConsumerNames, activatedProducerNames) = partitionNames(activations)
|
||||
val (deactivatedConsumerNames, deactivatedProducerNames) = partitionNames(deactivations)
|
||||
assertContainsSameElements(activatedConsumerNames, deactivatedConsumerNames)
|
||||
assertContainsSameElements(activatedProducerNames, deactivatedProducerNames)
|
||||
assertContainsSameElements(activatedConsumerNames -> deactivatedConsumerNames)
|
||||
assertContainsSameElements(activatedProducerNames -> deactivatedProducerNames)
|
||||
} finally {
|
||||
system.eventStream.publish(TestEvent.UnMute(eventFilter))
|
||||
}
|
||||
|
|
@ -97,7 +97,7 @@ class ConsumerBroadcast(promise: Promise[(Future[List[List[ActorRef]]], Future[L
|
|||
allDeactivationFutures = allDeactivationFutures :+ deactivationListFuture
|
||||
context.actorOf(Props(new Registrar(i, number, activationListPromise, deactivationListPromise)), "registrar-" + i)
|
||||
}
|
||||
promise.success((Future.sequence(allActivationFutures)), Future.sequence(allDeactivationFutures))
|
||||
promise.success(Future.sequence(allActivationFutures) -> Future.sequence(allDeactivationFutures))
|
||||
|
||||
broadcaster = Some(context.actorOf(Props[Registrar] withRouter (BroadcastRouter(routees)), "registrarRouter"))
|
||||
case reg: Any ⇒
|
||||
|
|
|
|||
|
|
@ -282,7 +282,7 @@ object ProducerFeatureTest {
|
|||
}
|
||||
|
||||
override def postStop() {
|
||||
for (msg ← lastMessage; aref ← lastSender) context.parent ! (aref, msg)
|
||||
for (msg ← lastMessage; aref ← lastSender) context.parent ! ((aref, msg))
|
||||
super.postStop()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
|||
|
||||
"asynchronous" when {
|
||||
|
||||
def verifyFailureIsSet {
|
||||
def verifyFailureIsSet(): Unit = {
|
||||
producer.processExchangeAdapter(exchange, asyncCallback)
|
||||
asyncCallback.awaitCalled()
|
||||
verify(exchange).setFailure(any[FailureResult])
|
||||
|
|
@ -158,7 +158,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
|||
"consumer actor doesnt exist" must {
|
||||
"set failure message on exchange" in {
|
||||
producer = given(actor = null, outCapable = true)
|
||||
verifyFailureIsSet
|
||||
verifyFailureIsSet()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -226,7 +226,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
|||
"consumer actor doesnt exist" must {
|
||||
"set failure message on exchange" in {
|
||||
producer = given(actor = null, outCapable = false)
|
||||
verifyFailureIsSet
|
||||
verifyFailureIsSet()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -325,7 +325,7 @@ class ActorProducerTest extends TestKit(ActorSystem("test")) with WordSpec with
|
|||
}
|
||||
}
|
||||
|
||||
trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with BeforeAndAfterEach { self: TestKit with MustMatchers with Suite ⇒
|
||||
private[camel] trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with BeforeAndAfterEach { self: TestKit with MustMatchers with Suite ⇒
|
||||
var camel: Camel = _
|
||||
var exchange: CamelExchangeAdapter = _
|
||||
var callback: AsyncCallback = _
|
||||
|
|
@ -427,9 +427,7 @@ trait ActorProducerFixture extends MockitoSugar with BeforeAndAfterAll with Befo
|
|||
}
|
||||
|
||||
def echoActor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case msg ⇒ sender ! "received " + msg
|
||||
}
|
||||
def receive = { case msg ⇒ sender ! "received " + msg }
|
||||
}), name = "echoActor")
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ object Helpers {
|
|||
|
||||
def imp[T: c.WeakTypeTag](c: Context): c.Expr[T] = {
|
||||
import c.universe._
|
||||
c.Expr[T](TypeApply(Ident("implicitly"), List(TypeTree().setType(weakTypeOf[T]))))
|
||||
c.Expr[T](TypeApply(Ident(newTermName("implicitly")), List(TypeTree().setType(weakTypeOf[T]))))
|
||||
}
|
||||
|
||||
def bool(c: Context, b: Boolean): c.Expr[Boolean] = c.Expr[Boolean](c.universe.Literal(c.universe.Constant(b)))
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ private[akka] class ClusterActorRefProvider(
|
|||
*/
|
||||
override def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = {
|
||||
super.useActorOnNode(path, props, deploy, supervisor)
|
||||
remoteDeploymentWatcher ! (actorFor(path), supervisor)
|
||||
remoteDeploymentWatcher ! ((actorFor(path), supervisor))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -229,9 +229,9 @@ private[cluster] case class ClusterHeartbeatSenderState private (
|
|||
heartbeatRequest: Map[Address, Deadline] = Map.empty) {
|
||||
|
||||
// FIXME can be disabled as optimization
|
||||
assertInvariants
|
||||
assertInvariants()
|
||||
|
||||
private def assertInvariants: Unit = {
|
||||
private def assertInvariants(): Unit = {
|
||||
val currentAndEnding = current.intersect(ending.keySet)
|
||||
require(currentAndEnding.isEmpty,
|
||||
s"Same nodes in current and ending not allowed, got [${currentAndEnding}]")
|
||||
|
|
|
|||
|
|
@ -63,9 +63,9 @@ private[cluster] case class Gossip(
|
|||
with Versioned[Gossip] {
|
||||
|
||||
// FIXME can be disabled as optimization
|
||||
assertInvariants
|
||||
assertInvariants()
|
||||
|
||||
private def assertInvariants: Unit = {
|
||||
private def assertInvariants(): Unit = {
|
||||
val unreachableAndLive = members.intersect(overview.unreachable)
|
||||
if (unreachableAndLive.nonEmpty)
|
||||
throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]"
|
||||
|
|
|
|||
|
|
@ -317,7 +317,7 @@ abstract class MixMetricsSelectorBase(selectors: immutable.IndexedSeq[CapacityMe
|
|||
combined.foldLeft(Map.empty[Address, (Double, Int)].withDefaultValue((0.0, 0))) {
|
||||
case (acc, (address, capacity)) ⇒
|
||||
val (sum, count) = acc(address)
|
||||
acc + (address -> (sum + capacity, count + 1))
|
||||
acc + (address -> ((sum + capacity, count + 1)))
|
||||
}.map {
|
||||
case (addr, (sum, count)) ⇒ (addr -> sum / count)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class ClusterMetricsMultiJvmNode5 extends ClusterMetricsSpec
|
|||
abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSpec) with MultiNodeClusterSpec {
|
||||
import ClusterMetricsMultiJvmSpec._
|
||||
|
||||
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
|
||||
private[cluster] def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
|
||||
|
||||
"Cluster metrics" must {
|
||||
"periodically collect metrics on each node, publish ClusterMetricsChanged to the event stream, " +
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import akka.testkit.TestEvent._
|
|||
* 8. while nodes are removed remote death watch is also exercised
|
||||
* 9. while nodes are removed a few cluster aware routers are also working
|
||||
*/
|
||||
object StressMultiJvmSpec extends MultiNodeConfig {
|
||||
private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
||||
|
||||
// Note that this test uses default configuration,
|
||||
// not MultiNodeClusterSpec.clusterConfig
|
||||
|
|
@ -521,14 +521,14 @@ object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
log.info("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children",
|
||||
totalActors, levels, width)
|
||||
val tree = context.actorOf(Props(new TreeNode(levels, width)), "tree")
|
||||
tree forward (idx, SimpleJob(id, payload))
|
||||
tree forward ((idx, SimpleJob(id, payload)))
|
||||
context.become(treeWorker(tree))
|
||||
}
|
||||
|
||||
def treeWorker(tree: ActorRef): Receive = {
|
||||
case SimpleJob(id, payload) ⇒ sender ! Ack(id)
|
||||
case TreeJob(id, payload, idx, _, _) ⇒
|
||||
tree forward (idx, SimpleJob(id, payload))
|
||||
tree forward ((idx, SimpleJob(id, payload)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -539,7 +539,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
0 until width map { i ⇒ context.actorOf(Props(createChild()), name = i.toString) } toVector
|
||||
|
||||
def receive = {
|
||||
case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward (idx, job)
|
||||
case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward ((idx, job))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -701,7 +701,7 @@ abstract class StressSpec
|
|||
|
||||
lazy val statsObserver = system.actorOf(Props[StatsObserver], "statsObserver")
|
||||
|
||||
def awaitClusterResult: Unit = {
|
||||
def awaitClusterResult(): Unit = {
|
||||
runOn(roles.head) {
|
||||
val r = clusterResultAggregator
|
||||
watch(r)
|
||||
|
|
@ -734,7 +734,7 @@ abstract class StressSpec
|
|||
}
|
||||
|
||||
}
|
||||
awaitClusterResult
|
||||
awaitClusterResult()
|
||||
enterBarrier("join-one-" + step)
|
||||
}
|
||||
|
||||
|
|
@ -754,7 +754,7 @@ abstract class StressSpec
|
|||
}
|
||||
|
||||
}
|
||||
awaitClusterResult
|
||||
awaitClusterResult()
|
||||
enterBarrier("join-several-" + step)
|
||||
}
|
||||
|
||||
|
|
@ -804,7 +804,7 @@ abstract class StressSpec
|
|||
}
|
||||
enterBarrier("watch-verified-" + step)
|
||||
|
||||
awaitClusterResult
|
||||
awaitClusterResult()
|
||||
enterBarrier("remove-one-" + step)
|
||||
}
|
||||
|
||||
|
|
@ -828,7 +828,7 @@ abstract class StressSpec
|
|||
awaitMembersUp(currentRoles.size, timeout = remaining)
|
||||
}
|
||||
}
|
||||
awaitClusterResult
|
||||
awaitClusterResult()
|
||||
enterBarrier("remove-several-" + step)
|
||||
}
|
||||
|
||||
|
|
@ -885,7 +885,7 @@ abstract class StressSpec
|
|||
(nextAS, nextAddresses)
|
||||
}
|
||||
}
|
||||
awaitClusterResult
|
||||
awaitClusterResult()
|
||||
|
||||
step += 1
|
||||
loop(counter + 1, nextAS, nextAddresses)
|
||||
|
|
@ -936,7 +936,7 @@ abstract class StressSpec
|
|||
}
|
||||
}
|
||||
|
||||
awaitClusterResult
|
||||
awaitClusterResult()
|
||||
}
|
||||
|
||||
def awaitWorkResult: WorkResult = {
|
||||
|
|
@ -983,7 +983,7 @@ abstract class StressSpec
|
|||
|
||||
}
|
||||
|
||||
awaitClusterResult
|
||||
awaitClusterResult()
|
||||
step += 1
|
||||
}
|
||||
}
|
||||
|
|
@ -1004,7 +1004,7 @@ abstract class StressSpec
|
|||
}
|
||||
}
|
||||
|
||||
awaitClusterResult
|
||||
awaitClusterResult()
|
||||
|
||||
nbrUsedRoles += size
|
||||
enterBarrier("after-" + step)
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
|
|||
lazy val victim = sortedRoles(1)
|
||||
|
||||
var endBarrierNumber = 0
|
||||
def endBarrier: Unit = {
|
||||
def endBarrier(): Unit = {
|
||||
endBarrierNumber += 1
|
||||
enterBarrier("after_" + endBarrierNumber)
|
||||
}
|
||||
|
|
@ -75,7 +75,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
|
|||
|
||||
"reach initial convergence" taggedAs LongRunningTest in {
|
||||
awaitClusterUp(roles: _*)
|
||||
endBarrier
|
||||
endBarrier()
|
||||
}
|
||||
|
||||
"mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
|
||||
|
|
@ -125,7 +125,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
|
|||
}
|
||||
}
|
||||
|
||||
endBarrier
|
||||
endBarrier()
|
||||
}
|
||||
|
||||
"mark the node as DOWN" taggedAs LongRunningTest in {
|
||||
|
|
@ -139,7 +139,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
|
|||
awaitAssert(clusterView.unreachableMembers must be(Set.empty), 15 seconds)
|
||||
|
||||
}
|
||||
endBarrier
|
||||
endBarrier()
|
||||
}
|
||||
|
||||
"allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in {
|
||||
|
|
@ -158,7 +158,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
|
|||
|
||||
awaitMembersUp(roles.size)
|
||||
|
||||
endBarrier
|
||||
endBarrier()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,8 +35,8 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers {
|
|||
val eUp = Member(Address("akka.tcp", "sys", "e", 2552), Up, eRoles)
|
||||
val eDown = Member(Address("akka.tcp", "sys", "e", 2552), Down, eRoles)
|
||||
|
||||
def converge(gossip: Gossip): (Gossip, Set[Address]) =
|
||||
((gossip, Set.empty[Address]) /: gossip.members) { (gs, m) ⇒ (gs._1.seen(m.address), gs._2 + m.address) }
|
||||
private[cluster] def converge(gossip: Gossip): (Gossip, Set[Address]) =
|
||||
((gossip, Set.empty[Address]) /: gossip.members) { case ((gs, as), m) ⇒ (gs.seen(m.address), as + m.address) }
|
||||
|
||||
"Domain events" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -128,5 +128,5 @@ trait MetricsCollectorFactory { this: AkkaSpec ⇒
|
|||
new JmxMetricsCollector(selfAddress, defaultDecayFactor)
|
||||
}.get
|
||||
|
||||
def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
|
||||
private[cluster] def isSigar(collector: MetricsCollector): Boolean = collector.isInstanceOf[SigarMetricsCollector]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -276,7 +276,7 @@ class TimerBasedThrottler(var rate: Rate) extends Actor with Throttler with FSM[
|
|||
case Active -> Idle ⇒ stopTimer()
|
||||
}
|
||||
|
||||
initialize
|
||||
initialize()
|
||||
|
||||
private def startTimer(rate: Rate) = setTimer("morePermits", Tick, rate.duration, true)
|
||||
private def stopTimer() = cancelTimer("morePermits")
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ class JavaLoggerSpec extends AkkaSpec(JavaLoggerSpec.config) {
|
|||
}
|
||||
|
||||
"log info without stackTrace" in {
|
||||
producer ! ("{} is the magic number", 3)
|
||||
producer ! (("{} is the magic number", 3))
|
||||
|
||||
val record = expectMsgType[logging.LogRecord]
|
||||
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ class DataflowSpec extends AkkaSpec with DefaultTimeout {
|
|||
val x = Future("Hello")
|
||||
val y = x map (_.length)
|
||||
|
||||
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100)
|
||||
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply)
|
||||
|
||||
intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration))
|
||||
}
|
||||
|
|
@ -74,7 +74,7 @@ class DataflowSpec extends AkkaSpec with DefaultTimeout {
|
|||
val x = Future(3)
|
||||
val y = (actor ? "Hello").mapTo[Int]
|
||||
|
||||
val r = flow(x() + y(), 100)
|
||||
val r = flow(x() + y())
|
||||
|
||||
intercept[ClassCastException](Await.result(r, timeout.duration))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,9 +25,9 @@ class DangerousActor extends Actor with ActorLogging {
|
|||
new CircuitBreaker(context.system.scheduler,
|
||||
maxFailures = 5,
|
||||
callTimeout = 10.seconds,
|
||||
resetTimeout = 1.minute).onOpen(notifyMeOnOpen)
|
||||
resetTimeout = 1.minute).onOpen(notifyMeOnOpen())
|
||||
|
||||
def notifyMeOnOpen =
|
||||
def notifyMeOnOpen(): Unit =
|
||||
log.warning("My CircuitBreaker is now open, and will not close for one minute")
|
||||
//#circuit-breaker-initialization
|
||||
|
||||
|
|
|
|||
|
|
@ -564,7 +564,7 @@ public class FutureDocTestBase {
|
|||
return "foo";
|
||||
}
|
||||
}, ec);
|
||||
Future<String> result = Futures.firstCompletedOf(Arrays.asList(future, delayed), ec);
|
||||
Future<String> result = Futures.firstCompletedOf(Arrays.<Future<String>>asList(future, delayed), ec);
|
||||
//#after
|
||||
Await.result(result, Duration.create(2, SECONDS));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import org.junit.Test;
|
|||
import static org.junit.Assert.*;
|
||||
//#imports
|
||||
import akka.actor.*;
|
||||
import akka.remote.RemoteActorRefProvider;
|
||||
import akka.serialization.*;
|
||||
|
||||
//#imports
|
||||
|
|
@ -58,20 +57,10 @@ public class SerializationDocTestBase {
|
|||
//#actorref-serializer
|
||||
// Serialize
|
||||
// (beneath toBinary)
|
||||
final Address transportAddress =
|
||||
Serialization.currentTransportAddress().value();
|
||||
String identifier;
|
||||
String identifier = Serialization.serializedActorPath(theActorRef);
|
||||
|
||||
// If there is no transportAddress,
|
||||
// it means that either this Serializer isn't called
|
||||
// within a piece of code that sets it,
|
||||
// so either you need to supply your own,
|
||||
// or simply use the local path.
|
||||
if (transportAddress == null) identifier = theActorRef.path().toSerializationFormat();
|
||||
else identifier = theActorRef.path().toSerializationFormatWithAddress(transportAddress);
|
||||
// Then just serialize the identifier however you like
|
||||
|
||||
|
||||
// Deserialize
|
||||
// (beneath fromBinary)
|
||||
final ActorRef deserializedActorRef = theActorSystem.actorFor(identifier);
|
||||
|
|
@ -118,16 +107,20 @@ public class SerializationDocTestBase {
|
|||
}
|
||||
|
||||
//#external-address
|
||||
|
||||
public void demonstrateExternalAddress() {
|
||||
// this is not meant to be run, only to be compiled
|
||||
static
|
||||
//#external-address
|
||||
public class ExternalAddressExample {
|
||||
//#external-address
|
||||
final ActorSystem system = ActorSystem.create();
|
||||
final Address remoteAddr = new Address("", "");
|
||||
// #external-address
|
||||
final Address addr = ExternalAddress.ID.get(system).getAddressFor(remoteAddr);
|
||||
// #external-address
|
||||
//#external-address
|
||||
public String serializeTo(ActorRef ref, Address remote) {
|
||||
return ref.path().toSerializationFormatWithAddress(
|
||||
ExternalAddress.ID.get(system).getAddressFor(remote));
|
||||
}
|
||||
}
|
||||
|
||||
//#external-address
|
||||
|
||||
static
|
||||
//#external-address-default
|
||||
public class DefaultAddressExt implements Extension {
|
||||
|
|
|
|||
|
|
@ -109,8 +109,11 @@ list which classes that should be serialized using it.
|
|||
Serializing ActorRefs
|
||||
---------------------
|
||||
|
||||
All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer,
|
||||
you might want to know how to serialize and deserialize them properly, here's the magic incantation:
|
||||
All ActorRefs are serializable using JavaSerializer, but in case you are writing your
|
||||
own serializer, you might want to know how to serialize and deserialize them properly.
|
||||
In the general case, the local address to be used depends on the type of remote
|
||||
address which shall be the recipient of the serialized information. Use
|
||||
:meth:`Serialization.serializedActorPath(actorRef)` like this:
|
||||
|
||||
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
|
||||
:include: imports
|
||||
|
|
@ -118,6 +121,22 @@ you might want to know how to serialize and deserialize them properly, here's th
|
|||
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
|
||||
:include: actorref-serializer
|
||||
|
||||
This assumes that serialization happens in the context of sending a message
|
||||
through the remote transport. There are other uses of serialization, though,
|
||||
e.g. storing actor references outside of an actor application (database,
|
||||
durable mailbox, etc.). In this case, it is important to keep in mind that the
|
||||
address part of an actor’s path determines how that actor is communicated with.
|
||||
Storing a local actor path might be the right choice if the retrieval happens
|
||||
in the same logical context, but it is not enough when deserializing it on a
|
||||
different network host: for that it would need to include the system’s remote
|
||||
transport address. An actor system is not limited to having just one remote
|
||||
transport per se, which makes this question a bit more interesting. To find out
|
||||
the appropriate address to use when sending to ``remoteAddr`` you can use
|
||||
:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` like this:
|
||||
|
||||
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
|
||||
:include: external-address
|
||||
|
||||
.. note::
|
||||
|
||||
``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the
|
||||
|
|
@ -132,25 +151,6 @@ you might want to know how to serialize and deserialize them properly, here's th
|
|||
include the unique id.
|
||||
|
||||
|
||||
This assumes that serialization happens in the context of sending a message
|
||||
through the remote transport. There are other uses of serialization, though,
|
||||
e.g. storing actor references outside of an actor application (database,
|
||||
durable mailbox, etc.). In this case, it is important to keep in mind that the
|
||||
address part of an actor’s path determines how that actor is communicated with.
|
||||
Storing a local actor path might be the right choice if the retrieval happens
|
||||
in the same logical context, but it is not enough when deserializing it on a
|
||||
different network host: for that it would need to include the system’s remote
|
||||
transport address. An actor system is not limited to having just one remote
|
||||
transport per se, which makes this question a bit more interesting.
|
||||
|
||||
In the general case, the local address to be used depends on the type of remote
|
||||
address which shall be the recipient of the serialized information. Use
|
||||
:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` to query the system
|
||||
for the appropriate address to use when sending to ``remoteAddr``:
|
||||
|
||||
.. includecode:: code/docs/serialization/SerializationDocTestBase.java
|
||||
:include: external-address
|
||||
|
||||
This requires that you know at least which type of address will be supported by
|
||||
the system which will deserialize the resulting actor reference; if you have no
|
||||
concrete address handy you can create a dummy one for the right protocol using
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
|
|||
//#unhandled-elided
|
||||
//#fsm-body
|
||||
|
||||
initialize
|
||||
initialize()
|
||||
}
|
||||
//#simple-fsm
|
||||
object DemoCode {
|
||||
|
|
|
|||
|
|
@ -204,7 +204,7 @@ class CounterService extends Actor {
|
|||
if (backlog.size >= MaxBacklog)
|
||||
throw new ServiceUnavailable(
|
||||
"CounterService not available, lack of initial value")
|
||||
backlog = backlog :+ (sender, msg)
|
||||
backlog :+= (sender -> msg)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import language.postfixOps
|
|||
import akka.util.Timeout
|
||||
|
||||
object Introduction {
|
||||
def foo = {
|
||||
def foo(): Unit = {
|
||||
//#Consumer-mina
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
|
||||
|
|
@ -27,7 +27,7 @@ object Introduction {
|
|||
val mina = system.actorOf(Props[MyEndpoint])
|
||||
//#Consumer-mina
|
||||
}
|
||||
def bar = {
|
||||
def bar(): Unit = {
|
||||
//#Consumer
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
|
||||
|
|
@ -41,7 +41,7 @@ object Introduction {
|
|||
}
|
||||
//#Consumer
|
||||
}
|
||||
def baz = {
|
||||
def baz(): Unit = {
|
||||
//#Producer
|
||||
import akka.actor.Actor
|
||||
import akka.camel.{ Producer, Oneway }
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ class DataflowDocSpec extends WordSpec with MustMatchers {
|
|||
val v1, v2 = Promise[Int]()
|
||||
flow {
|
||||
// v1 will become the value of v2 + 10 when v2 gets a value
|
||||
v1 << v2() + 10
|
||||
v1 << 10 + v2()
|
||||
v1() + v2()
|
||||
} onComplete println
|
||||
flow { v2 << 5 } // As you can see, no blocking above!
|
||||
|
|
|
|||
|
|
@ -340,12 +340,12 @@ class FutureDocSpec extends AkkaSpec {
|
|||
def loadPage(s: String) = s
|
||||
val url = "foo bar"
|
||||
def log(cause: Throwable) = ()
|
||||
def watchSomeTV = ()
|
||||
def watchSomeTV(): Unit = ()
|
||||
//#and-then
|
||||
val result = Future { loadPage(url) } andThen {
|
||||
case Failure(exception) ⇒ log(exception)
|
||||
} andThen {
|
||||
case _ ⇒ watchSomeTV
|
||||
case _ ⇒ watchSomeTV()
|
||||
}
|
||||
result foreach println
|
||||
//#and-then
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ object RouterViaProgramDocSpec {
|
|||
|
||||
class ExampleActor1 extends Actor {
|
||||
def receive = {
|
||||
case m @ Message1(nbr) ⇒ sender ! (self, m)
|
||||
case m @ Message1(nbr) ⇒ sender ! ((self, m))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
|
||||
package docs.serialization {
|
||||
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit._
|
||||
//#imports
|
||||
import akka.actor.{ ActorRef, ActorSystem }
|
||||
|
|
@ -16,7 +15,6 @@ package docs.serialization {
|
|||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.Extension
|
||||
import akka.actor.Address
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
|
||||
//#my-own-serializer
|
||||
class MyOwnSerializer extends Serializer {
|
||||
|
|
@ -164,16 +162,8 @@ package docs.serialization {
|
|||
//#actorref-serializer
|
||||
// Serialize
|
||||
// (beneath toBinary)
|
||||
val identifier: String = Serialization.serializedActorPath(theActorRef)
|
||||
|
||||
// If there is no transportAddress,
|
||||
// it means that either this Serializer isn't called
|
||||
// within a piece of code that sets it,
|
||||
// so either you need to supply your own,
|
||||
// or simply use the local path.
|
||||
val identifier: String = Serialization.currentTransportAddress.value match {
|
||||
case null ⇒ theActorRef.path.toSerializationFormat
|
||||
case address ⇒ theActorRef.path.toSerializationFormatWithAddress(address)
|
||||
}
|
||||
// Then just serialize the identifier however you like
|
||||
|
||||
// Deserialize
|
||||
|
|
|
|||
|
|
@ -186,7 +186,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
|||
val probe1 = TestProbe()
|
||||
val probe2 = TestProbe()
|
||||
val actor = system.actorOf(Props[MyDoubleEcho])
|
||||
actor ! (probe1.ref, probe2.ref)
|
||||
actor ! ((probe1.ref, probe2.ref))
|
||||
actor ! "hello"
|
||||
probe1.expectMsg(500 millis, "hello")
|
||||
probe2.expectMsg(500 millis, "hello")
|
||||
|
|
|
|||
|
|
@ -101,12 +101,31 @@ list which classes that should be serialized using it.
|
|||
Serializing ActorRefs
|
||||
---------------------
|
||||
|
||||
All ActorRefs are serializable using JavaSerializer, but in case you are writing your own serializer,
|
||||
you might want to know how to serialize and deserialize them properly, here's the magic incantation:
|
||||
All ActorRefs are serializable using JavaSerializer, but in case you are writing your
|
||||
own serializer, you might want to know how to serialize and deserialize them properly.
|
||||
In the general case, the local address to be used depends on the type of remote
|
||||
address which shall be the recipient of the serialized information. Use
|
||||
:meth:`Serialization.serializedActorPath(actorRef)` like this:
|
||||
|
||||
.. includecode:: code/docs/serialization/SerializationDocSpec.scala
|
||||
:include: imports,actorref-serializer
|
||||
|
||||
This assumes that serialization happens in the context of sending a message
|
||||
through the remote transport. There are other uses of serialization, though,
|
||||
e.g. storing actor references outside of an actor application (database,
|
||||
durable mailbox, etc.). In this case, it is important to keep in mind that the
|
||||
address part of an actor’s path determines how that actor is communicated with.
|
||||
Storing a local actor path might be the right choice if the retrieval happens
|
||||
in the same logical context, but it is not enough when deserializing it on a
|
||||
different network host: for that it would need to include the system’s remote
|
||||
transport address. An actor system is not limited to having just one remote
|
||||
transport per se, which makes this question a bit more interesting. To find out
|
||||
the appropriate address to use when sending to ``remoteAddr`` you can use
|
||||
:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` like this:
|
||||
|
||||
.. includecode:: code/docs/serialization/SerializationDocSpec.scala
|
||||
:include: external-address
|
||||
|
||||
.. note::
|
||||
|
||||
``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the
|
||||
|
|
@ -120,24 +139,6 @@ you might want to know how to serialize and deserialize them properly, here's th
|
|||
storage of the reference, you can use ``toStringWithAddress``, which doesn't
|
||||
include the unique id.
|
||||
|
||||
This assumes that serialization happens in the context of sending a message
|
||||
through the remote transport. There are other uses of serialization, though,
|
||||
e.g. storing actor references outside of an actor application (database,
|
||||
durable mailbox, etc.). In this case, it is important to keep in mind that the
|
||||
address part of an actor’s path determines how that actor is communicated with.
|
||||
Storing a local actor path might be the right choice if the retrieval happens
|
||||
in the same logical context, but it is not enough when deserializing it on a
|
||||
different network host: for that it would need to include the system’s remote
|
||||
transport address. An actor system is not limited to having just one remote
|
||||
transport per se, which makes this question a bit more interesting.
|
||||
|
||||
In the general case, the local address to be used depends on the type of remote
|
||||
address which shall be the recipient of the serialized information. Use
|
||||
:meth:`ActorRefProvider.getExternalAddressFor(remoteAddr)` to query the system
|
||||
for the appropriate address to use when sending to ``remoteAddr``:
|
||||
|
||||
.. includecode:: code/docs/serialization/SerializationDocSpec.scala
|
||||
:include: external-address
|
||||
|
||||
This requires that you know at least which type of address will be supported by
|
||||
the system which will deserialize the resulting actor reference; if you have no
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package akka.actor.mailbox.filebased
|
|||
import language.postfixOps
|
||||
|
||||
import akka.actor.mailbox._
|
||||
import scala.concurrent.duration._
|
||||
import org.apache.commons.io.FileUtils
|
||||
import akka.dispatch.Mailbox
|
||||
|
||||
|
|
@ -27,18 +28,13 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp
|
|||
"read the file-based section" in {
|
||||
settings.QueuePath must be("file-based")
|
||||
settings.CircuitBreakerMaxFailures must be(5)
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
settings.CircuitBreakerCallTimeout must be(5 seconds)
|
||||
}
|
||||
}
|
||||
|
||||
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue]
|
||||
private[akka] def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue]
|
||||
|
||||
def clean() {
|
||||
FileUtils.deleteDirectory(new java.io.File(settings.QueuePath))
|
||||
}
|
||||
def clean(): Unit = FileUtils.deleteDirectory(new java.io.File(settings.QueuePath))
|
||||
|
||||
override def atStartup() {
|
||||
clean()
|
||||
|
|
|
|||
|
|
@ -42,21 +42,11 @@ object Crypt {
|
|||
}
|
||||
|
||||
print("""
|
||||
# This config imports the Akka reference configuration.
|
||||
include "akka-reference.conf"
|
||||
|
||||
# In this file you can override any option defined in the 'akka-reference.conf' file.
|
||||
# Copy in all or parts of the 'akka-reference.conf' file and modify as you please.
|
||||
|
||||
akka {
|
||||
remote {
|
||||
netty {
|
||||
secure-cookie = """")
|
||||
print(Crypt.generateSecureCookie)
|
||||
print(""""
|
||||
require-cookie = on
|
||||
}
|
||||
secure-cookie = "%s"
|
||||
require-cookie = on
|
||||
}
|
||||
}
|
||||
""")
|
||||
""".format(Crypt.generateSecureCookie))
|
||||
|
||||
|
|
@ -336,7 +336,7 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
|
|||
stay
|
||||
}
|
||||
|
||||
initialize
|
||||
initialize()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -574,7 +574,7 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
|
|||
throw BarrierTimeout(d)
|
||||
}
|
||||
|
||||
initialize
|
||||
initialize()
|
||||
|
||||
def handleBarrier(data: Data): State = {
|
||||
log.debug("handleBarrier({})", data)
|
||||
|
|
|
|||
|
|
@ -191,7 +191,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
|||
case EnterBarrier(barrier, timeout) ⇒ barrier
|
||||
case GetAddress(node) ⇒ node.name
|
||||
}
|
||||
stay using d.copy(runningOp = Some(token, sender))
|
||||
stay using d.copy(runningOp = Some(token -> sender))
|
||||
case Event(ToServer(op), Data(channel, Some((token, _)))) ⇒
|
||||
log.error("cannot write {} while waiting for {}", op, token)
|
||||
stay
|
||||
|
|
@ -257,8 +257,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
|||
channel.close()
|
||||
}
|
||||
|
||||
initialize
|
||||
|
||||
initialize()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -15,9 +15,9 @@ import akka.remote.transport.AssociationHandle._
|
|||
import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle }
|
||||
import akka.serialization.Serialization
|
||||
import akka.util.ByteString
|
||||
import scala.util.control.NonFatal
|
||||
import akka.remote.transport.Transport.InvalidAssociationException
|
||||
import java.io.NotSerializableException
|
||||
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -332,7 +332,7 @@ private[remote] class EndpointWriter(
|
|||
|
||||
private def serializeMessage(msg: Any): MessageProtocol = handle match {
|
||||
case Some(h) ⇒
|
||||
Serialization.currentTransportAddress.withValue(h.localAddress) {
|
||||
Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, context.system)) {
|
||||
(MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
|
||||
}
|
||||
case None ⇒ throw new EndpointException("Internal error: No handle was present during serialization of" +
|
||||
|
|
|
|||
|
|
@ -271,8 +271,17 @@ private[akka] class RemoteActorRefProvider(
|
|||
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
|
||||
case ActorPathExtractor(address, elems) ⇒
|
||||
if (hasAddress(address)) actorFor(rootGuardian, elems)
|
||||
else new RemoteActorRef(transport, transport.localAddressForRemote(address),
|
||||
new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
|
||||
else {
|
||||
val rootPath = RootActorPath(address) / elems
|
||||
try {
|
||||
new RemoteActorRef(transport, transport.localAddressForRemote(address),
|
||||
rootPath, Nobody, props = None, deploy = None)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
log.error(e, "Error while looking up address {}", rootPath.address)
|
||||
new EmptyLocalActorRef(this, rootPath, eventStream)
|
||||
}
|
||||
}
|
||||
case _ ⇒ local.actorFor(ref, path)
|
||||
}
|
||||
|
||||
|
|
@ -378,5 +387,5 @@ private[akka] class RemoteActorRef private[akka] (
|
|||
def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause))
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def writeReplace(): AnyRef = SerializedActorRef(path)
|
||||
private def writeReplace(): AnyRef = SerializedActorRef(this)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx
|
|||
*
|
||||
* The remote transport is responsible for sending and receiving messages.
|
||||
* Each transport has an address, which it should provide in
|
||||
* Serialization.currentTransportAddress (thread-local) while serializing
|
||||
* Serialization.currentTransportInformation (thread-local) while serializing
|
||||
* actor references (which might also be part of messages). This address must
|
||||
* be available (i.e. fully initialized) by the time the first message is
|
||||
* received or when the start() method returns, whatever happens first.
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ private[remote] object Remoting {
|
|||
null)
|
||||
}
|
||||
case None ⇒ throw new RemoteTransportException(
|
||||
s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString}]", null)
|
||||
s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString(", ")}]", null)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,10 +6,8 @@ package akka.remote.serialization
|
|||
|
||||
import akka.serialization.{ Serializer, Serialization }
|
||||
import com.google.protobuf.Message
|
||||
import akka.actor.DynamicAccess
|
||||
import akka.actor.{ ActorSystem, ActorRef }
|
||||
import akka.remote.RemoteProtocol.ActorRefProtocol
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.ActorRef
|
||||
|
||||
object ProtobufSerializer {
|
||||
|
||||
|
|
@ -18,11 +16,7 @@ object ProtobufSerializer {
|
|||
* protobuf representation.
|
||||
*/
|
||||
def serializeActorRef(ref: ActorRef): ActorRefProtocol = {
|
||||
val identifier: String = Serialization.currentTransportAddress.value match {
|
||||
case null ⇒ ref.path.toSerializationFormat
|
||||
case address ⇒ ref.path.toSerializationFormatWithAddress(address)
|
||||
}
|
||||
ActorRefProtocol.newBuilder.setPath(identifier).build
|
||||
ActorRefProtocol.newBuilder.setPath(Serialization.serializedActorPath(ref)).build
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -109,9 +109,9 @@ class TestTransport(
|
|||
(_) ⇒ defaultShutdown,
|
||||
(_) ⇒ registry.logActivity(ShutdownAttempt(localAddress)))
|
||||
|
||||
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior()
|
||||
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior(())
|
||||
override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress)
|
||||
override def shutdown(): Unit = shutdownBehavior()
|
||||
override def shutdown(): Unit = shutdownBehavior(())
|
||||
|
||||
private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = {
|
||||
registry.getRemoteReadHandlerFor(params._1) match {
|
||||
|
|
|
|||
|
|
@ -210,30 +210,29 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
|
|||
case AssociateUnderlying(remoteAddress, statusPromise) ⇒
|
||||
wrappedTransport.associate(remoteAddress) onComplete {
|
||||
// Slight modification of pipe, only success is sent, failure is propagated to a separate future
|
||||
case Success(handle) ⇒ self ! (handle, statusPromise)
|
||||
case Success(handle) ⇒ self ! ((handle, statusPromise))
|
||||
case Failure(e) ⇒ statusPromise.failure(e)
|
||||
}
|
||||
// Finished outbound association and got back the handle
|
||||
case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) ⇒
|
||||
case (handle: AssociationHandle, statusPromise: Promise[AssociationHandle]) ⇒ //FIXME switch to a real message iso Tuple2
|
||||
val wrappedHandle = wrapHandle(handle, associationListener, inbound = false)
|
||||
val naked = nakedAddress(handle.remoteAddress)
|
||||
val inMode = getInboundMode(naked)
|
||||
wrappedHandle.outboundThrottleMode.set(getOutboundMode(naked))
|
||||
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
|
||||
wrappedHandle.readHandlerPromise.future map { _ -> inMode } pipeTo wrappedHandle.throttlerActor
|
||||
handleTable ::= naked -> wrappedHandle
|
||||
statusPromise.success(wrappedHandle)
|
||||
case SetThrottle(address, direction, mode) ⇒
|
||||
val naked = nakedAddress(address)
|
||||
throttlingModes += naked -> (mode, direction)
|
||||
throttlingModes = throttlingModes.updated(naked, (mode, direction))
|
||||
val ok = Future.successful(SetThrottleAck)
|
||||
val allAcks = handleTable.map {
|
||||
Future.sequence(handleTable map {
|
||||
case (`naked`, handle) ⇒ setMode(handle, mode, direction)
|
||||
case _ ⇒ ok
|
||||
}
|
||||
Future.sequence(allAcks).map(_ ⇒ SetThrottleAck) pipeTo sender
|
||||
}).map(_ ⇒ SetThrottleAck) pipeTo sender
|
||||
case ForceDisassociate(address) ⇒
|
||||
val naked = nakedAddress(address)
|
||||
handleTable.foreach {
|
||||
handleTable foreach {
|
||||
case (`naked`, handle) ⇒ handle.disassociate()
|
||||
case _ ⇒
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import java.io.{ IOException, FileNotFoundException, FileInputStream }
|
|||
import java.security._
|
||||
import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext }
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
import scala.util.Try
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -92,7 +93,7 @@ private[akka] object NettySSLSupport {
|
|||
trustManagerFactory.init({
|
||||
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val fin = new FileInputStream(trustStorePath)
|
||||
try trustStore.load(fin, trustStorePassword.toCharArray) finally fin.close()
|
||||
try trustStore.load(fin, trustStorePassword.toCharArray) finally Try(fin.close())
|
||||
trustStore
|
||||
})
|
||||
trustManagerFactory.getTrustManagers
|
||||
|
|
@ -140,10 +141,23 @@ private[akka] object NettySSLSupport {
|
|||
factory.init({
|
||||
val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val fin = new FileInputStream(keyStorePath)
|
||||
try keyStore.load(fin, keyStorePassword.toCharArray) finally fin.close()
|
||||
try keyStore.load(fin, keyStorePassword.toCharArray) finally Try(fin.close())
|
||||
keyStore
|
||||
}, keyStorePassword.toCharArray)
|
||||
Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, null, rng); ctx }
|
||||
|
||||
val trustManagers: Option[Array[TrustManager]] = settings.SSLTrustStore map {
|
||||
path ⇒
|
||||
val pwd = settings.SSLTrustStorePassword.map(_.toCharArray).orNull
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
|
||||
trustManagerFactory.init({
|
||||
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
|
||||
val fin = new FileInputStream(path)
|
||||
try trustStore.load(fin, pwd) finally Try(fin.close())
|
||||
trustStore
|
||||
})
|
||||
trustManagerFactory.getTrustManagers
|
||||
}
|
||||
Option(SSLContext.getInstance(protocol)) map { ctx ⇒ ctx.init(factory.getKeyManagers, trustManagers.orNull, rng); ctx }
|
||||
} catch {
|
||||
case e: FileNotFoundException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because key store could not be loaded", e)
|
||||
case e: IOException ⇒ throw new RemoteTransportException("Server SSL connection could not be established because: " + e.getMessage, e)
|
||||
|
|
|
|||
|
|
@ -37,14 +37,24 @@ object RemotingSpec {
|
|||
|
||||
class Echo2 extends Actor {
|
||||
def receive = {
|
||||
case "ping" ⇒ sender ! (("pong", sender))
|
||||
case "ping" ⇒ sender ! (("pong", sender))
|
||||
case a: ActorRef ⇒ a ! (("ping", sender))
|
||||
case ("ping", a: ActorRef) ⇒ sender ! (("pong", a))
|
||||
case ("pong", a: ActorRef) ⇒ a ! (("pong", sender.path.toSerializationFormat))
|
||||
}
|
||||
}
|
||||
|
||||
val cfg: Config = ConfigFactory parseString ("""
|
||||
class Proxy(val one: ActorRef, val another: ActorRef) extends Actor {
|
||||
def receive = {
|
||||
case s if sender.path == one.path ⇒ another ! s
|
||||
case s if sender.path == another.path ⇒ one ! s
|
||||
}
|
||||
}
|
||||
|
||||
val cfg: Config = ConfigFactory parseString (s"""
|
||||
common-ssl-settings {
|
||||
key-store = "%s"
|
||||
trust-store = "%s"
|
||||
key-store = "${getClass.getClassLoader.getResource("keystore").getPath}"
|
||||
trust-store = "${getClass.getClassLoader.getResource("truststore").getPath}"
|
||||
key-store-password = "changeme"
|
||||
trust-store-password = "changeme"
|
||||
protocol = "TLSv1"
|
||||
|
|
@ -83,10 +93,10 @@ object RemotingSpec {
|
|||
}
|
||||
}
|
||||
|
||||
netty.tcp = ${common-netty-settings}
|
||||
netty.udp = ${common-netty-settings}
|
||||
netty.ssl = ${common-netty-settings}
|
||||
netty.ssl.security = ${common-ssl-settings}
|
||||
netty.tcp = $${common-netty-settings}
|
||||
netty.udp = $${common-netty-settings}
|
||||
netty.ssl = $${common-netty-settings}
|
||||
netty.ssl.security = $${common-ssl-settings}
|
||||
|
||||
test {
|
||||
transport-class = "akka.remote.transport.TestTransport"
|
||||
|
|
@ -104,9 +114,7 @@ object RemotingSpec {
|
|||
/looker/child/grandchild.remote = "akka.test://RemotingSpec@localhost:12345"
|
||||
}
|
||||
}
|
||||
""".format(
|
||||
getClass.getClassLoader.getResource("keystore").getPath,
|
||||
getClass.getClassLoader.getResource("truststore").getPath))
|
||||
""")
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -122,14 +130,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
maximum-payload-bytes = 48000 bytes
|
||||
}
|
||||
""").withFallback(system.settings.config).resolve()
|
||||
val otherSystem = ActorSystem("remote-sys", conf)
|
||||
val remoteSystem = ActorSystem("remote-sys", conf)
|
||||
|
||||
for (
|
||||
(name, proto) ← Seq(
|
||||
"/gonk" -> "tcp",
|
||||
"/zagzag" -> "udp",
|
||||
"/roghtaar" -> "ssl.tcp")
|
||||
) deploy(system, Deploy(name, scope = RemoteScope(addr(otherSystem, proto))))
|
||||
) deploy(system, Deploy(name, scope = RemoteScope(addr(remoteSystem, proto))))
|
||||
|
||||
def addr(sys: ActorSystem, proto: String) =
|
||||
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
|
||||
|
|
@ -138,12 +146,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
|
||||
}
|
||||
|
||||
val remote = otherSystem.actorOf(Props[Echo2], "echo")
|
||||
val remote = remoteSystem.actorOf(Props[Echo2], "echo")
|
||||
|
||||
val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo")
|
||||
|
||||
private def verifySend(msg: Any)(afterSend: ⇒ Unit) {
|
||||
val bigBounceOther = otherSystem.actorOf(Props(new Actor {
|
||||
val bigBounceOther = remoteSystem.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case x: Int ⇒ sender ! byteStringOfSize(x)
|
||||
case x ⇒ sender ! x
|
||||
|
|
@ -166,16 +174,26 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
system.eventStream.unsubscribe(eventForwarder, classOf[AssociationErrorEvent])
|
||||
system.eventStream.unsubscribe(eventForwarder, classOf[DisassociatedEvent])
|
||||
system.stop(eventForwarder)
|
||||
otherSystem.stop(bigBounceOther)
|
||||
remoteSystem.stop(bigBounceOther)
|
||||
}
|
||||
}
|
||||
|
||||
override def atStartup() = {
|
||||
system.eventStream.publish(TestEvent.Mute(
|
||||
EventFilter.error(start = "AssociationError"),
|
||||
EventFilter.warning(pattern = "received dead letter.*")))
|
||||
remoteSystem.eventStream.publish(TestEvent.Mute(
|
||||
EventFilter[EndpointException](),
|
||||
EventFilter.error(start = "AssociationError"),
|
||||
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))
|
||||
}
|
||||
|
||||
private def byteStringOfSize(size: Int) = ByteString.fromArray(Array.fill(size)(42: Byte))
|
||||
|
||||
val maxPayloadBytes = system.settings.config.getBytes("akka.remote.test.maximum-payload-bytes").toInt
|
||||
|
||||
override def afterTermination() {
|
||||
otherSystem.shutdown()
|
||||
remoteSystem.shutdown()
|
||||
AssociationRegistry.clear()
|
||||
}
|
||||
|
||||
|
|
@ -203,16 +221,21 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
"send dead letters on remote if actor does not exist" in {
|
||||
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
|
||||
system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh"
|
||||
}(otherSystem)
|
||||
}(remoteSystem)
|
||||
}
|
||||
|
||||
"not be exhausted by sending to broken connections" in {
|
||||
val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]""").
|
||||
withFallback(otherSystem.settings.config)
|
||||
val moreSystems = Vector.fill(5)(ActorSystem(otherSystem.name, tcpOnlyConfig))
|
||||
moreSystems foreach (_.actorOf(Props[Echo2], name = "echo"))
|
||||
withFallback(remoteSystem.settings.config)
|
||||
val moreSystems = Vector.fill(5)(ActorSystem(remoteSystem.name, tcpOnlyConfig))
|
||||
moreSystems foreach { sys ⇒
|
||||
sys.eventStream.publish(TestEvent.Mute(
|
||||
EventFilter[EndpointDisassociatedException](),
|
||||
EventFilter.warning(pattern = "received dead letter.*")))
|
||||
sys.actorOf(Props[Echo2], name = "echo")
|
||||
}
|
||||
val moreRefs = moreSystems map (sys ⇒ system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo"))
|
||||
val aliveEcho = system.actorFor(RootActorPath(addr(otherSystem, "tcp")) / "user" / "echo")
|
||||
val aliveEcho = system.actorFor(RootActorPath(addr(remoteSystem, "tcp")) / "user" / "echo")
|
||||
val n = 100
|
||||
|
||||
// first everything is up and running
|
||||
|
|
@ -229,7 +252,6 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
moreSystems foreach { sys ⇒
|
||||
sys.shutdown()
|
||||
sys.awaitTermination(5.seconds.dilated)
|
||||
sys.isTerminated must be(true)
|
||||
}
|
||||
|
||||
1 to n foreach { x ⇒
|
||||
|
|
@ -259,7 +281,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
}
|
||||
|
||||
"not send to remote re-created actor with same name" in {
|
||||
val echo = otherSystem.actorOf(Props[Echo1], "otherEcho1")
|
||||
val echo = remoteSystem.actorOf(Props[Echo1], "otherEcho1")
|
||||
echo ! 71
|
||||
expectMsg(71)
|
||||
echo ! PoisonPill
|
||||
|
|
@ -267,7 +289,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
echo ! 72
|
||||
expectNoMsg(1.second)
|
||||
|
||||
val echo2 = otherSystem.actorOf(Props[Echo1], "otherEcho1")
|
||||
val echo2 = remoteSystem.actorOf(Props[Echo1], "otherEcho1")
|
||||
echo2 ! 73
|
||||
expectMsg(73)
|
||||
// msg to old ActorRef (different uid) should not get through
|
||||
|
|
@ -275,7 +297,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
echo ! 74
|
||||
expectNoMsg(1.second)
|
||||
|
||||
otherSystem.actorFor("/user/otherEcho1") ! 75
|
||||
remoteSystem.actorFor("/user/otherEcho1") ! 75
|
||||
expectMsg(75)
|
||||
|
||||
system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76
|
||||
|
|
@ -289,11 +311,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
case s: String ⇒ sender ! context.actorFor(s)
|
||||
}
|
||||
}), "looker")
|
||||
// child is configured to be deployed on remote-sys (otherSystem)
|
||||
l ! (Props[Echo1], "child")
|
||||
// child is configured to be deployed on remote-sys (remoteSystem)
|
||||
l ! ((Props[Echo1], "child"))
|
||||
val child = expectMsgType[ActorRef]
|
||||
// grandchild is configured to be deployed on RemotingSpec (system)
|
||||
child ! (Props[Echo1], "grandchild")
|
||||
child ! ((Props[Echo1], "grandchild"))
|
||||
val grandchild = expectMsgType[ActorRef]
|
||||
grandchild.asInstanceOf[ActorRefScope].isLocal must be(true)
|
||||
grandchild ! 43
|
||||
|
|
@ -313,7 +335,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
child ! PoisonPill
|
||||
expectMsg("postStop")
|
||||
expectMsgType[Terminated].actor must be === child
|
||||
l ! (Props[Echo1], "child")
|
||||
l ! ((Props[Echo1], "child"))
|
||||
val child2 = expectMsgType[ActorRef]
|
||||
child2 ! 45
|
||||
expectMsg(45)
|
||||
|
|
@ -335,7 +357,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
"be able to use multiple transports and use the appropriate one (TCP)" in {
|
||||
val r = system.actorOf(Props[Echo1], "gonk")
|
||||
r.path.toString must be ===
|
||||
s"akka.tcp://remote-sys@localhost:${port(otherSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
|
||||
s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
|
||||
r ! 42
|
||||
expectMsg(42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
|
|
@ -351,7 +373,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
"be able to use multiple transports and use the appropriate one (UDP)" in {
|
||||
val r = system.actorOf(Props[Echo1], "zagzag")
|
||||
r.path.toString must be ===
|
||||
s"akka.udp://remote-sys@localhost:${port(otherSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
|
||||
s"akka.udp://remote-sys@localhost:${port(remoteSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
|
||||
r ! 42
|
||||
expectMsg(10.seconds, 42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
|
|
@ -367,7 +389,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
"be able to use multiple transports and use the appropriate one (SSL)" in {
|
||||
val r = system.actorOf(Props[Echo1], "roghtaar")
|
||||
r.path.toString must be ===
|
||||
s"akka.ssl.tcp://remote-sys@localhost:${port(otherSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
|
||||
s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
|
||||
r ! 42
|
||||
expectMsg(10.seconds, 42)
|
||||
EventFilter[Exception]("crash", occurrences = 1).intercept {
|
||||
|
|
@ -415,15 +437,30 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
}
|
||||
}
|
||||
|
||||
"be able to serialize a local actor ref from another actor system" in {
|
||||
val config = ConfigFactory.parseString("""
|
||||
akka.remote.enabled-transports = ["akka.remote.test", "akka.remote.netty.tcp"]
|
||||
akka.remote.test.local-address = "test://other-system@localhost:12347"
|
||||
""").withFallback(remoteSystem.settings.config)
|
||||
val otherSystem = ActorSystem("other-system", config)
|
||||
try {
|
||||
val otherGuy = otherSystem.actorOf(Props[Echo2], "other-guy")
|
||||
// check that we use the specified transport address instead of the default
|
||||
val otherGuyRemoteTcp = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "tcp"))
|
||||
val remoteEchoHereTcp = system.actorFor(s"akka.tcp://remote-sys@localhost:${port(remoteSystem, "tcp")}/user/echo")
|
||||
val proxyTcp = system.actorOf(Props(new Proxy(remoteEchoHereTcp, testActor)), "proxy-tcp")
|
||||
proxyTcp ! otherGuy
|
||||
expectMsg(3.seconds, ("pong", otherGuyRemoteTcp))
|
||||
// now check that we fall back to default when we haven't got a corresponding transport
|
||||
val otherGuyRemoteTest = otherGuy.path.toSerializationFormatWithAddress(addr(otherSystem, "test"))
|
||||
val remoteEchoHereSsl = system.actorFor(s"akka.ssl.tcp://remote-sys@localhost:${port(remoteSystem, "ssl.tcp")}/user/echo")
|
||||
val proxySsl = system.actorOf(Props(new Proxy(remoteEchoHereSsl, testActor)), "proxy-ssl")
|
||||
proxySsl ! otherGuy
|
||||
expectMsg(3.seconds, ("pong", otherGuyRemoteTest))
|
||||
} finally {
|
||||
otherSystem.shutdown()
|
||||
otherSystem.awaitTermination(5.seconds.dilated)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def beforeTermination() {
|
||||
system.eventStream.publish(TestEvent.Mute(
|
||||
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
|
||||
otherSystem.eventStream.publish(TestEvent.Mute(
|
||||
EventFilter[EndpointException](),
|
||||
EventFilter.error(start = "AssociationError"),
|
||||
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ object AkkaProtocolStressTest {
|
|||
losses += seq - maxSeq - 1
|
||||
maxSeq = seq
|
||||
if (seq > limit * 0.9) {
|
||||
controller ! (maxSeq, losses)
|
||||
controller ! ((maxSeq, losses))
|
||||
}
|
||||
} else {
|
||||
controller ! s"Received out of order message. Previous: ${maxSeq} Received: ${seq}"
|
||||
|
|
|
|||
|
|
@ -21,19 +21,19 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout {
|
|||
"execute default behavior" in {
|
||||
val behavior = defaultBehavior
|
||||
|
||||
Await.result(behavior(), timeout.duration) == 3 must be(true)
|
||||
Await.result(behavior(()), timeout.duration) must be === 3
|
||||
}
|
||||
|
||||
"be able to push generic behavior" in {
|
||||
val behavior = defaultBehavior
|
||||
|
||||
behavior.push((_) ⇒ Promise.successful(4).future)
|
||||
Await.result(behavior(), timeout.duration) must be(4)
|
||||
Await.result(behavior(()), timeout.duration) must be(4)
|
||||
|
||||
behavior.push((_) ⇒ Promise.failed(TestException).future)
|
||||
behavior().value match {
|
||||
case Some(Failure(e)) if e eq TestException ⇒
|
||||
case _ ⇒ fail("Expected exception")
|
||||
behavior(()).value match {
|
||||
case Some(Failure(`TestException`)) ⇒
|
||||
case _ ⇒ fail("Expected exception")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -41,15 +41,15 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout {
|
|||
val behavior = defaultBehavior
|
||||
behavior.pushConstant(5)
|
||||
|
||||
Await.result(behavior(), timeout.duration) must be(5)
|
||||
Await.result(behavior(), timeout.duration) must be(5)
|
||||
Await.result(behavior(()), timeout.duration) must be(5)
|
||||
Await.result(behavior(()), timeout.duration) must be(5)
|
||||
}
|
||||
|
||||
"be able to push failure behavior" in {
|
||||
val behavior = defaultBehavior
|
||||
behavior.pushError(TestException)
|
||||
|
||||
behavior().value match {
|
||||
behavior(()).value match {
|
||||
case Some(Failure(e)) if e eq TestException ⇒
|
||||
case _ ⇒ fail("Expected exception")
|
||||
}
|
||||
|
|
@ -59,16 +59,16 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout {
|
|||
val behavior = defaultBehavior
|
||||
|
||||
behavior.pushConstant(5)
|
||||
Await.result(behavior(), timeout.duration) must be(5)
|
||||
Await.result(behavior(()), timeout.duration) must be(5)
|
||||
|
||||
behavior.pushConstant(7)
|
||||
Await.result(behavior(), timeout.duration) must be(7)
|
||||
Await.result(behavior(()), timeout.duration) must be(7)
|
||||
|
||||
behavior.pop()
|
||||
Await.result(behavior(), timeout.duration) must be(5)
|
||||
Await.result(behavior(()), timeout.duration) must be(5)
|
||||
|
||||
behavior.pop()
|
||||
Await.result(behavior(), timeout.duration) must be(3)
|
||||
Await.result(behavior(()), timeout.duration) must be(3)
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -78,13 +78,13 @@ class SwitchableLoggedBehaviorSpec extends AkkaSpec with DefaultTimeout {
|
|||
behavior.pop()
|
||||
behavior.pop()
|
||||
|
||||
Await.result(behavior(), timeout.duration) must be(3)
|
||||
Await.result(behavior(()), timeout.duration) must be(3)
|
||||
}
|
||||
|
||||
"enable delayed completition" in {
|
||||
val behavior = defaultBehavior
|
||||
val controlPromise = behavior.pushDelayed
|
||||
val f = behavior()
|
||||
val f = behavior(())
|
||||
|
||||
f.isCompleted must be(false)
|
||||
controlPromise.success(())
|
||||
|
|
|
|||
|
|
@ -111,13 +111,13 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
|
|||
//#test-statsService
|
||||
"show usage of the statsService from one node" in within(15 seconds) {
|
||||
runOn(second) {
|
||||
assertServiceOk
|
||||
assertServiceOk()
|
||||
}
|
||||
|
||||
testConductor.enter("done-2")
|
||||
}
|
||||
|
||||
def assertServiceOk: Unit = {
|
||||
def assertServiceOk(): Unit = {
|
||||
val service = system.actorFor(node(third) / "user" / "statsService")
|
||||
// eventually the service should be ok,
|
||||
// first attempts might fail because worker actors not started yet
|
||||
|
|
@ -135,7 +135,7 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig)
|
|||
//#test-statsService
|
||||
|
||||
"show usage of the statsService from all nodes" in within(15 seconds) {
|
||||
assertServiceOk
|
||||
assertServiceOk()
|
||||
testConductor.enter("done-3")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -94,13 +94,13 @@ abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConf
|
|||
|
||||
"show usage of the statsService from one node" in within(15 seconds) {
|
||||
runOn(second) {
|
||||
assertServiceOk
|
||||
assertServiceOk()
|
||||
}
|
||||
|
||||
testConductor.enter("done-2")
|
||||
}
|
||||
|
||||
def assertServiceOk: Unit = {
|
||||
def assertServiceOk(): Unit = {
|
||||
val service = system.actorFor(node(third) / "user" / "statsService")
|
||||
// eventually the service should be ok,
|
||||
// first attempts might fail because worker actors not started yet
|
||||
|
|
@ -117,7 +117,7 @@ abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConf
|
|||
//#test-statsService
|
||||
|
||||
"show usage of the statsService from all nodes" in within(15 seconds) {
|
||||
assertServiceOk
|
||||
assertServiceOk()
|
||||
|
||||
testConductor.enter("done-3")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ abstract class TransformationSampleSpec extends MultiNodeSpec(TransformationSamp
|
|||
testConductor.enter("backend1-started")
|
||||
|
||||
runOn(frontend1) {
|
||||
assertServiceOk
|
||||
assertServiceOk()
|
||||
}
|
||||
|
||||
testConductor.enter("frontend1-backend1-ok")
|
||||
|
|
@ -105,7 +105,7 @@ abstract class TransformationSampleSpec extends MultiNodeSpec(TransformationSamp
|
|||
testConductor.enter("all-started")
|
||||
|
||||
runOn(frontend1, frontend2) {
|
||||
assertServiceOk
|
||||
assertServiceOk()
|
||||
}
|
||||
|
||||
testConductor.enter("all-ok")
|
||||
|
|
@ -114,7 +114,7 @@ abstract class TransformationSampleSpec extends MultiNodeSpec(TransformationSamp
|
|||
|
||||
}
|
||||
|
||||
def assertServiceOk: Unit = {
|
||||
def assertServiceOk(): Unit = {
|
||||
val transformationFrontend = system.actorFor("akka://" + system.name + "/user/frontend")
|
||||
// eventually the service should be ok,
|
||||
// backends might not have registered initially
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ abstract class TransformationSampleJapiSpec extends MultiNodeSpec(Transformation
|
|||
testConductor.enter("backend1-started")
|
||||
|
||||
runOn(frontend1) {
|
||||
assertServiceOk
|
||||
assertServiceOk()
|
||||
}
|
||||
|
||||
testConductor.enter("frontend1-backend1-ok")
|
||||
|
|
@ -106,7 +106,7 @@ abstract class TransformationSampleJapiSpec extends MultiNodeSpec(Transformation
|
|||
testConductor.enter("all-started")
|
||||
|
||||
runOn(frontend1, frontend2) {
|
||||
assertServiceOk
|
||||
assertServiceOk()
|
||||
}
|
||||
|
||||
testConductor.enter("all-ok")
|
||||
|
|
@ -115,7 +115,7 @@ abstract class TransformationSampleJapiSpec extends MultiNodeSpec(Transformation
|
|||
|
||||
}
|
||||
|
||||
def assertServiceOk: Unit = {
|
||||
def assertServiceOk(): Unit = {
|
||||
val transformationFrontend = system.actorFor("akka://" + system.name + "/user/frontend")
|
||||
// eventually the service should be ok,
|
||||
// backends might not have registered initially
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ abstract class GenericBuncher[A: ClassTag, B](val singleTimeout: FiniteDuration,
|
|||
stop
|
||||
}
|
||||
|
||||
initialize
|
||||
initialize()
|
||||
}
|
||||
|
||||
object Buncher {
|
||||
|
|
|
|||
|
|
@ -133,9 +133,9 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
|
|||
object DiningHakkers {
|
||||
val system = ActorSystem()
|
||||
|
||||
def main(args: Array[String]): Unit = run
|
||||
def main(args: Array[String]): Unit = run()
|
||||
|
||||
def run {
|
||||
def run(): Unit = {
|
||||
//Create 5 chopsticks
|
||||
val chopsticks = for (i ← 1 to 5) yield system.actorOf(Props[Chopstick], "Chopstick" + i)
|
||||
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ class Chopstick extends Actor with FSM[ChopstickState, TakenBy] {
|
|||
}
|
||||
|
||||
// Initialze the chopstick
|
||||
initialize
|
||||
initialize()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -155,7 +155,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
|
|||
}
|
||||
|
||||
// Initialize the hakker
|
||||
initialize
|
||||
initialize()
|
||||
|
||||
private def startThinking(duration: FiniteDuration): State = {
|
||||
goto(Thinking) using TakenChopsticks(None, None) forMax duration
|
||||
|
|
@ -169,9 +169,9 @@ object DiningHakkersOnFsm {
|
|||
|
||||
val system = ActorSystem()
|
||||
|
||||
def main(args: Array[String]): Unit = run
|
||||
def main(args: Array[String]): Unit = run()
|
||||
|
||||
def run = {
|
||||
def run(): Unit = {
|
||||
// Create 5 chopsticks
|
||||
val chopsticks = for (i ← 1 to 5) yield system.actorOf(Props[Chopstick], "Chopstick" + i)
|
||||
// Create 5 awesome fsm hakkers and assign them their left and right chopstick
|
||||
|
|
|
|||
|
|
@ -20,9 +20,8 @@ class CreationApplication extends Bootable {
|
|||
val remoteActor =
|
||||
system.actorOf(Props[AdvancedCalculatorActor], "advancedCalculator")
|
||||
|
||||
def doSomething(op: MathOp) = {
|
||||
localActor ! (remoteActor, op)
|
||||
}
|
||||
def doSomething(op: MathOp): Unit =
|
||||
localActor ! ((remoteActor, op))
|
||||
//#setup
|
||||
|
||||
def startup() {
|
||||
|
|
|
|||
|
|
@ -22,9 +22,8 @@ class LookupApplication extends Bootable {
|
|||
val remoteActor = system.actorFor(
|
||||
"akka.tcp://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator")
|
||||
|
||||
def doSomething(op: MathOp) = {
|
||||
actor ! (remoteActor, op)
|
||||
}
|
||||
def doSomething(op: MathOp): Unit =
|
||||
actor ! ((remoteActor, op))
|
||||
//#setup
|
||||
|
||||
def startup() {
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft
|
|||
}
|
||||
|
||||
"log info with parameters" in {
|
||||
producer ! ("test x={} y={}", 3, 17)
|
||||
producer ! (("test x={} y={}", 3, 17))
|
||||
|
||||
awaitCond(outputString.contains("----"), 5 seconds)
|
||||
val s = outputString
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
|
|||
private var lastGC = 0l
|
||||
|
||||
// we have to forget about long-gone threads sometime
|
||||
private def gc {
|
||||
private def gc(): Unit = {
|
||||
queues = (Map.newBuilder[CallingThreadMailbox, Set[WeakReference[MessageQueue]]] /: queues) {
|
||||
case (m, (k, v)) ⇒
|
||||
val nv = v filter (_.get ne null)
|
||||
|
|
@ -66,7 +66,7 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
|
|||
val now = System.nanoTime
|
||||
if (now - lastGC > 1000000000l) {
|
||||
lastGC = now
|
||||
gc
|
||||
gc()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -165,16 +165,14 @@ class CallingThreadDispatcher(
|
|||
mbox foreach CallingThreadDispatcherQueues(actor.system).unregisterQueues
|
||||
}
|
||||
|
||||
override def suspend(actor: ActorCell) {
|
||||
protected[akka] override def suspend(actor: ActorCell) {
|
||||
actor.mailbox match {
|
||||
case m: CallingThreadMailbox ⇒
|
||||
m.suspendSwitch.switchOn; m.suspend()
|
||||
case m ⇒
|
||||
m.systemEnqueue(actor.self, Suspend())
|
||||
case m: CallingThreadMailbox ⇒ { m.suspendSwitch.switchOn; m.suspend() }
|
||||
case m ⇒ m.systemEnqueue(actor.self, Suspend())
|
||||
}
|
||||
}
|
||||
|
||||
override def resume(actor: ActorCell) {
|
||||
protected[akka] override def resume(actor: ActorCell) {
|
||||
actor.mailbox match {
|
||||
case mbox: CallingThreadMailbox ⇒
|
||||
val queue = mbox.queue
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ class TestActorRef[T <: Actor](
|
|||
|
||||
import TestActorRef.InternalGetActor
|
||||
|
||||
override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
|
||||
protected override def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
|
||||
new ActorCell(system, ref, props, supervisor) {
|
||||
override def autoReceiveMessage(msg: Envelope) {
|
||||
msg.message match {
|
||||
|
|
|
|||
|
|
@ -38,5 +38,5 @@ class TestBarrier(count: Int) {
|
|||
}
|
||||
}
|
||||
|
||||
def reset = barrier.reset
|
||||
def reset(): Unit = barrier.reset()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -788,3 +788,13 @@ private[testkit] abstract class CachingPartialFunction[A, B <: AnyRef] extends s
|
|||
final def isDefinedAt(x: A): Boolean = try { cache = `match`(x); true } catch { case NoMatch ⇒ cache = null.asInstanceOf[B]; false }
|
||||
final override def apply(x: A): B = cache
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for implicit conversion to add dilated function to Duration.
|
||||
*/
|
||||
class TestDuration(duration: FiniteDuration) {
|
||||
def dilated(implicit system: ActorSystem): FiniteDuration = {
|
||||
// this cast will succeed unless TestTimeFactor is non-finite (which would be a misconfiguration)
|
||||
(duration * TestKitExtension(system).TestTimeFactor).asInstanceOf[FiniteDuration]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,14 +47,4 @@ package object testkit {
|
|||
* Corresponding Java API is available in TestKit.dilated
|
||||
*/
|
||||
implicit def duration2TestDuration(duration: FiniteDuration) = new TestDuration(duration)
|
||||
|
||||
/**
|
||||
* Wrapper for implicit conversion to add dilated function to Duration.
|
||||
*/
|
||||
class TestDuration(duration: FiniteDuration) {
|
||||
def dilated(implicit system: ActorSystem): FiniteDuration = {
|
||||
// this cast will succeed unless TestTimeFactor is non-finite (which would be a misconfiguration)
|
||||
(duration * TestKitExtension(system).TestTimeFactor).asInstanceOf[FiniteDuration]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,13 +109,9 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
|
|||
|
||||
import TestActorRefSpec._
|
||||
|
||||
override def beforeEach {
|
||||
otherthread = null
|
||||
}
|
||||
override def beforeEach(): Unit = otherthread = null
|
||||
|
||||
private def assertThread {
|
||||
otherthread must (be(null) or equal(thread))
|
||||
}
|
||||
private def assertThread(): Unit = otherthread must (be(null) or equal(thread))
|
||||
|
||||
"A TestActorRef must be an ActorRef, hence it" must {
|
||||
|
||||
|
|
@ -167,7 +163,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
|
|||
|
||||
counter must be(0)
|
||||
|
||||
assertThread
|
||||
assertThread()
|
||||
}
|
||||
|
||||
"stop when sent a poison pill" in {
|
||||
|
|
@ -185,7 +181,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
|
|||
case WrappedTerminated(Terminated(`a`)) ⇒ true
|
||||
}
|
||||
a.isTerminated must be(true)
|
||||
assertThread
|
||||
assertThread()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -209,7 +205,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
|
|||
boss ! "sendKill"
|
||||
|
||||
counter must be(0)
|
||||
assertThread
|
||||
assertThread()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Some files were not shown because too many files have changed in this diff Show more
Loading…
Add table
Add a link
Reference in a new issue