rename app: ActorSystem to system everywhere
This commit is contained in:
parent
c31695bef4
commit
d381b72061
69 changed files with 344 additions and 343 deletions
|
|
@ -9,15 +9,15 @@ import static org.junit.Assert.*;
|
|||
|
||||
public class JavaAPI {
|
||||
|
||||
private ActorSystem app = ActorSystem.create();
|
||||
private ActorSystem system = ActorSystem.create();
|
||||
|
||||
@Test void mustBeAbleToCreateActorRefFromClass() {
|
||||
ActorRef ref = app.actorOf(JavaAPITestActor.class);
|
||||
ActorRef ref = system.actorOf(JavaAPITestActor.class);
|
||||
assertNotNull(ref);
|
||||
}
|
||||
|
||||
@Test void mustBeAbleToCreateActorRefFromFactory() {
|
||||
ActorRef ref = app.actorOf(new Props().withCreator(new Creator<Actor>() {
|
||||
ActorRef ref = system.actorOf(new Props().withCreator(new Creator<Actor>() {
|
||||
public Actor create() {
|
||||
return new JavaAPITestActor();
|
||||
}
|
||||
|
|
@ -26,7 +26,7 @@ public class JavaAPI {
|
|||
}
|
||||
|
||||
@Test void mustAcceptSingleArgTell() {
|
||||
ActorRef ref = app.actorOf(JavaAPITestActor.class);
|
||||
ActorRef ref = system.actorOf(JavaAPITestActor.class);
|
||||
ref.tell("hallo");
|
||||
ref.tell("hallo", ref);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,9 +19,9 @@ import scala.Right;
|
|||
|
||||
public class JavaFutureTests {
|
||||
|
||||
private final ActorSystem app = ActorSystem.create();
|
||||
private final Timeout t = app.settings().ActorTimeout();
|
||||
private final FutureFactory ff = new FutureFactory(app.dispatcher(), t);
|
||||
private final ActorSystem system = ActorSystem.create();
|
||||
private final Timeout t = system.settings().ActorTimeout();
|
||||
private final FutureFactory ff = new FutureFactory(system.dispatcher(), t);
|
||||
|
||||
@Test public void mustBeAbleToMapAFuture() {
|
||||
Future<String> f1 = ff.future(new Callable<String>() {
|
||||
|
|
@ -41,7 +41,7 @@ public class JavaFutureTests {
|
|||
|
||||
@Test public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, app.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Future<String> f = cf;
|
||||
f.onResult(new Procedure<String>() {
|
||||
public void apply(String result) {
|
||||
|
|
@ -57,7 +57,7 @@ public class JavaFutureTests {
|
|||
|
||||
@Test public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, app.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Future<String> f = cf;
|
||||
f.onException(new Procedure<Throwable>() {
|
||||
public void apply(Throwable t) {
|
||||
|
|
@ -74,7 +74,7 @@ public class JavaFutureTests {
|
|||
|
||||
@Test public void mustBeAbleToExecuteAnOnTimeoutCallback() throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, app.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Future<String> f = cf;
|
||||
f.onTimeout(new Procedure<Future<String>>() {
|
||||
public void apply(Future<String> future) {
|
||||
|
|
@ -88,7 +88,7 @@ public class JavaFutureTests {
|
|||
|
||||
@Test public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, app.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Future<String> f = cf;
|
||||
f.onComplete(new Procedure<Future<String>>() {
|
||||
public void apply(akka.dispatch.Future<String> future) {
|
||||
|
|
@ -103,7 +103,7 @@ public class JavaFutureTests {
|
|||
|
||||
@Test public void mustBeAbleToForeachAFuture() throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, app.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Future<String> f = cf;
|
||||
f.foreach(new Procedure<String>() {
|
||||
public void apply(String future) {
|
||||
|
|
@ -118,13 +118,13 @@ public class JavaFutureTests {
|
|||
|
||||
@Test public void mustBeAbleToFlatMapAFuture() throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, app.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system.dispatcherFactory().defaultGlobalDispatcher());
|
||||
cf.completeWithResult("1000");
|
||||
Future<String> f = cf;
|
||||
Future<Integer> r = f.flatMap(new Function<String, Future<Integer>>() {
|
||||
public Future<Integer> apply(String r) {
|
||||
latch.countDown();
|
||||
Promise<Integer> cf = new akka.dispatch.DefaultPromise<Integer>(1000, TimeUnit.MILLISECONDS, app.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Promise<Integer> cf = new akka.dispatch.DefaultPromise<Integer>(1000, TimeUnit.MILLISECONDS, system.dispatcherFactory().defaultGlobalDispatcher());
|
||||
cf.completeWithResult(Integer.parseInt(r));
|
||||
return cf;
|
||||
}
|
||||
|
|
@ -137,7 +137,7 @@ public class JavaFutureTests {
|
|||
|
||||
@Test public void mustBeAbleToFilterAFuture() throws Throwable {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, app.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, system.dispatcherFactory().defaultGlobalDispatcher());
|
||||
Future<String> f = cf;
|
||||
Future<String> r = f.filter(new Function<String, Boolean>() {
|
||||
public Boolean apply(String r) {
|
||||
|
|
|
|||
|
|
@ -247,7 +247,7 @@ class ActorRefSpec extends AkkaSpec {
|
|||
out.flush
|
||||
out.close
|
||||
|
||||
Serialization.app.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
val readA = in.readObject
|
||||
|
||||
|
|
@ -257,7 +257,7 @@ class ActorRefSpec extends AkkaSpec {
|
|||
}
|
||||
}
|
||||
|
||||
"throw an exception on deserialize if no app in scope" in {
|
||||
"throw an exception on deserialize if no system in scope" in {
|
||||
val a = actorOf[InnerActor]
|
||||
|
||||
import java.io._
|
||||
|
|
@ -275,7 +275,7 @@ class ActorRefSpec extends AkkaSpec {
|
|||
(intercept[java.lang.IllegalStateException] {
|
||||
in.readObject
|
||||
}).getMessage must be === "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
|
||||
" Use akka.serialization.Serialization.app.withValue(akkaApplication) { ... }"
|
||||
" Use akka.serialization.Serialization.system.withValue(akkaApplication) { ... }"
|
||||
}
|
||||
|
||||
"must throw exception on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
|
||||
|
|
@ -292,7 +292,7 @@ class ActorRefSpec extends AkkaSpec {
|
|||
out.flush
|
||||
out.close
|
||||
|
||||
Serialization.app.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
(intercept[java.lang.IllegalStateException] {
|
||||
in.readObject
|
||||
|
|
|
|||
|
|
@ -14,12 +14,12 @@ class DeployerSpec extends AkkaSpec {
|
|||
|
||||
"A Deployer" must {
|
||||
"be able to parse 'akka.actor.deployment._' config elements" in {
|
||||
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupInConfig("/app/service-ping")
|
||||
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupInConfig("/system/service-ping")
|
||||
deployment must be('defined)
|
||||
|
||||
deployment must equal(Some(
|
||||
Deploy(
|
||||
"/app/service-ping",
|
||||
"/system/service-ping",
|
||||
None,
|
||||
RoundRobin,
|
||||
NrOfInstances(3),
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import akka.config.Configuration
|
|||
|
||||
object FSMActorSpec {
|
||||
|
||||
class Latches(implicit app: ActorSystem) {
|
||||
class Latches(implicit system: ActorSystem) {
|
||||
val unlockedLatch = TestLatch()
|
||||
val lockedLatch = TestLatch()
|
||||
val unhandledLatch = TestLatch()
|
||||
|
|
|
|||
|
|
@ -12,12 +12,12 @@ import akka.util.Duration
|
|||
object ForwardActorSpec {
|
||||
val ExpectedMessage = "FOO"
|
||||
|
||||
def createForwardingChain(app: ActorSystem): ActorRef = {
|
||||
val replier = app.actorOf(new Actor {
|
||||
def createForwardingChain(system: ActorSystem): ActorRef = {
|
||||
val replier = system.actorOf(new Actor {
|
||||
def receive = { case x ⇒ sender ! x }
|
||||
})
|
||||
|
||||
def mkforwarder(forwardTo: ActorRef) = app.actorOf(
|
||||
def mkforwarder(forwardTo: ActorRef) = system.actorOf(
|
||||
new Actor {
|
||||
def receive = { case x ⇒ forwardTo forward x }
|
||||
})
|
||||
|
|
|
|||
|
|
@ -68,10 +68,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
|
|||
// val actor = actorOf(new Actor {
|
||||
// def receive = { case Ping ⇒ ticks.countDown }
|
||||
// })
|
||||
// val numActors = app.registry.local.actors.length
|
||||
// val numActors = system.registry.local.actors.length
|
||||
// (1 to 1000).foreach(_ ⇒ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS)))
|
||||
// assert(ticks.await(10, TimeUnit.SECONDS))
|
||||
// assert(app.registry.local.actors.length === numActors)
|
||||
// assert(system.registry.local.actors.length === numActors)
|
||||
// }
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -341,7 +341,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
|
|||
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
|
||||
Serialization.app.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall]
|
||||
|
||||
mNew.method must be(m.method)
|
||||
|
|
@ -360,7 +360,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
|
|||
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
|
||||
Serialization.app.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) {
|
||||
val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall]
|
||||
|
||||
mNew.method must be(m.method)
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ trait TradingSystem {
|
|||
case class MatchingEngineInfo(primary: ME, standby: Option[ME], orderbooks: List[Orderbook])
|
||||
}
|
||||
|
||||
class AkkaTradingSystem(val app: ActorSystem) extends TradingSystem {
|
||||
class AkkaTradingSystem(val system: ActorSystem) extends TradingSystem {
|
||||
type ME = ActorRef
|
||||
type OR = ActorRef
|
||||
|
||||
|
|
@ -70,8 +70,8 @@ class AkkaTradingSystem(val app: ActorSystem) extends TradingSystem {
|
|||
|
||||
def createMatchingEngine(meId: String, orderbooks: List[Orderbook]) =
|
||||
meDispatcher match {
|
||||
case Some(d) ⇒ app.actorOf(Props(new AkkaMatchingEngine(meId, orderbooks)).withDispatcher(d))
|
||||
case _ ⇒ app.actorOf(Props(new AkkaMatchingEngine(meId, orderbooks)))
|
||||
case Some(d) ⇒ system.actorOf(Props(new AkkaMatchingEngine(meId, orderbooks)).withDispatcher(d))
|
||||
case _ ⇒ system.actorOf(Props(new AkkaMatchingEngine(meId, orderbooks)))
|
||||
}
|
||||
|
||||
override def createOrderReceivers: List[ActorRef] = {
|
||||
|
|
@ -91,8 +91,8 @@ class AkkaTradingSystem(val app: ActorSystem) extends TradingSystem {
|
|||
}
|
||||
|
||||
def createOrderReceiver() = orDispatcher match {
|
||||
case Some(d) ⇒ app.actorOf(Props(new AkkaOrderReceiver()).withDispatcher(d))
|
||||
case _ ⇒ app.actorOf(Props(new AkkaOrderReceiver()))
|
||||
case Some(d) ⇒ system.actorOf(Props(new AkkaOrderReceiver()).withDispatcher(d))
|
||||
case _ ⇒ system.actorOf(Props(new AkkaOrderReceiver()))
|
||||
}
|
||||
|
||||
override def start() {
|
||||
|
|
|
|||
|
|
@ -10,12 +10,12 @@ import akka.event.Logging
|
|||
import scala.collection.immutable.TreeMap
|
||||
|
||||
class Report(
|
||||
app: ActorSystem,
|
||||
system: ActorSystem,
|
||||
resultRepository: BenchResultRepository,
|
||||
compareResultWith: Option[String] = None) {
|
||||
|
||||
private def doLog = System.getProperty("benchmark.logResult", "true").toBoolean
|
||||
val log = Logging(app, this)
|
||||
val log = Logging(system, this)
|
||||
|
||||
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
|
||||
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
|
||||
|
|
@ -221,11 +221,11 @@ class Report(
|
|||
sb.append("Args:\n ").append(args)
|
||||
sb.append("\n")
|
||||
|
||||
sb.append("Akka version: ").append(app.settings.ConfigVersion)
|
||||
sb.append("Akka version: ").append(system.settings.ConfigVersion)
|
||||
sb.append("\n")
|
||||
sb.append("Akka config:")
|
||||
for (key ← app.settings.config.keys) {
|
||||
sb.append("\n ").append(key).append("=").append(app.settings.config(key))
|
||||
for (key ← system.settings.config.keys) {
|
||||
sb.append("\n ").append(key).append("=").append(system.settings.config(key))
|
||||
}
|
||||
|
||||
sb.toString
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ class SerializeSpec extends AkkaSpec {
|
|||
out.close()
|
||||
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray))
|
||||
Serialization.app.withValue(a.asInstanceOf[ActorSystemImpl]) {
|
||||
Serialization.system.withValue(a.asInstanceOf[ActorSystemImpl]) {
|
||||
val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef]
|
||||
(deadLetters eq a.deadLetters) must be(true)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -150,7 +150,7 @@ object Timeout {
|
|||
implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
|
||||
implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
|
||||
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
|
||||
implicit def defaultTimeout(implicit app: ActorSystem) = app.settings.ActorTimeout
|
||||
implicit def defaultTimeout(implicit system: ActorSystem) = system.settings.ActorTimeout
|
||||
}
|
||||
|
||||
trait ActorLogging { this: Actor ⇒
|
||||
|
|
@ -164,17 +164,17 @@ object Actor {
|
|||
/**
|
||||
* This decorator adds invocation logging to a Receive function.
|
||||
*/
|
||||
class LoggingReceive(source: AnyRef, r: Receive)(implicit app: ActorSystem) extends Receive {
|
||||
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
|
||||
def isDefinedAt(o: Any) = {
|
||||
val handled = r.isDefinedAt(o)
|
||||
app.eventStream.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
|
||||
system.eventStream.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
|
||||
handled
|
||||
}
|
||||
def apply(o: Any): Unit = r(o)
|
||||
}
|
||||
|
||||
object LoggingReceive {
|
||||
def apply(source: AnyRef, r: Receive)(implicit app: ActorSystem): Receive = r match {
|
||||
def apply(source: AnyRef, r: Receive)(implicit system: ActorSystem): Receive = r match {
|
||||
case _: LoggingReceive ⇒ r
|
||||
case _ ⇒ new LoggingReceive(source, r)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ private[akka] object ActorCell {
|
|||
//vars don't need volatile since it's protected with the mailbox status
|
||||
//Make sure that they are not read/written outside of a message processing (systemInvoke/invoke)
|
||||
private[akka] class ActorCell(
|
||||
val app: ActorSystemImpl,
|
||||
val system: ActorSystemImpl,
|
||||
val self: ActorRef with ScalaActorRef,
|
||||
val props: Props,
|
||||
val parent: ActorRef,
|
||||
|
|
@ -72,13 +72,13 @@ private[akka] class ActorCell(
|
|||
|
||||
import ActorCell._
|
||||
|
||||
final def system = app
|
||||
def systemImpl = system
|
||||
|
||||
protected final def guardian = self
|
||||
|
||||
protected def typedActor = app.typedActor
|
||||
protected def typedActor = system.typedActor
|
||||
|
||||
final def provider = app.provider
|
||||
final def provider = system.provider
|
||||
|
||||
var futureTimeout: Option[Cancellable] = None
|
||||
|
||||
|
|
@ -93,7 +93,7 @@ private[akka] class ActorCell(
|
|||
var stopping = false
|
||||
|
||||
@inline
|
||||
final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher
|
||||
final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
|
||||
|
||||
final def isShutdown: Boolean = mailbox.isClosed
|
||||
|
||||
|
|
@ -141,12 +141,12 @@ private[akka] class ActorCell(
|
|||
}
|
||||
|
||||
final def tell(message: Any, sender: ActorRef): Unit =
|
||||
dispatcher.dispatch(this, Envelope(message, if (sender eq null) app.deadLetters else sender))
|
||||
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender))
|
||||
|
||||
final def sender: ActorRef = currentMessage match {
|
||||
case null ⇒ app.deadLetters
|
||||
case null ⇒ system.deadLetters
|
||||
case msg if msg.sender ne null ⇒ msg.sender
|
||||
case _ ⇒ app.deadLetters
|
||||
case _ ⇒ system.deadLetters
|
||||
}
|
||||
|
||||
//This method is in charge of setting up the contextStack and create a new instance of the Actor
|
||||
|
|
@ -174,11 +174,11 @@ private[akka] class ActorCell(
|
|||
actor = created
|
||||
created.preStart()
|
||||
checkReceiveTimeout
|
||||
if (app.settings.DebugLifecycle) app.eventStream.publish(Debug(self, "started (" + actor + ")"))
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "started (" + actor + ")"))
|
||||
} catch {
|
||||
case e ⇒
|
||||
try {
|
||||
app.eventStream.publish(Error(e, self, "error while creating actor"))
|
||||
system.eventStream.publish(Error(e, self, "error while creating actor"))
|
||||
// prevent any further messages to be processed until the actor has been restarted
|
||||
dispatcher.suspend(this)
|
||||
} finally {
|
||||
|
|
@ -188,7 +188,7 @@ private[akka] class ActorCell(
|
|||
|
||||
def recreate(cause: Throwable): Unit = try {
|
||||
val failedActor = actor
|
||||
if (app.settings.DebugLifecycle) app.eventStream.publish(Debug(self, "restarting"))
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarting"))
|
||||
val freshActor = newActor()
|
||||
if (failedActor ne null) {
|
||||
val c = currentMessage //One read only plz
|
||||
|
|
@ -202,14 +202,14 @@ private[akka] class ActorCell(
|
|||
}
|
||||
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
|
||||
freshActor.postRestart(cause)
|
||||
if (app.settings.DebugLifecycle) app.eventStream.publish(Debug(self, "restarted"))
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarted"))
|
||||
|
||||
dispatcher.resume(this) //FIXME should this be moved down?
|
||||
|
||||
props.faultHandler.handleSupervisorRestarted(cause, self, children)
|
||||
} catch {
|
||||
case e ⇒ try {
|
||||
app.eventStream.publish(Error(e, self, "error while creating actor"))
|
||||
system.eventStream.publish(Error(e, self, "error while creating actor"))
|
||||
// prevent any further messages to be processed until the actor has been restarted
|
||||
dispatcher.suspend(this)
|
||||
} finally {
|
||||
|
|
@ -228,7 +228,7 @@ private[akka] class ActorCell(
|
|||
val c = children
|
||||
if (c.isEmpty) doTerminate()
|
||||
else {
|
||||
if (app.settings.DebugLifecycle) app.eventStream.publish(Debug(self, "stopping"))
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopping"))
|
||||
for (child ← c) child.stop()
|
||||
stopping = true
|
||||
}
|
||||
|
|
@ -239,8 +239,8 @@ private[akka] class ActorCell(
|
|||
if (!stats.contains(child)) {
|
||||
childrenRefs = childrenRefs.updated(child.name, child)
|
||||
childrenStats = childrenStats.updated(child, ChildRestartStats())
|
||||
if (app.settings.DebugLifecycle) app.eventStream.publish(Debug(self, "now supervising " + child))
|
||||
} else app.eventStream.publish(Warning(self, "Already supervising " + child))
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now supervising " + child))
|
||||
} else system.eventStream.publish(Warning(self, "Already supervising " + child))
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
@ -254,11 +254,11 @@ private[akka] class ActorCell(
|
|||
case Create() ⇒ create()
|
||||
case Recreate(cause) ⇒ recreate(cause)
|
||||
case Link(subject) ⇒
|
||||
app.deathWatch.subscribe(self, subject)
|
||||
if (app.settings.DebugLifecycle) app.eventStream.publish(Debug(self, "now monitoring " + subject))
|
||||
system.deathWatch.subscribe(self, subject)
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now monitoring " + subject))
|
||||
case Unlink(subject) ⇒
|
||||
app.deathWatch.unsubscribe(self, subject)
|
||||
if (app.settings.DebugLifecycle) app.eventStream.publish(Debug(self, "stopped monitoring " + subject))
|
||||
system.deathWatch.unsubscribe(self, subject)
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped monitoring " + subject))
|
||||
case Suspend() ⇒ suspend()
|
||||
case Resume() ⇒ resume()
|
||||
case Terminate() ⇒ terminate()
|
||||
|
|
@ -267,7 +267,7 @@ private[akka] class ActorCell(
|
|||
}
|
||||
} catch {
|
||||
case e ⇒ //Should we really catch everything here?
|
||||
app.eventStream.publish(Error(e, self, "error while processing " + message))
|
||||
system.eventStream.publish(Error(e, self, "error while processing " + message))
|
||||
//TODO FIXME How should problems here be handled?
|
||||
throw e
|
||||
}
|
||||
|
|
@ -286,7 +286,7 @@ private[akka] class ActorCell(
|
|||
case msg ⇒
|
||||
if (stopping) {
|
||||
// receiving Terminated in response to stopping children is too common to generate noise
|
||||
if (!msg.isInstanceOf[Terminated]) app.deadLetterMailbox.enqueue(self, messageHandle)
|
||||
if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle)
|
||||
} else {
|
||||
actor(msg)
|
||||
}
|
||||
|
|
@ -294,7 +294,7 @@ private[akka] class ActorCell(
|
|||
currentMessage = null // reset current message after successful invocation
|
||||
} catch {
|
||||
case e ⇒
|
||||
app.eventStream.publish(Error(e, self, e.getMessage))
|
||||
system.eventStream.publish(Error(e, self, e.getMessage))
|
||||
|
||||
// prevent any further messages to be processed until the actor has been restarted
|
||||
dispatcher.suspend(this)
|
||||
|
|
@ -314,7 +314,7 @@ private[akka] class ActorCell(
|
|||
}
|
||||
} catch {
|
||||
case e ⇒
|
||||
app.eventStream.publish(Error(e, self, e.getMessage))
|
||||
system.eventStream.publish(Error(e, self, e.getMessage))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
@ -332,11 +332,11 @@ private[akka] class ActorCell(
|
|||
}
|
||||
|
||||
def autoReceiveMessage(msg: Envelope) {
|
||||
if (app.settings.DebugAutoReceive) app.eventStream.publish(Debug(self, "received AutoReceiveMessage " + msg))
|
||||
if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self, "received AutoReceiveMessage " + msg))
|
||||
|
||||
if (stopping) msg.message match {
|
||||
case ChildTerminated ⇒ handleChildTerminated(sender)
|
||||
case _ ⇒ app.deadLetterMailbox.enqueue(self, msg)
|
||||
case _ ⇒ system.deadLetterMailbox.enqueue(self, msg)
|
||||
}
|
||||
else msg.message match {
|
||||
case HotSwap(code, discardOld) ⇒ become(code(self), discardOld)
|
||||
|
|
@ -349,7 +349,7 @@ private[akka] class ActorCell(
|
|||
}
|
||||
|
||||
private def doTerminate() {
|
||||
app.provider.evict(self.path.toString)
|
||||
system.provider.evict(self.path.toString)
|
||||
dispatcher.detach(this)
|
||||
|
||||
try {
|
||||
|
|
@ -358,8 +358,8 @@ private[akka] class ActorCell(
|
|||
} finally {
|
||||
try {
|
||||
parent.tell(ChildTerminated, self)
|
||||
app.deathWatch.publish(Terminated(self))
|
||||
if (app.settings.DebugLifecycle) app.eventStream.publish(Debug(self, "stopped"))
|
||||
system.deathWatch.publish(Terminated(self))
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped"))
|
||||
} finally {
|
||||
currentMessage = null
|
||||
clearActorContext()
|
||||
|
|
@ -369,7 +369,7 @@ private[akka] class ActorCell(
|
|||
|
||||
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenStats.get(child) match {
|
||||
case Some(stats) ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenStats)) throw cause
|
||||
case None ⇒ app.eventStream.publish(Warning(self, "dropping Failed(" + cause + ") from unknown child"))
|
||||
case None ⇒ system.eventStream.publish(Warning(self, "dropping Failed(" + cause + ") from unknown child"))
|
||||
}
|
||||
|
||||
final def handleChildTerminated(child: ActorRef): Unit = {
|
||||
|
|
@ -387,7 +387,7 @@ private[akka] class ActorCell(
|
|||
val recvtimeout = receiveTimeout
|
||||
if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
|
||||
//Only reschedule if desired and there are currently no more messages to be processed
|
||||
futureTimeout = Some(app.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS))
|
||||
futureTimeout = Some(system.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,14 +14,14 @@ object ActorPath {
|
|||
/**
|
||||
* Create an actor path from a string.
|
||||
*/
|
||||
def apply(app: ActorSystem, path: String): ActorPath =
|
||||
apply(app, split(path))
|
||||
def apply(system: ActorSystem, path: String): ActorPath =
|
||||
apply(system, split(path))
|
||||
|
||||
/**
|
||||
* Create an actor path from an iterable.
|
||||
*/
|
||||
def apply(app: ActorSystem, path: Iterable[String]): ActorPath =
|
||||
path.foldLeft(app.rootPath)(_ / _)
|
||||
def apply(system: ActorSystem, path: Iterable[String]): ActorPath =
|
||||
path.foldLeft(system.rootPath)(_ / _)
|
||||
|
||||
/**
|
||||
* Split a string path into an iterable.
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class LocalActorRef private[akka] (
|
||||
app: ActorSystemImpl,
|
||||
system: ActorSystemImpl,
|
||||
_props: Props,
|
||||
_supervisor: ActorRef,
|
||||
val path: ActorPath,
|
||||
|
|
@ -185,7 +185,7 @@ class LocalActorRef private[akka] (
|
|||
* us to use purely factory methods for creating LocalActorRefs.
|
||||
*/
|
||||
@volatile
|
||||
private var actorCell = new ActorCell(app, this, _props, _supervisor, _receiveTimeout, _hotswap)
|
||||
private var actorCell = new ActorCell(system, this, _props, _supervisor, _receiveTimeout, _hotswap)
|
||||
actorCell.start()
|
||||
|
||||
/**
|
||||
|
|
@ -306,17 +306,17 @@ trait ScalaActorRef { ref: ActorRef ⇒
|
|||
*/
|
||||
|
||||
case class SerializedActorRef(hostname: String, port: Int, path: String) {
|
||||
import akka.serialization.Serialization.app
|
||||
import akka.serialization.Serialization.system
|
||||
|
||||
def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path)
|
||||
def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE
|
||||
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
def readResolve(): AnyRef = {
|
||||
if (app.value eq null) throw new IllegalStateException(
|
||||
if (system.value eq null) throw new IllegalStateException(
|
||||
"Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
|
||||
" Use akka.serialization.Serialization.app.withValue(akkaApplication) { ... }")
|
||||
app.value.provider.deserialize(this) match {
|
||||
" Use akka.serialization.Serialization.system.withValue(akkaApplication) { ... }")
|
||||
system.value.provider.deserialize(this) match {
|
||||
case Some(actor) ⇒ actor
|
||||
case None ⇒ throw new IllegalStateException("Could not deserialize ActorRef")
|
||||
}
|
||||
|
|
@ -380,7 +380,7 @@ case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)
|
|||
object DeadLetterActorRef {
|
||||
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
private def readResolve(): AnyRef = Serialization.app.value.deadLetters
|
||||
private def readResolve(): AnyRef = Serialization.system.value.deadLetters
|
||||
}
|
||||
|
||||
val serialized = new SerializedDeadLetterActorRef
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import com.eaio.uuid.UUID
|
|||
*/
|
||||
trait ActorRefProvider {
|
||||
|
||||
def actorOf(app: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String): ActorRef = actorOf(app, props, supervisor, name, false)
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String): ActorRef = actorOf(system, props, supervisor, name, false)
|
||||
|
||||
def actorFor(path: Iterable[String]): Option[ActorRef]
|
||||
|
||||
|
|
@ -36,7 +36,7 @@ trait ActorRefProvider {
|
|||
|
||||
def settings: ActorSystem.Settings
|
||||
|
||||
def init(app: ActorSystemImpl)
|
||||
def init(system: ActorSystemImpl)
|
||||
|
||||
/**
|
||||
* What deployer will be used to resolve deployment configuration?
|
||||
|
|
@ -45,9 +45,9 @@ trait ActorRefProvider {
|
|||
|
||||
private[akka] def scheduler: Scheduler
|
||||
|
||||
private[akka] def actorOf(app: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef
|
||||
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef
|
||||
|
||||
private[akka] def actorOf(app: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef
|
||||
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef
|
||||
|
||||
private[akka] def evict(path: String): Boolean
|
||||
|
||||
|
|
@ -71,7 +71,7 @@ trait ActorRefProvider {
|
|||
*/
|
||||
trait ActorRefFactory {
|
||||
|
||||
protected def app: ActorSystemImpl
|
||||
protected def systemImpl: ActorSystemImpl
|
||||
|
||||
protected def provider: ActorRefProvider
|
||||
|
||||
|
|
@ -89,7 +89,7 @@ trait ActorRefFactory {
|
|||
Helpers.base64(l)
|
||||
}
|
||||
|
||||
def actorOf(props: Props): ActorRef = provider.actorOf(app, props, guardian, randomName, false)
|
||||
def actorOf(props: Props): ActorRef = provider.actorOf(systemImpl, props, guardian, randomName, false)
|
||||
|
||||
/*
|
||||
* TODO this will have to go at some point, because creating two actors with
|
||||
|
|
@ -99,7 +99,7 @@ trait ActorRefFactory {
|
|||
def actorOf(props: Props, name: String): ActorRef = {
|
||||
if (name == null || name == "" || name.startsWith("$"))
|
||||
throw new ActorInitializationException("actor name must not be null, empty or start with $")
|
||||
provider.actorOf(app, props, guardian, name, false)
|
||||
provider.actorOf(systemImpl, props, guardian, name, false)
|
||||
}
|
||||
|
||||
def actorOf[T <: Actor](implicit m: Manifest[T]): ActorRef = actorOf(Props(m.erasure.asInstanceOf[Class[_ <: Actor]]))
|
||||
|
|
@ -231,16 +231,16 @@ class LocalActorRefProvider(
|
|||
|
||||
val deathWatch = createDeathWatch()
|
||||
|
||||
def init(app: ActorSystemImpl) {
|
||||
rootGuardian = actorOf(app, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true)
|
||||
guardian = actorOf(app, guardianProps, rootGuardian, "app", true)
|
||||
systemGuardian = actorOf(app, guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true)
|
||||
def init(system: ActorSystemImpl) {
|
||||
rootGuardian = actorOf(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true)
|
||||
guardian = actorOf(system, guardianProps, rootGuardian, "system", true)
|
||||
systemGuardian = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true)
|
||||
// chain death watchers so that killing guardian stops the application
|
||||
deathWatch.subscribe(systemGuardian, guardian)
|
||||
deathWatch.subscribe(rootGuardian, systemGuardian)
|
||||
}
|
||||
|
||||
// FIXME (actor path): should start at the new root guardian, and not use the tail (just to avoid the expected "app" name for now)
|
||||
// FIXME (actor path): should start at the new root guardian, and not use the tail (just to avoid the expected "system" name for now)
|
||||
def actorFor(path: Iterable[String]): Option[ActorRef] = findInCache(ActorPath.join(path)) orElse findInTree(Some(guardian), path.tail)
|
||||
|
||||
@tailrec
|
||||
|
|
@ -266,10 +266,10 @@ class LocalActorRefProvider(
|
|||
*/
|
||||
private[akka] def evict(path: String): Boolean = actors.remove(path) ne null
|
||||
|
||||
private[akka] def actorOf(app: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
|
||||
actorOf(app, props, supervisor, supervisor.path / name, systemService)
|
||||
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
|
||||
actorOf(system, props, supervisor, supervisor.path / name, systemService)
|
||||
|
||||
private[akka] def actorOf(app: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = {
|
||||
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = {
|
||||
val name = path.name
|
||||
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
|
||||
|
||||
|
|
@ -280,7 +280,7 @@ class LocalActorRefProvider(
|
|||
|
||||
// create a local actor
|
||||
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, DeploymentConfig.LocalScope)) ⇒
|
||||
new LocalActorRef(app, props, supervisor, path, systemService) // create a local actor
|
||||
new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
|
||||
|
||||
// create a routed actor ref
|
||||
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.LocalScope)) ⇒
|
||||
|
|
@ -299,10 +299,10 @@ class LocalActorRefProvider(
|
|||
|
||||
val connections: Iterable[ActorRef] = (1 to nrOfInstances.factor) map { i ⇒
|
||||
val routedPath = path.parent / (path.name + ":" + i)
|
||||
new LocalActorRef(app, props, supervisor, routedPath, systemService)
|
||||
new LocalActorRef(system, props, supervisor, routedPath, systemService)
|
||||
}
|
||||
|
||||
actorOf(app, RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString)
|
||||
actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, path.toString)
|
||||
|
||||
case unknown ⇒ throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown)
|
||||
}
|
||||
|
|
@ -327,7 +327,7 @@ class LocalActorRefProvider(
|
|||
/**
|
||||
* Creates (or fetches) a routed actor reference, configured by the 'props: RoutedProps' configuration.
|
||||
*/
|
||||
def actorOf(app: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
|
||||
def actorOf(system: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
|
||||
// FIXME: this needs to take supervision into account!
|
||||
|
||||
//FIXME clustering should be implemented by cluster actor ref provider
|
||||
|
|
@ -340,7 +340,7 @@ class LocalActorRefProvider(
|
|||
// val localOnly = props.localOnly
|
||||
// if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
|
||||
// else new RoutedActorRef(props, address)
|
||||
new RoutedActorRef(app, props, supervisor, name)
|
||||
new RoutedActorRef(system, props, supervisor, name)
|
||||
}
|
||||
|
||||
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(ActorPath.split(actor.path))
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ class ActorSystemImpl(val name: String, config: Configuration) extends ActorSyst
|
|||
|
||||
val settings = new Settings(config)
|
||||
|
||||
protected def app = this
|
||||
protected def systemImpl = this
|
||||
|
||||
private[akka] def systemActorOf(props: Props, address: String): ActorRef = provider.actorOf(this, props, systemGuardian, address, true)
|
||||
|
||||
|
|
|
|||
|
|
@ -14,14 +14,14 @@ import akka.util.Bootable
|
|||
*/
|
||||
trait BootableActorLoaderService extends Bootable {
|
||||
|
||||
def app: ActorSystem
|
||||
def system: ActorSystem
|
||||
|
||||
val BOOT_CLASSES = app.settings.BootClasses
|
||||
val BOOT_CLASSES = system.settings.BootClasses
|
||||
lazy val applicationLoader = createApplicationClassLoader()
|
||||
|
||||
protected def createApplicationClassLoader(): Option[ClassLoader] = Some({
|
||||
if (app.settings.Home.isDefined) {
|
||||
val DEPLOY = app.settings.Home.get + "/deploy"
|
||||
if (system.settings.Home.isDefined) {
|
||||
val DEPLOY = system.settings.Home.get + "/deploy"
|
||||
val DEPLOY_DIR = new File(DEPLOY)
|
||||
if (!DEPLOY_DIR.exists) {
|
||||
System.exit(-1)
|
||||
|
|
@ -59,11 +59,11 @@ trait BootableActorLoaderService extends Bootable {
|
|||
super.onUnload()
|
||||
|
||||
// FIXME shutdown all actors
|
||||
// app.registry.local.shutdownAll
|
||||
// system.registry.local.shutdownAll
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API for the default JAX-RS/Mist Initializer
|
||||
*/
|
||||
class DefaultBootableActorLoaderService(val app: ActorSystem) extends BootableActorLoaderService
|
||||
class DefaultBootableActorLoaderService(val system: ActorSystem) extends BootableActorLoaderService
|
||||
|
|
|
|||
|
|
@ -28,14 +28,14 @@ object FSM {
|
|||
case object StateTimeout
|
||||
case class TimeoutMarker(generation: Long)
|
||||
|
||||
case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit app: ActorSystem) {
|
||||
case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit system: ActorSystem) {
|
||||
private var ref: Option[Cancellable] = _
|
||||
|
||||
def schedule(actor: ActorRef, timeout: Duration) {
|
||||
if (repeat) {
|
||||
ref = Some(app.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit))
|
||||
ref = Some(system.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit))
|
||||
} else {
|
||||
ref = Some(app.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit))
|
||||
ref = Some(system.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -290,7 +290,7 @@ private[akka] object IOWorker {
|
|||
case object Shutdown extends Request
|
||||
}
|
||||
|
||||
private[akka] class IOWorker(app: ActorSystem, ioManager: ActorRef, val bufferSize: Int) {
|
||||
private[akka] class IOWorker(system: ActorSystem, ioManager: ActorRef, val bufferSize: Int) {
|
||||
import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT }
|
||||
import IOWorker._
|
||||
|
||||
|
|
|
|||
|
|
@ -57,17 +57,17 @@ object TypedActor {
|
|||
//TODO implement writeObject and readObject to serialize
|
||||
//TODO Possible optimization is to special encode the parameter-types to conserve space
|
||||
private def readResolve(): AnyRef = {
|
||||
val app = akka.serialization.Serialization.app.value
|
||||
if (app eq null) throw new IllegalStateException(
|
||||
val system = akka.serialization.Serialization.system.value
|
||||
if (system eq null) throw new IllegalStateException(
|
||||
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
|
||||
" Use akka.serialization.Serialization.app.withValue(akkaApplication) { ... }")
|
||||
MethodCall(app.serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
|
||||
" Use akka.serialization.Serialization.system.withValue(akkaApplication) { ... }")
|
||||
MethodCall(system.serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
|
||||
case null ⇒ null
|
||||
case a if a.length == 0 ⇒ Array[AnyRef]()
|
||||
case a ⇒
|
||||
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
|
||||
for (i ← 0 until a.length) {
|
||||
deserializedParameters(i) = app.serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
|
||||
deserializedParameters(i) = system.serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
|
||||
}
|
||||
deserializedParameters
|
||||
})
|
||||
|
|
@ -101,22 +101,22 @@ object TypedActor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the akka app (for a TypedActor) when inside a method call in a TypedActor.
|
||||
* Returns the akka system (for a TypedActor) when inside a method call in a TypedActor.
|
||||
*/
|
||||
def app = appReference.get match {
|
||||
case null ⇒ throw new IllegalStateException("Calling TypedActor.app outside of a TypedActor implementation method!")
|
||||
def system = appReference.get match {
|
||||
case null ⇒ throw new IllegalStateException("Calling TypedActor.system outside of a TypedActor implementation method!")
|
||||
case some ⇒ some
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default dispatcher (for a TypedActor) when inside a method call in a TypedActor.
|
||||
*/
|
||||
implicit def dispatcher = app.dispatcher
|
||||
implicit def dispatcher = system.dispatcher
|
||||
|
||||
/**
|
||||
* Returns the default timeout (for a TypedActor) when inside a method call in a TypedActor.
|
||||
*/
|
||||
implicit def timeout = app.settings.ActorTimeout
|
||||
implicit def timeout = system.settings.ActorTimeout
|
||||
}
|
||||
|
||||
trait TypedActorFactory { this: ActorRefFactory ⇒
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ class NodeAddress(val clusterName: String, val nodeName: String) {
|
|||
*/
|
||||
object NodeAddress {
|
||||
def apply(clusterName: String, nodeName: String): NodeAddress = new NodeAddress(clusterName, nodeName)
|
||||
def apply(app: ActorSystem): NodeAddress = new NodeAddress(app.settings.ClusterName, app.nodename)
|
||||
def apply(system: ActorSystem): NodeAddress = new NodeAddress(system.settings.ClusterName, system.nodename)
|
||||
|
||||
def unapply(other: Any) = other match {
|
||||
case address: NodeAddress ⇒ Some((address.clusterName, address.nodeName))
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ class FilesystemImporter(val baseDir: String) extends Importer {
|
|||
|
||||
/**
|
||||
* An Importer that looks for imported config files in the java resources
|
||||
* of the system class loader (usually the jar used to launch this app).
|
||||
* of the system class loader (usually the jar used to launch this system).
|
||||
*/
|
||||
class ResourceImporter(classLoader: ClassLoader) extends Importer {
|
||||
def importFile(filename: String): String = {
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ object MessageDispatcher {
|
|||
val SCHEDULED = 1
|
||||
val RESCHEDULED = 2
|
||||
|
||||
implicit def defaultDispatcher(implicit app: ActorSystem) = app.dispatcher
|
||||
implicit def defaultDispatcher(implicit system: ActorSystem) = system.dispatcher
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -205,7 +205,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
|
|||
}
|
||||
} catch {
|
||||
case e ⇒
|
||||
actor.app.eventStream.publish(Error(e, actor.self, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
|
||||
actor.system.eventStream.publish(Error(e, actor.self, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,8 +39,8 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas
|
|||
super.unsubscribe(subscriber)
|
||||
}
|
||||
|
||||
def start(app: ActorSystemImpl) {
|
||||
reaper = app.systemActorOf(Props(new Actor {
|
||||
def start(system: ActorSystemImpl) {
|
||||
reaper = system.systemActorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case ref: ActorRef ⇒ watch(ref)
|
||||
case Terminated(ref) ⇒ unsubscribe(ref)
|
||||
|
|
|
|||
|
|
@ -79,13 +79,13 @@ trait LoggingBus extends ActorEventBus {
|
|||
publish(Info(this, "StandardOutLogger started"))
|
||||
}
|
||||
|
||||
private[akka] def startDefaultLoggers(app: ActorSystemImpl) {
|
||||
val level = levelFor(app.settings.LogLevel) getOrElse {
|
||||
StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + app.settings.LogLevel))
|
||||
private[akka] def startDefaultLoggers(system: ActorSystemImpl) {
|
||||
val level = levelFor(system.settings.LogLevel) getOrElse {
|
||||
StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + system.settings.LogLevel))
|
||||
ErrorLevel
|
||||
}
|
||||
try {
|
||||
val defaultLoggers = app.settings.EventHandlers match {
|
||||
val defaultLoggers = system.settings.EventHandlers match {
|
||||
case Nil ⇒ "akka.event.Logging$DefaultLogger" :: Nil
|
||||
case loggers ⇒ loggers
|
||||
}
|
||||
|
|
@ -95,7 +95,7 @@ trait LoggingBus extends ActorEventBus {
|
|||
} yield {
|
||||
try {
|
||||
ReflectiveAccess.getClassFor[Actor](loggerName) match {
|
||||
case Right(actorClass) ⇒ addLogger(app, actorClass, level)
|
||||
case Right(actorClass) ⇒ addLogger(system, actorClass, level)
|
||||
case Left(exception) ⇒ throw exception
|
||||
}
|
||||
} catch {
|
||||
|
|
@ -138,9 +138,9 @@ trait LoggingBus extends ActorEventBus {
|
|||
publish(Info(this, "all default loggers stopped"))
|
||||
}
|
||||
|
||||
private def addLogger(app: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = {
|
||||
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = {
|
||||
val name = "log" + loggerId.incrementAndGet + "-" + simpleName(clazz)
|
||||
val actor = app.systemActorOf(Props(clazz), name)
|
||||
val actor = system.systemActorOf(Props(clazz), name)
|
||||
implicit val timeout = Timeout(3 seconds)
|
||||
val response = try actor ? InitializeLogger(this) get catch {
|
||||
case _: FutureTimeoutException ⇒
|
||||
|
|
@ -237,12 +237,12 @@ object Logging {
|
|||
* Obtain LoggingAdapter for the given application and source object. The
|
||||
* source object is used to identify the source of this logging channel.
|
||||
*/
|
||||
def apply(app: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(app.eventStream, source)
|
||||
def apply(system: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(system.eventStream, source)
|
||||
/**
|
||||
* Java API: Obtain LoggingAdapter for the given application and source object. The
|
||||
* source object is used to identify the source of this logging channel.
|
||||
*/
|
||||
def getLogger(app: ActorSystem, source: AnyRef): LoggingAdapter = apply(app, source)
|
||||
def getLogger(system: ActorSystem, source: AnyRef): LoggingAdapter = apply(system, source)
|
||||
/**
|
||||
* Obtain LoggingAdapter for the given event bus and source object. The
|
||||
* source object is used to identify the source of this logging channel.
|
||||
|
|
|
|||
|
|
@ -128,7 +128,7 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept
|
|||
override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter)
|
||||
}
|
||||
|
||||
abstract class RemoteSupport(val app: ActorSystem) {
|
||||
abstract class RemoteSupport(val system: ActorSystem) {
|
||||
/**
|
||||
* Shuts down the remoting
|
||||
*/
|
||||
|
|
@ -162,7 +162,7 @@ abstract class RemoteSupport(val app: ActorSystem) {
|
|||
recipient: ActorRef,
|
||||
loader: Option[ClassLoader]): Unit
|
||||
|
||||
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = app.eventStream.publish(message)
|
||||
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = system.eventStream.publish(message)
|
||||
|
||||
override def toString = name
|
||||
}
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ object Routing {
|
|||
/**
|
||||
* An Abstract convenience implementation for building an ActorReference that uses a Router.
|
||||
*/
|
||||
abstract private[akka] class AbstractRoutedActorRef(val app: ActorSystem, val props: RoutedProps) extends UnsupportedActorRef {
|
||||
abstract private[akka] class AbstractRoutedActorRef(val system: ActorSystem, val props: RoutedProps) extends UnsupportedActorRef {
|
||||
val router = props.routerFactory()
|
||||
|
||||
override def tell(message: Any, sender: ActorRef) = router.route(message)(sender)
|
||||
|
|
@ -167,7 +167,7 @@ abstract private[akka] class AbstractRoutedActorRef(val app: ActorSystem, val pr
|
|||
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
|
||||
* on (or more) of these actors.
|
||||
*/
|
||||
private[akka] class RoutedActorRef(app: ActorSystem, val routedProps: RoutedProps, val supervisor: ActorRef, override val name: String) extends AbstractRoutedActorRef(app, routedProps) {
|
||||
private[akka] class RoutedActorRef(system: ActorSystem, val routedProps: RoutedProps, val supervisor: ActorRef, override val name: String) extends AbstractRoutedActorRef(system, routedProps) {
|
||||
|
||||
val path = supervisor.path / name
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ case class NoSerializerFoundException(m: String) extends AkkaException(m)
|
|||
* Serialization module. Contains methods for serialization and deserialization as well as
|
||||
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
|
||||
*/
|
||||
class Serialization(val app: ActorSystemImpl) {
|
||||
class Serialization(val system: ActorSystemImpl) {
|
||||
|
||||
//TODO document me
|
||||
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
|
||||
|
|
@ -28,7 +28,7 @@ class Serialization(val app: ActorSystemImpl) {
|
|||
clazz: Class[_],
|
||||
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
|
||||
try {
|
||||
Serialization.app.withValue(app) {
|
||||
Serialization.system.withValue(system) {
|
||||
Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader))
|
||||
}
|
||||
} catch { case e: Exception ⇒ Left(e) }
|
||||
|
|
@ -70,7 +70,7 @@ class Serialization(val app: ActorSystemImpl) {
|
|||
* But "default" can be overridden in config
|
||||
*/
|
||||
val serializers: Map[String, Serializer] =
|
||||
app.settings.config.getSection("akka.actor.serializers")
|
||||
system.settings.config.getSection("akka.actor.serializers")
|
||||
.map(_.map)
|
||||
.getOrElse(Map())
|
||||
.foldLeft(Map[String, Serializer]("default" -> akka.serialization.JavaSerializer)) {
|
||||
|
|
@ -81,7 +81,7 @@ class Serialization(val app: ActorSystemImpl) {
|
|||
/**
|
||||
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
|
||||
*/
|
||||
val bindings: Map[String, String] = app.settings.config.getSection("akka.actor.serialization-bindings") map {
|
||||
val bindings: Map[String, String] = system.settings.config.getSection("akka.actor.serialization-bindings") map {
|
||||
_.map.foldLeft(Map[String, String]()) {
|
||||
case (result, (k: String, vs: List[_])) ⇒ result ++ (vs collect { case v: String ⇒ (v, k) }) //All keys which are lists, take the Strings from them and Map them
|
||||
case (result, _) ⇒ result //For any other values, just skip them, TODO: print out warnings?
|
||||
|
|
@ -102,6 +102,6 @@ class Serialization(val app: ActorSystemImpl) {
|
|||
|
||||
object Serialization {
|
||||
// TODO ensure that these are always set (i.e. withValue()) when doing deserialization
|
||||
val app = new DynamicVariable[ActorSystemImpl](null)
|
||||
val system = new DynamicVariable[ActorSystemImpl](null)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import akka.actor.ActorSystem
|
|||
/*
|
||||
* This class is responsible for booting up a stack of bundles and then shutting them down
|
||||
*/
|
||||
class AkkaLoader(app: ActorSystem) {
|
||||
class AkkaLoader(system: ActorSystem) {
|
||||
private val hasBooted = new Switch(false)
|
||||
|
||||
@volatile
|
||||
|
|
|
|||
|
|
@ -278,7 +278,7 @@ abstract class Duration extends Serializable {
|
|||
def /(other: Duration): Double
|
||||
def unary_- : Duration
|
||||
def finite_? : Boolean
|
||||
def dilated(implicit app: ActorSystem): Duration = this * app.settings.TestTimeFactor
|
||||
def dilated(implicit system: ActorSystem): Duration = this * system.settings.TestTimeFactor
|
||||
def min(other: Duration): Duration = if (this < other) this else other
|
||||
def max(other: Duration): Duration = if (this > other) this else other
|
||||
def sleep(): Unit = Thread.sleep(toMillis)
|
||||
|
|
|
|||
|
|
@ -18,20 +18,20 @@ object JMX {
|
|||
def nameFor(hostname: String, service: String, bean: String): ObjectName =
|
||||
new ObjectName("akka.%s:type=%s,name=%s".format(hostname, service, bean.replace(":", "_")))
|
||||
|
||||
def register(name: ObjectName, mbean: AnyRef)(implicit app: ActorSystem): Option[ObjectInstance] = try {
|
||||
def register(name: ObjectName, mbean: AnyRef)(implicit system: ActorSystem): Option[ObjectInstance] = try {
|
||||
Some(mbeanServer.registerMBean(mbean, name))
|
||||
} catch {
|
||||
case e: InstanceAlreadyExistsException ⇒
|
||||
Some(mbeanServer.getObjectInstance(name))
|
||||
case e: Exception ⇒
|
||||
app.eventStream.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean)))
|
||||
system.eventStream.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean)))
|
||||
None
|
||||
}
|
||||
|
||||
def unregister(mbean: ObjectName)(implicit app: ActorSystem) = try {
|
||||
def unregister(mbean: ObjectName)(implicit system: ActorSystem) = try {
|
||||
mbeanServer.unregisterMBean(mbean)
|
||||
} catch {
|
||||
case e: InstanceNotFoundException ⇒ {}
|
||||
case e: Exception ⇒ app.eventStream.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean)))
|
||||
case e: Exception ⇒ system.eventStream.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean)))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ trait CamelService extends Bootable {
|
|||
* Starts this CamelService.
|
||||
*/
|
||||
def start: CamelService = {
|
||||
// Only init and start if not already done by app
|
||||
// Only init and start if not already done by system
|
||||
if (!CamelContextManager.initialized) CamelContextManager.init
|
||||
if (!CamelContextManager.started) CamelContextManager.start
|
||||
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ trait Consumer { this: Actor ⇒
|
|||
|
||||
/**
|
||||
* Determines whether one-way communications between an endpoint and this consumer actor
|
||||
* should be auto-acknowledged or app-acknowledged.
|
||||
* should be auto-acknowledged or system-acknowledged.
|
||||
*/
|
||||
def autoack = true
|
||||
|
||||
|
|
@ -79,7 +79,7 @@ abstract class UntypedConsumerActor extends UntypedActor with Consumer {
|
|||
|
||||
/**
|
||||
* Determines whether one-way communications between an endpoint and this consumer actor
|
||||
* should be auto-acknowledged or app-acknowledged.
|
||||
* should be auto-acknowledged or system-acknowledged.
|
||||
*/
|
||||
def isAutoack() = super.autoack
|
||||
}
|
||||
|
|
|
|||
|
|
@ -208,7 +208,7 @@ object Message {
|
|||
}
|
||||
|
||||
/**
|
||||
* Positive acknowledgement message (used for app-acknowledged message receipts).
|
||||
* Positive acknowledgement message (used for system-acknowledged message receipts).
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ trait ProducerSupport { this: Actor ⇒
|
|||
/**
|
||||
* Returns the names of message headers to copy from a request message to a response message.
|
||||
* By default only the Message.MessageExchangeId is copied. Applications may override this to
|
||||
* define an app-specific set of message headers to copy.
|
||||
* define an system-specific set of message headers to copy.
|
||||
*/
|
||||
def headersToCopy: Set[String] = headersToCopyDefault
|
||||
|
||||
|
|
|
|||
|
|
@ -138,15 +138,15 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher
|
|||
|
||||
"An non auto-acknowledging consumer" when {
|
||||
"started" must {
|
||||
"must support acknowledgements on app level" in {
|
||||
"must support acknowledgements on system level" in {
|
||||
|
||||
var consumer: ActorRef = null
|
||||
|
||||
service.awaitEndpointActivation(1) {
|
||||
consumer = actorOf(new TestAckConsumer("direct:app-ack-test"))
|
||||
consumer = actorOf(new TestAckConsumer("direct:system-ack-test"))
|
||||
} must be(true)
|
||||
|
||||
val endpoint = mandatoryContext.getEndpoint("direct:app-ack-test", classOf[DirectEndpoint])
|
||||
val endpoint = mandatoryContext.getEndpoint("direct:system-ack-test", classOf[DirectEndpoint])
|
||||
val producer = endpoint.createProducer.asInstanceOf[AsyncProcessor]
|
||||
val exchange = endpoint.createExchange
|
||||
|
||||
|
|
|
|||
|
|
@ -9,13 +9,13 @@ import java.io.File
|
|||
|
||||
/*
|
||||
A simple use of BookKeeper is to implement a write-ahead transaction log. A server maintains an in-memory data structure
|
||||
(with periodic snapshots for example) and logs changes to that structure before it applies the change. The app
|
||||
(with periodic snapshots for example) and logs changes to that structure before it applies the change. The system
|
||||
server creates a ledger at startup and store the ledger id and password in a well known place (ZooKeeper maybe). When
|
||||
it needs to make a change, the server adds an entry with the change information to a ledger and apply the change when
|
||||
BookKeeper adds the entry successfully. The server can even use asyncAddEntry to queue up many changes for high change
|
||||
throughput. BooKeeper meticulously logs the changes in order and call the completion functions in order.
|
||||
|
||||
When the app server dies, a backup server will come online, get the last snapshot and then it will open the
|
||||
When the system server dies, a backup server will come online, get the last snapshot and then it will open the
|
||||
ledger of the old server and read all the entries from the time the snapshot was taken. (Since it doesn't know the last
|
||||
entry number it will use MAX_INTEGER). Once all the entries have been processed, it will close the ledger and start a
|
||||
new one for its use.
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import System.{currentTimeMillis => now}
|
|||
import java.util.concurrent.CountDownLatch
|
||||
//#imports
|
||||
|
||||
//#app
|
||||
//#system
|
||||
object Pi extends App {
|
||||
|
||||
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
|
||||
|
|
@ -127,5 +127,5 @@ object Pi extends App {
|
|||
latch.await()
|
||||
}
|
||||
}
|
||||
//#app
|
||||
//#system
|
||||
|
||||
|
|
|
|||
|
|
@ -11,10 +11,10 @@
|
|||
// import java.util.concurrent.CountDownLatch
|
||||
// //#imports
|
||||
|
||||
// //#app
|
||||
// //#system
|
||||
// object Pi extends App {
|
||||
|
||||
// val app = ActorSystem()
|
||||
// val system = ActorSystem()
|
||||
|
||||
// calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
|
||||
|
||||
|
|
@ -66,10 +66,10 @@
|
|||
|
||||
// //#create-workers
|
||||
// // create the workers
|
||||
// val workers = Vector.fill(nrOfWorkers)(app.actorOf[Worker])
|
||||
// val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker])
|
||||
|
||||
// // wrap them with a load-balancing router
|
||||
// val router = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi")
|
||||
// val router = system.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi")
|
||||
// //#create-workers
|
||||
|
||||
// //#master-receive
|
||||
|
|
@ -119,7 +119,7 @@
|
|||
// val latch = new CountDownLatch(1)
|
||||
|
||||
// // create the master
|
||||
// val master = app.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch))
|
||||
// val master = system.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch))
|
||||
|
||||
// // start the calculation
|
||||
// master ! Calculate
|
||||
|
|
@ -128,5 +128,5 @@
|
|||
// latch.await()
|
||||
// }
|
||||
// }
|
||||
// //#app
|
||||
// //#system
|
||||
|
||||
|
|
|
|||
|
|
@ -14,13 +14,13 @@ import javax.servlet.{ ServletContextListener, ServletContextEvent }
|
|||
/**
|
||||
* This class can be added to web.xml mappings as a listener to start and postStop Akka.
|
||||
*
|
||||
* <web-app>
|
||||
* <web-system>
|
||||
* ...
|
||||
* <listener>
|
||||
* <listener-class>akka.servlet.Initializer</listener-class>
|
||||
* </listener>
|
||||
* ...
|
||||
* </web-app>
|
||||
* </web-system>
|
||||
*/
|
||||
class Initializer extends ServletContextListener {
|
||||
lazy val loader = new AkkaLoader
|
||||
|
|
|
|||
|
|
@ -25,10 +25,10 @@ import akka.actor.ActorSystem
|
|||
*/
|
||||
class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 1000) {
|
||||
|
||||
def this(app: ActorSystem) {
|
||||
def this(system: ActorSystem) {
|
||||
this(
|
||||
app.settings.config.getInt("akka.remote.failure-detector.theshold", 8),
|
||||
app.settings.config.getInt("akka.remote.failure-detector.max-sample-size", 1000))
|
||||
system.settings.config.getInt("akka.remote.failure-detector.theshold", 8),
|
||||
system.settings.config.getInt("akka.remote.failure-detector.max-sample-size", 1000))
|
||||
}
|
||||
|
||||
private final val PhiFactor = 1.0 / math.log(10.0)
|
||||
|
|
|
|||
|
|
@ -21,26 +21,26 @@ trait BootableRemoteActorService extends Bootable {
|
|||
def settings: RemoteServerSettings
|
||||
|
||||
protected lazy val remoteServerThread = new Thread(new Runnable() {
|
||||
def run = app.remote.start(self.applicationLoader.getOrElse(null)) //Use config host/port
|
||||
def run = system.remote.start(self.applicationLoader.getOrElse(null)) //Use config host/port
|
||||
}, "Akka RemoteModule Service")
|
||||
|
||||
def startRemoteService() { remoteServerThread.start() }
|
||||
|
||||
abstract override def onLoad() {
|
||||
if (app.reflective.ClusterModule.isEnabled && settings.isRemotingEnabled) {
|
||||
app.eventHandler.info(this, "Initializing Remote Actors Service...")
|
||||
if (system.reflective.ClusterModule.isEnabled && settings.isRemotingEnabled) {
|
||||
system.eventHandler.info(this, "Initializing Remote Actors Service...")
|
||||
startRemoteService()
|
||||
app.eventHandler.info(this, "Remote Actors Service initialized")
|
||||
system.eventHandler.info(this, "Remote Actors Service initialized")
|
||||
}
|
||||
super.onLoad()
|
||||
}
|
||||
|
||||
abstract override def onUnload() {
|
||||
app.eventHandler.info(this, "Shutting down Remote Actors Service")
|
||||
system.eventHandler.info(this, "Shutting down Remote Actors Service")
|
||||
|
||||
app.remote.shutdown()
|
||||
system.remote.shutdown()
|
||||
if (remoteServerThread.isAlive) remoteServerThread.join(1000)
|
||||
app.eventHandler.info(this, "Remote Actors Service has been shut down")
|
||||
system.eventHandler.info(this, "Remote Actors Service has been shut down")
|
||||
super.onUnload()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,13 +101,13 @@ class Gossiper(remote: Remote) {
|
|||
currentGossip: Gossip,
|
||||
nodeMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])
|
||||
|
||||
private val app = remote.app
|
||||
private val log = Logging(app, this)
|
||||
private val system = remote.system
|
||||
private val log = Logging(system, this)
|
||||
private val failureDetector = remote.failureDetector
|
||||
private val connectionManager = new RemoteConnectionManager(app, remote, Map.empty[RemoteAddress, ActorRef])
|
||||
private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef])
|
||||
private val seeds = Set(address) // FIXME read in list of seeds from config
|
||||
|
||||
private val address = app.rootPath.remoteAddress
|
||||
private val address = system.rootPath.remoteAddress
|
||||
private val nodeFingerprint = address.##
|
||||
|
||||
private val random = SecureRandom.getInstance("SHA1PRNG")
|
||||
|
|
@ -122,8 +122,8 @@ class Gossiper(remote: Remote) {
|
|||
|
||||
{
|
||||
// start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between
|
||||
app.scheduler schedule (() ⇒ initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit)
|
||||
app.scheduler schedule (() ⇒ scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit)
|
||||
system.scheduler schedule (() ⇒ initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit)
|
||||
system.scheduler schedule (() ⇒ scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -153,7 +153,7 @@ class Gossiper(remote: Remote) {
|
|||
node ← oldAvailableNodes
|
||||
if connectionManager.connectionFor(node).isEmpty
|
||||
} {
|
||||
val connectionFactory = () ⇒ RemoteActorRef(remote.app.provider, remote.server, gossipingNode, remote.remoteDaemon.path, None)
|
||||
val connectionFactory = () ⇒ RemoteActorRef(remote.system.provider, remote.server, gossipingNode, remote.remoteDaemon.path, None)
|
||||
connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node
|
||||
oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes
|
||||
}
|
||||
|
|
@ -299,7 +299,7 @@ class Gossiper(remote: Remote) {
|
|||
}
|
||||
|
||||
private def toRemoteMessage(gossip: Gossip): RemoteProtocol.RemoteSystemDaemonMessageProtocol = {
|
||||
val gossipAsBytes = app.serialization.serialize(gossip) match {
|
||||
val gossipAsBytes = system.serialization.serialize(gossip) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(bytes) ⇒ bytes
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,15 +11,15 @@ import akka.actor.ActorSystem
|
|||
|
||||
object MessageSerializer {
|
||||
|
||||
def deserialize(app: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
|
||||
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
|
||||
val clazz = loadManifest(classLoader, messageProtocol)
|
||||
app.serialization.deserialize(messageProtocol.getMessage.toByteArray,
|
||||
system.serialization.deserialize(messageProtocol.getMessage.toByteArray,
|
||||
clazz, classLoader).fold(x ⇒ throw x, identity)
|
||||
}
|
||||
|
||||
def serialize(app: ActorSystem, message: AnyRef): MessageProtocol = {
|
||||
def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = {
|
||||
val builder = MessageProtocol.newBuilder
|
||||
val bytes = app.serialization.serialize(message).fold(x ⇒ throw x, identity)
|
||||
val bytes = system.serialization.serialize(message).fold(x ⇒ throw x, identity)
|
||||
builder.setMessage(ByteString.copyFrom(bytes))
|
||||
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
|
||||
builder.build
|
||||
|
|
|
|||
|
|
@ -58,14 +58,14 @@ object NetworkEventStream {
|
|||
}
|
||||
}
|
||||
|
||||
class NetworkEventStream(app: ActorSystemImpl) {
|
||||
class NetworkEventStream(system: ActorSystemImpl) {
|
||||
|
||||
import NetworkEventStream._
|
||||
|
||||
// FIXME: check that this supervision is correct
|
||||
private[akka] val sender = app.provider.actorOf(app,
|
||||
Props[Channel].copy(dispatcher = app.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")),
|
||||
app.systemGuardian, "network-event-sender", systemService = true)
|
||||
private[akka] val sender = system.provider.actorOf(system,
|
||||
Props[Channel].copy(dispatcher = system.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")),
|
||||
system.systemGuardian, "network-event-sender", systemService = true)
|
||||
|
||||
/**
|
||||
* Registers a network event stream listener (asyncronously).
|
||||
|
|
|
|||
|
|
@ -26,11 +26,11 @@ import java.util.concurrent.atomic.AtomicLong
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class Remote(val app: ActorSystemImpl, val nodename: String) {
|
||||
class Remote(val system: ActorSystemImpl, val nodename: String) {
|
||||
|
||||
val log = Logging(app, this)
|
||||
val log = Logging(system, this)
|
||||
|
||||
import app._
|
||||
import system._
|
||||
val AC = settings
|
||||
import AC._
|
||||
|
||||
|
|
@ -38,7 +38,7 @@ class Remote(val app: ActorSystemImpl, val nodename: String) {
|
|||
val shouldCompressData = config.getBool("akka.remote.use-compression", false)
|
||||
val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt
|
||||
|
||||
val failureDetector = new AccrualFailureDetector(app)
|
||||
val failureDetector = new AccrualFailureDetector(system)
|
||||
|
||||
// val gossiper = new Gossiper(this)
|
||||
|
||||
|
|
@ -48,17 +48,17 @@ class Remote(val app: ActorSystemImpl, val nodename: String) {
|
|||
val computeGridDispatcher = dispatcherFactory.newDispatcher("akka:compute-grid").build
|
||||
|
||||
// FIXME it is probably better to create another supervisor for handling the children created by handle_*
|
||||
private[remote] lazy val remoteDaemonSupervisor = app.actorOf(Props(
|
||||
private[remote] lazy val remoteDaemonSupervisor = system.actorOf(Props(
|
||||
OneForOneStrategy(List(classOf[Exception]), None, None)), "akka-system-remote-supervisor") // is infinite restart what we want?
|
||||
|
||||
private[remote] lazy val remoteDaemon =
|
||||
app.provider.actorOf(app,
|
||||
system.provider.actorOf(system,
|
||||
Props(new RemoteSystemDaemon(this)).withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)),
|
||||
remoteDaemonSupervisor,
|
||||
remoteDaemonServiceName,
|
||||
systemService = true)
|
||||
|
||||
private[remote] lazy val remoteClientLifeCycleHandler = app.actorOf(Props(new Actor {
|
||||
private[remote] lazy val remoteClientLifeCycleHandler = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address)
|
||||
case RemoteClientDisconnected(remote, address) ⇒ remote.shutdownClientConnection(address)
|
||||
|
|
@ -66,22 +66,22 @@ class Remote(val app: ActorSystemImpl, val nodename: String) {
|
|||
}
|
||||
}), "akka.remote.RemoteClientLifeCycleListener")
|
||||
|
||||
lazy val eventStream = new NetworkEventStream(app)
|
||||
lazy val eventStream = new NetworkEventStream(system)
|
||||
|
||||
lazy val server: RemoteSupport = {
|
||||
val remote = new akka.remote.netty.NettyRemoteSupport(app)
|
||||
val remote = new akka.remote.netty.NettyRemoteSupport(system)
|
||||
remote.start() //TODO FIXME Any application loader here?
|
||||
|
||||
app.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
|
||||
app.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
|
||||
system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
|
||||
system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
|
||||
|
||||
// TODO actually register this provider in app in remote mode
|
||||
// TODO actually register this provider in system in remote mode
|
||||
//provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider)
|
||||
remote
|
||||
}
|
||||
|
||||
def start(): Unit = {
|
||||
val serverAddress = server.app.rootPath.remoteAddress //Force init of server
|
||||
val serverAddress = server.system.rootPath.remoteAddress //Force init of server
|
||||
val daemonAddress = remoteDaemon.address //Force init of daemon
|
||||
log.info("Starting remote server on [{}] and starting remoteDaemon with address [{}]", serverAddress, daemonAddress)
|
||||
}
|
||||
|
|
@ -97,6 +97,7 @@ class Remote(val app: ActorSystemImpl, val nodename: String) {
|
|||
class RemoteSystemDaemon(remote: Remote) extends Actor {
|
||||
|
||||
import remote._
|
||||
import remote.{ system ⇒ systemImpl }
|
||||
|
||||
override def preRestart(reason: Throwable, msg: Option[Any]) {
|
||||
log.debug("RemoteSystemDaemon failed due to [{}] - restarting...", reason)
|
||||
|
|
@ -133,16 +134,16 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
if (shouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray
|
||||
|
||||
val actorFactory =
|
||||
app.serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match {
|
||||
system.serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor]
|
||||
}
|
||||
|
||||
val actorPath = ActorPath(remote.app, message.getActorPath)
|
||||
val parent = app.actorFor(actorPath.parent)
|
||||
val actorPath = ActorPath(systemImpl, message.getActorPath)
|
||||
val parent = system.actorFor(actorPath.parent)
|
||||
|
||||
if (parent.isDefined) {
|
||||
app.provider.actorOf(app, Props(creator = actorFactory), parent.get, actorPath.name)
|
||||
systemImpl.provider.actorOf(systemImpl, Props(creator = actorFactory), parent.get, actorPath.name)
|
||||
} else {
|
||||
log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message)
|
||||
}
|
||||
|
|
@ -151,7 +152,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
log.error("Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [{}]", message)
|
||||
}
|
||||
|
||||
sender ! Success(app.address)
|
||||
sender ! Success(systemImpl.address)
|
||||
} catch {
|
||||
case error: Throwable ⇒ //FIXME doesn't seem sensible
|
||||
sender ! Failure(error)
|
||||
|
|
@ -192,7 +193,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
|
||||
// FIXME: handle real remote supervision
|
||||
def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) {
|
||||
new LocalActorRef(app,
|
||||
new LocalActorRef(systemImpl,
|
||||
Props(
|
||||
context ⇒ {
|
||||
case f: Function0[_] ⇒ try { f() } finally { context.self.stop() }
|
||||
|
|
@ -201,7 +202,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
|
||||
// FIXME: handle real remote supervision
|
||||
def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol) {
|
||||
new LocalActorRef(app,
|
||||
new LocalActorRef(systemImpl,
|
||||
Props(
|
||||
context ⇒ {
|
||||
case f: Function0[_] ⇒ try { sender ! f() } finally { context.self.stop() }
|
||||
|
|
@ -210,7 +211,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
|
||||
// FIXME: handle real remote supervision
|
||||
def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) {
|
||||
new LocalActorRef(app,
|
||||
new LocalActorRef(systemImpl,
|
||||
Props(
|
||||
context ⇒ {
|
||||
case (fun: Function[_, _], param: Any) ⇒ try { fun.asInstanceOf[Any ⇒ Unit].apply(param) } finally { context.self.stop() }
|
||||
|
|
@ -219,7 +220,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
|
||||
// FIXME: handle real remote supervision
|
||||
def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol) {
|
||||
new LocalActorRef(app,
|
||||
new LocalActorRef(systemImpl,
|
||||
Props(
|
||||
context ⇒ {
|
||||
case (fun: Function[_, _], param: Any) ⇒ try { sender ! fun.asInstanceOf[Any ⇒ Any](param) } finally { context.self.stop() }
|
||||
|
|
@ -232,7 +233,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
}
|
||||
|
||||
private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = {
|
||||
app.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
|
||||
system.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(instance) ⇒ instance.asInstanceOf[T]
|
||||
}
|
||||
|
|
@ -241,20 +242,20 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
|
|||
|
||||
class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLoader: Option[ClassLoader] = None) {
|
||||
|
||||
val provider = remote.app.asInstanceOf[ActorSystemImpl].provider
|
||||
val provider = remote.system.asInstanceOf[ActorSystemImpl].provider
|
||||
|
||||
lazy val sender: ActorRef =
|
||||
if (input.hasSender)
|
||||
provider.deserialize(
|
||||
SerializedActorRef(input.getSender.getHost, input.getSender.getPort, input.getSender.getPath)).getOrElse(throw new IllegalStateException("OHNOES"))
|
||||
else
|
||||
remote.app.deadLetters
|
||||
remote.system.deadLetters
|
||||
|
||||
lazy val recipient: ActorRef = remote.app.actorFor(input.getRecipient.getPath).getOrElse(remote.app.deadLetters)
|
||||
lazy val recipient: ActorRef = remote.system.actorFor(input.getRecipient.getPath).getOrElse(remote.system.deadLetters)
|
||||
|
||||
lazy val payload: Either[Throwable, AnyRef] =
|
||||
if (input.hasException) Left(parseException())
|
||||
else Right(MessageSerializer.deserialize(remote.app, input.getMessage, classLoader))
|
||||
else Right(MessageSerializer.deserialize(remote.system, input.getMessage, classLoader))
|
||||
|
||||
protected def parseException(): Throwable = {
|
||||
val exception = input.getException
|
||||
|
|
@ -267,7 +268,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
|
|||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
} catch {
|
||||
case problem: Exception ⇒
|
||||
remote.app.eventStream.publish(Logging.Error(problem, remote, problem.getMessage))
|
||||
remote.system.eventStream.publish(Logging.Error(problem, remote, problem.getMessage))
|
||||
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
|
||||
}
|
||||
}
|
||||
|
|
@ -277,7 +278,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
|
|||
|
||||
trait RemoteMarshallingOps {
|
||||
|
||||
def app: ActorSystem
|
||||
def system: ActorSystem
|
||||
|
||||
def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
|
||||
val arp = AkkaRemoteProtocol.newBuilder
|
||||
|
|
@ -295,7 +296,7 @@ trait RemoteMarshallingOps {
|
|||
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
|
||||
*/
|
||||
def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = {
|
||||
val rep = app.asInstanceOf[ActorSystemImpl].provider.serialize(actor)
|
||||
val rep = system.asInstanceOf[ActorSystemImpl].provider.serialize(actor)
|
||||
ActorRefProtocol.newBuilder.setHost(rep.hostname).setPort(rep.port).setPath(rep.path).build
|
||||
}
|
||||
|
||||
|
|
@ -308,7 +309,7 @@ trait RemoteMarshallingOps {
|
|||
|
||||
message match {
|
||||
case Right(message) ⇒
|
||||
messageBuilder.setMessage(MessageSerializer.serialize(app, message.asInstanceOf[AnyRef]))
|
||||
messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef]))
|
||||
case Left(exception) ⇒
|
||||
messageBuilder.setException(ExceptionProtocol.newBuilder
|
||||
.setClassname(exception.getClass.getName)
|
||||
|
|
|
|||
|
|
@ -55,10 +55,10 @@ class RemoteActorRefProvider(
|
|||
@volatile
|
||||
private var remoteDaemonConnectionManager: RemoteConnectionManager = _
|
||||
|
||||
def init(app: ActorSystemImpl) {
|
||||
local.init(app)
|
||||
remote = new Remote(app, nodename)
|
||||
remoteDaemonConnectionManager = new RemoteConnectionManager(app, remote)
|
||||
def init(system: ActorSystemImpl) {
|
||||
local.init(system)
|
||||
remote = new Remote(system, nodename)
|
||||
remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
|
||||
}
|
||||
|
||||
private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime
|
||||
|
|
@ -69,11 +69,11 @@ class RemoteActorRefProvider(
|
|||
def defaultDispatcher = dispatcher
|
||||
def defaultTimeout = settings.ActorTimeout
|
||||
|
||||
private[akka] def actorOf(app: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
|
||||
actorOf(app, props, supervisor, supervisor.path / name, systemService)
|
||||
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
|
||||
actorOf(system, props, supervisor, supervisor.path / name, systemService)
|
||||
|
||||
private[akka] def actorOf(app: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef =
|
||||
if (systemService) local.actorOf(app, props, supervisor, path, systemService)
|
||||
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef =
|
||||
if (systemService) local.actorOf(system, props, supervisor, path, systemService)
|
||||
else {
|
||||
val name = path.name
|
||||
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
|
||||
|
|
@ -92,13 +92,13 @@ class RemoteActorRefProvider(
|
|||
// case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass)
|
||||
// }
|
||||
|
||||
def isReplicaNode: Boolean = remoteAddresses exists { _ == app.address }
|
||||
def isReplicaNode: Boolean = remoteAddresses exists { _ == system.address }
|
||||
|
||||
//app.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(app.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
|
||||
//system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
|
||||
|
||||
if (isReplicaNode) {
|
||||
// we are on one of the replica node for this remote actor
|
||||
local.actorOf(app, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
|
||||
local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
|
||||
} else {
|
||||
|
||||
// we are on the single "reference" node uses the remote actors on the replica nodes
|
||||
|
|
@ -135,17 +135,17 @@ class RemoteActorRefProvider(
|
|||
|
||||
val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒
|
||||
val remoteAddress = RemoteAddress(a.hostname, a.port)
|
||||
conns + (remoteAddress -> RemoteActorRef(remote.app.provider, remote.server, remoteAddress, path, None))
|
||||
conns + (remoteAddress -> RemoteActorRef(remote.system.provider, remote.server, remoteAddress, path, None))
|
||||
}
|
||||
|
||||
val connectionManager = new RemoteConnectionManager(app, remote, connections)
|
||||
val connectionManager = new RemoteConnectionManager(system, remote, connections)
|
||||
|
||||
connections.keys foreach { useActorOnNode(app, _, path.toString, props.creator) }
|
||||
connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
|
||||
|
||||
actorOf(app, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
|
||||
actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
|
||||
}
|
||||
|
||||
case deploy ⇒ local.actorOf(app, props, supervisor, name, systemService)
|
||||
case deploy ⇒ local.actorOf(system, props, supervisor, name, systemService)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
|
|
@ -153,7 +153,7 @@ class RemoteActorRefProvider(
|
|||
throw e
|
||||
}
|
||||
|
||||
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
|
||||
// actor foreach system.registry.register // only for ActorRegistry backward compat, will be removed later
|
||||
|
||||
newFuture completeWithResult actor
|
||||
actors.replace(path.toString, newFuture, actor)
|
||||
|
|
@ -167,9 +167,9 @@ class RemoteActorRefProvider(
|
|||
* Copied from LocalActorRefProvider...
|
||||
*/
|
||||
// FIXME: implement supervision
|
||||
def actorOf(app: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
|
||||
def actorOf(system: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
|
||||
if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router")
|
||||
new RoutedActorRef(app, props, supervisor, name)
|
||||
new RoutedActorRef(system, props, supervisor, name)
|
||||
}
|
||||
|
||||
def actorFor(path: Iterable[String]): Option[ActorRef] = actors.get(ActorPath.join(path)) match {
|
||||
|
|
@ -198,18 +198,18 @@ class RemoteActorRefProvider(
|
|||
local.actorFor(ActorPath.split(actor.path))
|
||||
} else {
|
||||
log.debug("{}: Creating RemoteActorRef with address [{}] connected to [{}]", rootPath.remoteAddress, actor.path, remoteAddress)
|
||||
Some(RemoteActorRef(remote.app.provider, remote.server, remoteAddress, rootPath / ActorPath.split(actor.path), None)) //Should it be None here
|
||||
Some(RemoteActorRef(remote.system.provider, remote.server, remoteAddress, rootPath / ActorPath.split(actor.path), None)) //Should it be None here
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Using (checking out) actor on a specific node.
|
||||
*/
|
||||
def useActorOnNode(app: ActorSystem, remoteAddress: RemoteAddress, actorPath: String, actorFactory: () ⇒ Actor) {
|
||||
def useActorOnNode(system: ActorSystem, remoteAddress: RemoteAddress, actorPath: String, actorFactory: () ⇒ Actor) {
|
||||
log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress)
|
||||
|
||||
val actorFactoryBytes =
|
||||
app.serialization.serialize(actorFactory) match {
|
||||
system.serialization.serialize(actorFactory) match {
|
||||
case Left(error) ⇒ throw error
|
||||
case Right(bytes) ⇒ if (remote.shouldCompressData) LZF.compress(bytes) else bytes
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,12 +20,12 @@ import java.util.concurrent.atomic.AtomicReference
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteConnectionManager(
|
||||
app: ActorSystem,
|
||||
system: ActorSystem,
|
||||
remote: Remote,
|
||||
initialConnections: Map[RemoteAddress, ActorRef] = Map.empty[RemoteAddress, ActorRef])
|
||||
extends ConnectionManager {
|
||||
|
||||
val log = Logging(app, this)
|
||||
val log = Logging(system, this)
|
||||
|
||||
// FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc.
|
||||
case class State(version: Long, connections: Map[RemoteAddress, ActorRef])
|
||||
|
|
@ -149,5 +149,5 @@ class RemoteConnectionManager(
|
|||
}
|
||||
|
||||
private[remote] def newConnection(remoteAddress: RemoteAddress, actorPath: ActorPath) =
|
||||
RemoteActorRef(remote.app.provider, remote.server, remoteAddress, actorPath, None)
|
||||
RemoteActorRef(remote.system.provider, remote.server, remoteAddress, actorPath, None)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ abstract class RemoteClient private[akka] (
|
|||
val remoteSupport: NettyRemoteSupport,
|
||||
val remoteAddress: RemoteAddress) {
|
||||
|
||||
val log = Logging(remoteSupport.app, this)
|
||||
val log = Logging(remoteSupport.system, this)
|
||||
|
||||
val name = simpleName(this) + "@" + remoteAddress
|
||||
|
||||
|
|
@ -147,7 +147,7 @@ class ActiveRemoteClient private[akka] (
|
|||
def sendSecureCookie(connection: ChannelFuture) {
|
||||
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
|
||||
if (SECURE_COOKIE.nonEmpty) handshake.setCookie(SECURE_COOKIE.get)
|
||||
val addr = remoteSupport.app.rootPath.remoteAddress
|
||||
val addr = remoteSupport.system.rootPath.remoteAddress
|
||||
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder.setHostname(addr.hostname).setPort(addr.port).build)
|
||||
connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build))
|
||||
}
|
||||
|
|
@ -349,10 +349,10 @@ class ActiveRemoteClientHandler(
|
|||
/**
|
||||
* Provides the implementation of the Netty remote support
|
||||
*/
|
||||
class NettyRemoteSupport(_app: ActorSystem) extends RemoteSupport(_app) with RemoteMarshallingOps {
|
||||
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
|
||||
|
||||
val serverSettings = new RemoteServerSettings(app.settings.config, app.settings.DefaultTimeUnit)
|
||||
val clientSettings = new RemoteClientSettings(app.settings.config, app.settings.DefaultTimeUnit)
|
||||
val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit)
|
||||
val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit)
|
||||
|
||||
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
|
||||
private val clientsLock = new ReadWriteGuard
|
||||
|
|
@ -429,7 +429,7 @@ class NettyRemoteSupport(_app: ActorSystem) extends RemoteSupport(_app) with Rem
|
|||
|
||||
def name = currentServer.get match {
|
||||
case Some(server) ⇒ server.name
|
||||
case None ⇒ "Non-running NettyRemoteServer@" + app.rootPath.remoteAddress
|
||||
case None ⇒ "Non-running NettyRemoteServer@" + system.rootPath.remoteAddress
|
||||
}
|
||||
|
||||
private val _isRunning = new Switch(false)
|
||||
|
|
@ -458,10 +458,10 @@ class NettyRemoteSupport(_app: ActorSystem) extends RemoteSupport(_app) with Rem
|
|||
}
|
||||
|
||||
class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Option[ClassLoader]) {
|
||||
val log = Logging(remoteSupport.app, this)
|
||||
val log = Logging(remoteSupport.system, this)
|
||||
import remoteSupport.serverSettings._
|
||||
|
||||
val address = remoteSupport.app.rootPath.remoteAddress
|
||||
val address = remoteSupport.system.rootPath.remoteAddress
|
||||
|
||||
val name = "NettyRemoteServer@" + address
|
||||
|
||||
|
|
@ -563,7 +563,7 @@ class RemoteServerHandler(
|
|||
val applicationLoader: Option[ClassLoader],
|
||||
val remoteSupport: NettyRemoteSupport) extends SimpleChannelUpstreamHandler {
|
||||
|
||||
val log = Logging(remoteSupport.app, this)
|
||||
val log = Logging(remoteSupport.system, this)
|
||||
|
||||
import remoteSupport.serverSettings._
|
||||
|
||||
|
|
|
|||
|
|
@ -122,14 +122,14 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
|
|||
* Alright, here's our test-harness
|
||||
*/
|
||||
object DiningHakkers {
|
||||
val app = ActorSystem()
|
||||
val system = ActorSystem()
|
||||
def run {
|
||||
//Create 5 chopsticks
|
||||
val chopsticks = for (i ← 1 to 5) yield app.actorOf(new Chopstick("Chopstick " + i))
|
||||
val chopsticks = for (i ← 1 to 5) yield system.actorOf(new Chopstick("Chopstick " + i))
|
||||
//Create 5 awesome hakkers and assign them their left and right chopstick
|
||||
val hakkers = for {
|
||||
(name, i) ← List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex
|
||||
} yield app.actorOf(new Hakker(name, chopsticks(i), chopsticks((i + 1) % 5)))
|
||||
} yield system.actorOf(new Hakker(name, chopsticks(i), chopsticks((i + 1) % 5)))
|
||||
|
||||
//Signal all hakkers that they should start thinking, and watch the show
|
||||
hakkers.foreach(_ ! Think)
|
||||
|
|
|
|||
|
|
@ -163,15 +163,15 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
|
|||
*/
|
||||
object DiningHakkersOnFsm {
|
||||
|
||||
val app = ActorSystem()
|
||||
val system = ActorSystem()
|
||||
|
||||
def run = {
|
||||
// Create 5 chopsticks
|
||||
val chopsticks = for (i ← 1 to 5) yield app.actorOf(new Chopstick("Chopstick " + i))
|
||||
val chopsticks = for (i ← 1 to 5) yield system.actorOf(new Chopstick("Chopstick " + i))
|
||||
// Create 5 awesome fsm hakkers and assign them their left and right chopstick
|
||||
val hakkers = for {
|
||||
(name, i) ← List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex
|
||||
} yield app.actorOf(new FSMHakker(name, chopsticks(i), chopsticks((i + 1) % 5)))
|
||||
} yield system.actorOf(new FSMHakker(name, chopsticks(i), chopsticks((i + 1) % 5)))
|
||||
|
||||
hakkers.foreach(_ ! Think)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ class ActorFactoryBeanTest extends Spec with ShouldMatchers with BeforeAndAfterA
|
|||
assert(target.getStringFromVal === entry.value)
|
||||
}
|
||||
|
||||
it("should create an app context and verify dependency injection for typed") {
|
||||
it("should create an system context and verify dependency injection for typed") {
|
||||
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
|
||||
val ta = ctx.getBean("typedActor").asInstanceOf[PojoInf];
|
||||
assert(ta.isPreStartInvoked)
|
||||
|
|
@ -75,7 +75,7 @@ class ActorFactoryBeanTest extends Spec with ShouldMatchers with BeforeAndAfterA
|
|||
ctx.close
|
||||
}
|
||||
|
||||
it("should create an app context and verify dependency injection for untyped actors") {
|
||||
it("should create an system context and verify dependency injection for untyped actors") {
|
||||
var ctx = new ClassPathXmlApplicationContext("appContext.xml")
|
||||
val uta = ctx.getBean("untypedActor").asInstanceOf[ActorRef]
|
||||
val ping = uta.actor.asInstanceOf[PingActor]
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ class CamelServiceSpringFeatureTest extends FeatureSpec with BeforeAndAfterEach
|
|||
Actor.registry.shutdownAll
|
||||
}
|
||||
|
||||
feature("start CamelService from Spring app context") {
|
||||
feature("start CamelService from Spring system context") {
|
||||
import CamelContextManager._
|
||||
scenario("with a custom CamelContext and access a registered typed actor") {
|
||||
val appctx = new ClassPathXmlApplicationContext("/appContextCamelServiceCustom.xml")
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with B
|
|||
myPojo
|
||||
}
|
||||
|
||||
feature("parse Spring app context") {
|
||||
feature("parse Spring system context") {
|
||||
|
||||
scenario("akka:typed-actor and akka:supervision and akka:dispatcher can be used as top level elements") {
|
||||
val context = new ClassPathResource("/typed-actor-config.xml")
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with
|
|||
pingActor
|
||||
}
|
||||
|
||||
feature("parse Spring app context") {
|
||||
feature("parse Spring system context") {
|
||||
|
||||
scenario("get a untyped actor") {
|
||||
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor")
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ private[akka] case object Get
|
|||
* Factory method for creating an Agent.
|
||||
*/
|
||||
object Agent {
|
||||
def apply[T](initialValue: T)(implicit app: ActorSystem) = new Agent(initialValue, app)
|
||||
def apply[T](initialValue: T)(implicit system: ActorSystem) = new Agent(initialValue, system)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -93,9 +93,9 @@ object Agent {
|
|||
* agent4.close
|
||||
* }}}
|
||||
*/
|
||||
class Agent[T](initialValue: T, app: ActorSystem) {
|
||||
class Agent[T](initialValue: T, system: ActorSystem) {
|
||||
private[akka] val ref = Ref(initialValue)
|
||||
private[akka] val updater = app.actorOf(Props(new AgentUpdater(this))).asInstanceOf[LocalActorRef] //TODO can we avoid this somehow?
|
||||
private[akka] val updater = system.actorOf(Props(new AgentUpdater(this))).asInstanceOf[LocalActorRef] //TODO can we avoid this somehow?
|
||||
|
||||
/**
|
||||
* Read the internal state of the agent.
|
||||
|
|
@ -123,7 +123,7 @@ class Agent[T](initialValue: T, app: ActorSystem) {
|
|||
def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = {
|
||||
def dispatch = updater.?(Update(f), timeout).asInstanceOf[Future[T]]
|
||||
if (Stm.activeTransaction) {
|
||||
val result = new DefaultPromise[T](timeout)(app.dispatcher)
|
||||
val result = new DefaultPromise[T](timeout)(system.dispatcher)
|
||||
get //Join xa
|
||||
deferred { result completeWith dispatch } //Attach deferred-block to current transaction
|
||||
result
|
||||
|
|
@ -151,8 +151,8 @@ class Agent[T](initialValue: T, app: ActorSystem) {
|
|||
def sendOff(f: T ⇒ T): Unit = {
|
||||
send((value: T) ⇒ {
|
||||
suspend()
|
||||
val pinnedDispatcher = new PinnedDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, null, "agent-send-off", UnboundedMailbox(), app.settings.ActorTimeoutMillis)
|
||||
val threadBased = app.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
|
||||
val pinnedDispatcher = new PinnedDispatcher(system.deadLetterMailbox, system.eventStream, system.scheduler, null, "agent-send-off", UnboundedMailbox(), system.settings.ActorTimeoutMillis)
|
||||
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
|
||||
threadBased ! Update(f)
|
||||
value
|
||||
})
|
||||
|
|
@ -166,11 +166,11 @@ class Agent[T](initialValue: T, app: ActorSystem) {
|
|||
* still be executed in order.
|
||||
*/
|
||||
def alterOff(f: T ⇒ T)(timeout: Timeout): Future[T] = {
|
||||
val result = new DefaultPromise[T](timeout)(app.dispatcher)
|
||||
val result = new DefaultPromise[T](timeout)(system.dispatcher)
|
||||
send((value: T) ⇒ {
|
||||
suspend()
|
||||
val pinnedDispatcher = new PinnedDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, null, "agent-alter-off", UnboundedMailbox(), app.settings.ActorTimeoutMillis)
|
||||
val threadBased = app.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
|
||||
val pinnedDispatcher = new PinnedDispatcher(system.deadLetterMailbox, system.eventStream, system.scheduler, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeoutMillis)
|
||||
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
|
||||
result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]]
|
||||
value
|
||||
})
|
||||
|
|
@ -192,7 +192,7 @@ class Agent[T](initialValue: T, app: ActorSystem) {
|
|||
* Map this agent to a new agent, applying the function to the internal state.
|
||||
* Does not change the value of this agent.
|
||||
*/
|
||||
def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(app)
|
||||
def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(system)
|
||||
|
||||
/**
|
||||
* Flatmap this agent to a new agent, applying the function to the internal state.
|
||||
|
|
@ -262,7 +262,7 @@ class Agent[T](initialValue: T, app: ActorSystem) {
|
|||
* Map this agent to a new agent, applying the function to the internal state.
|
||||
* Does not change the value of this agent.
|
||||
*/
|
||||
def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get))(app)
|
||||
def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get))(system)
|
||||
|
||||
/**
|
||||
* Java API:
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
|
|||
|
||||
class AgentSpec extends WordSpec with MustMatchers {
|
||||
|
||||
implicit val app = ActorSystem("AgentSpec")
|
||||
implicit val system = ActorSystem("AgentSpec")
|
||||
implicit val timeout = Timeout(5.seconds.dilated)
|
||||
|
||||
"Agent" should {
|
||||
|
|
|
|||
|
|
@ -22,14 +22,14 @@ import akka.event.EventStream
|
|||
* @since 1.1
|
||||
*/
|
||||
class TestActorRef[T <: Actor](
|
||||
_app: ActorSystemImpl,
|
||||
_system: ActorSystemImpl,
|
||||
_deadLetterMailbox: Mailbox,
|
||||
_eventStream: EventStream,
|
||||
_scheduler: Scheduler,
|
||||
_props: Props,
|
||||
_supervisor: ActorRef,
|
||||
name: String)
|
||||
extends LocalActorRef(_app, _props.withDispatcher(new CallingThreadDispatcher(_deadLetterMailbox, _eventStream, _scheduler)), _supervisor, _supervisor.path / name, false) {
|
||||
extends LocalActorRef(_system, _props.withDispatcher(new CallingThreadDispatcher(_deadLetterMailbox, _eventStream, _scheduler)), _supervisor, _supervisor.path / name, false) {
|
||||
/**
|
||||
* Directly inject messages into actor receive behavior. Any exceptions
|
||||
* thrown will be available to you, while still being able to use
|
||||
|
|
@ -57,23 +57,23 @@ object TestActorRef {
|
|||
"$" + akka.util.Helpers.base64(l)
|
||||
}
|
||||
|
||||
def apply[T <: Actor](factory: ⇒ T)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), randomName)
|
||||
def apply[T <: Actor](factory: ⇒ T)(implicit system: ActorSystem): TestActorRef[T] = apply[T](Props(factory), randomName)
|
||||
|
||||
def apply[T <: Actor](factory: ⇒ T, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](Props(factory), name)
|
||||
def apply[T <: Actor](factory: ⇒ T, name: String)(implicit system: ActorSystem): TestActorRef[T] = apply[T](Props(factory), name)
|
||||
|
||||
def apply[T <: Actor](props: Props)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, randomName)
|
||||
def apply[T <: Actor](props: Props)(implicit system: ActorSystem): TestActorRef[T] = apply[T](props, randomName)
|
||||
|
||||
def apply[T <: Actor](props: Props, name: String)(implicit app: ActorSystem): TestActorRef[T] =
|
||||
apply[T](props, app.asInstanceOf[ActorSystemImpl].guardian, name)
|
||||
def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T] =
|
||||
apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name)
|
||||
|
||||
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit app: ActorSystem): TestActorRef[T] = {
|
||||
val impl = app.asInstanceOf[ActorSystemImpl]
|
||||
new TestActorRef(impl, impl.deadLetterMailbox, app.eventStream, app.scheduler, props, supervisor, name)
|
||||
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = {
|
||||
val impl = system.asInstanceOf[ActorSystemImpl]
|
||||
new TestActorRef(impl, impl.deadLetterMailbox, system.eventStream, system.scheduler, props, supervisor, name)
|
||||
}
|
||||
|
||||
def apply[T <: Actor](implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](randomName)
|
||||
def apply[T <: Actor](implicit m: Manifest[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)
|
||||
|
||||
def apply[T <: Actor](name: String)(implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](Props({
|
||||
def apply[T <: Actor](name: String)(implicit m: Manifest[T], system: ActorSystem): TestActorRef[T] = apply[T](Props({
|
||||
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
||||
createInstance[T](m.erasure, noParams, noArgs) match {
|
||||
case Right(value) ⇒ value
|
||||
|
|
|
|||
|
|
@ -25,15 +25,15 @@ object TestBarrier {
|
|||
class TestBarrier(count: Int) {
|
||||
private val barrier = new CyclicBarrier(count)
|
||||
|
||||
def await()(implicit app: ActorSystem): Unit = await(TestBarrier.DefaultTimeout)
|
||||
def await()(implicit system: ActorSystem): Unit = await(TestBarrier.DefaultTimeout)
|
||||
|
||||
def await(timeout: Duration)(implicit app: ActorSystem) {
|
||||
def await(timeout: Duration)(implicit system: ActorSystem) {
|
||||
try {
|
||||
barrier.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
|
||||
} catch {
|
||||
case e: TimeoutException ⇒
|
||||
throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s"
|
||||
format (timeout.toString, app.settings.TestTimeFactor))
|
||||
format (timeout.toString, system.settings.TestTimeFactor))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -79,17 +79,17 @@ abstract class EventFilter(occurrences: Int) {
|
|||
* Apply this filter while executing the given code block. Care is taken to
|
||||
* remove the filter when the block is finished or aborted.
|
||||
*/
|
||||
def intercept[T](code: ⇒ T)(implicit app: ActorSystem): T = {
|
||||
app.eventStream publish TestEvent.Mute(this)
|
||||
def intercept[T](code: ⇒ T)(implicit system: ActorSystem): T = {
|
||||
system.eventStream publish TestEvent.Mute(this)
|
||||
try {
|
||||
val result = code
|
||||
if (!awaitDone(app.settings.TestEventFilterLeeway))
|
||||
if (!awaitDone(system.settings.TestEventFilterLeeway))
|
||||
if (todo > 0)
|
||||
throw new AssertionError("Timeout (" + app.settings.TestEventFilterLeeway + ") waiting for " + todo + " messages on " + this)
|
||||
throw new AssertionError("Timeout (" + system.settings.TestEventFilterLeeway + ") waiting for " + todo + " messages on " + this)
|
||||
else
|
||||
throw new AssertionError("Received " + (-todo) + " messages too many on " + this)
|
||||
result
|
||||
} finally app.eventStream publish TestEvent.UnMute(this)
|
||||
} finally system.eventStream publish TestEvent.UnMute(this)
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -37,14 +37,14 @@ import akka.event.EventStream
|
|||
* @since 1.2
|
||||
*/
|
||||
class TestFSMRef[S, D, T <: Actor](
|
||||
app: ActorSystemImpl,
|
||||
system: ActorSystemImpl,
|
||||
_deadLetterMailbox: Mailbox,
|
||||
_eventStream: EventStream,
|
||||
_scheduler: Scheduler,
|
||||
props: Props,
|
||||
supervisor: ActorRef,
|
||||
name: String)(implicit ev: T <:< FSM[S, D])
|
||||
extends TestActorRef(app, _deadLetterMailbox, _eventStream, _scheduler, props, supervisor, name) {
|
||||
extends TestActorRef(system, _deadLetterMailbox, _eventStream, _scheduler, props, supervisor, name) {
|
||||
|
||||
private def fsm: T = underlyingActor
|
||||
|
||||
|
|
@ -89,13 +89,13 @@ class TestFSMRef[S, D, T <: Actor](
|
|||
|
||||
object TestFSMRef {
|
||||
|
||||
def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] = {
|
||||
val impl = app.asInstanceOf[ActorSystemImpl]
|
||||
def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
|
||||
val impl = system.asInstanceOf[ActorSystemImpl]
|
||||
new TestFSMRef(impl, impl.deadLetterMailbox, impl.eventStream, impl.scheduler, Props(creator = () ⇒ factory), impl.guardian, TestActorRef.randomName)
|
||||
}
|
||||
|
||||
def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] = {
|
||||
val impl = app.asInstanceOf[ActorSystemImpl]
|
||||
def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
|
||||
val impl = system.asInstanceOf[ActorSystemImpl]
|
||||
new TestFSMRef(impl, impl.deadLetterMailbox, impl.eventStream, impl.scheduler, Props(creator = () ⇒ factory), impl.guardian, name)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,11 +76,11 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
|
|||
* @author Roland Kuhn
|
||||
* @since 1.1
|
||||
*/
|
||||
class TestKit(_app: ActorSystem) {
|
||||
class TestKit(_system: ActorSystem) {
|
||||
|
||||
import TestActor.{ Message, RealMessage, NullMessage }
|
||||
|
||||
implicit val system = _app
|
||||
implicit val system = _system
|
||||
|
||||
private val queue = new LinkedBlockingDeque[Message]()
|
||||
private[akka] var lastMessage: Message = NullMessage
|
||||
|
|
@ -598,7 +598,7 @@ class TestProbe(_application: ActorSystem) extends TestKit(_application) {
|
|||
}
|
||||
|
||||
object TestProbe {
|
||||
def apply()(implicit app: ActorSystem) = new TestProbe(app)
|
||||
def apply()(implicit system: ActorSystem) = new TestProbe(system)
|
||||
}
|
||||
|
||||
trait ImplicitSender { this: TestKit ⇒
|
||||
|
|
|
|||
|
|
@ -21,10 +21,10 @@ class TestLatchNoTimeoutException(message: String) extends RuntimeException(mess
|
|||
object TestLatch {
|
||||
val DefaultTimeout = Duration(5, TimeUnit.SECONDS)
|
||||
|
||||
def apply(count: Int = 1)(implicit app: ActorSystem) = new TestLatch(count)
|
||||
def apply(count: Int = 1)(implicit system: ActorSystem) = new TestLatch(count)
|
||||
}
|
||||
|
||||
class TestLatch(count: Int = 1)(implicit app: ActorSystem) {
|
||||
class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
|
||||
private var latch = new CountDownLatch(count)
|
||||
|
||||
def countDown() = latch.countDown()
|
||||
|
|
@ -36,7 +36,7 @@ class TestLatch(count: Int = 1)(implicit app: ActorSystem) {
|
|||
def await(timeout: Duration): Boolean = {
|
||||
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
|
||||
if (!opened) throw new TestLatchTimeoutException(
|
||||
"Timeout of %s with time factor of %s" format (timeout.toString, app.settings.TestTimeFactor))
|
||||
"Timeout of %s with time factor of %s" format (timeout.toString, system.settings.TestTimeFactor))
|
||||
opened
|
||||
}
|
||||
|
||||
|
|
@ -46,7 +46,7 @@ class TestLatch(count: Int = 1)(implicit app: ActorSystem) {
|
|||
def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = {
|
||||
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
|
||||
if (opened) throw new TestLatchNoTimeoutException(
|
||||
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, app.settings.TestTimeFactor))
|
||||
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, system.settings.TestTimeFactor))
|
||||
opened
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,25 +5,25 @@ import akka.util.Duration
|
|||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
|
||||
package object testkit {
|
||||
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit app: ActorSystem): T = {
|
||||
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit system: ActorSystem): T = {
|
||||
def now = System.currentTimeMillis
|
||||
|
||||
app.eventStream.publish(TestEvent.Mute(eventFilters.toSeq))
|
||||
system.eventStream.publish(TestEvent.Mute(eventFilters.toSeq))
|
||||
try {
|
||||
val result = block
|
||||
|
||||
val stop = now + app.settings.TestEventFilterLeeway.toMillis
|
||||
val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + app.settings.TestEventFilterLeeway + ") waiting for " + _)
|
||||
val stop = now + system.settings.TestEventFilterLeeway.toMillis
|
||||
val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + system.settings.TestEventFilterLeeway + ") waiting for " + _)
|
||||
if (failed.nonEmpty)
|
||||
throw new AssertionError("Filter completion error:\n" + failed.mkString("\n"))
|
||||
|
||||
result
|
||||
} finally {
|
||||
app.eventStream.publish(TestEvent.UnMute(eventFilters.toSeq))
|
||||
system.eventStream.publish(TestEvent.UnMute(eventFilters.toSeq))
|
||||
}
|
||||
}
|
||||
|
||||
def filterEvents[T](eventFilters: EventFilter*)(block: ⇒ T)(implicit app: ActorSystem): T = filterEvents(eventFilters.toSeq)(block)
|
||||
def filterEvents[T](eventFilters: EventFilter*)(block: ⇒ T)(implicit system: ActorSystem): T = filterEvents(eventFilters.toSeq)(block)
|
||||
|
||||
def filterException[T <: Throwable](block: ⇒ Unit)(implicit app: ActorSystem, m: Manifest[T]): Unit = EventFilter[T]() intercept (block)
|
||||
def filterException[T <: Throwable](block: ⇒ Unit)(implicit system: ActorSystem, m: Manifest[T]): Unit = EventFilter[T]() intercept (block)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,14 +56,14 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
|
|||
"An AkkaSpec" must {
|
||||
"terminate all actors" in {
|
||||
import ActorSystem.defaultConfig
|
||||
val app = ActorSystem("test", defaultConfig ++ Configuration(
|
||||
val system = ActorSystem("test", defaultConfig ++ Configuration(
|
||||
"akka.actor.debug.lifecycle" -> true, "akka.actor.debug.event-stream" -> true,
|
||||
"akka.loglevel" -> "DEBUG", "akka.stdout-loglevel" -> "DEBUG"))
|
||||
val spec = new AkkaSpec(app) {
|
||||
val ref = Seq(testActor, app.actorOf(Props.empty, "name"))
|
||||
val spec = new AkkaSpec(system) {
|
||||
val ref = Seq(testActor, system.actorOf(Props.empty, "name"))
|
||||
}
|
||||
spec.ref foreach (_ must not be 'shutdown)
|
||||
app.stop()
|
||||
system.stop()
|
||||
spec.awaitCond(spec.ref forall (_.isShutdown), 2 seconds)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@
|
|||
|
||||
// public class Pi {
|
||||
|
||||
// private static final ActorSystem app = new ActorSystem();
|
||||
// private static final ActorSystem system = new ActorSystem();
|
||||
|
||||
// public static void main(String[] args) throws Exception {
|
||||
// Pi pi = new Pi();
|
||||
|
|
@ -109,11 +109,11 @@
|
|||
|
||||
// LinkedList<ActorRef> workers = new LinkedList<ActorRef>();
|
||||
// for (int i = 0; i < nrOfWorkers; i++) {
|
||||
// ActorRef worker = app.actorOf(Worker.class);
|
||||
// ActorRef worker = system.actorOf(Worker.class);
|
||||
// workers.add(worker);
|
||||
// }
|
||||
|
||||
// router = app.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi");
|
||||
// router = system.actorOf(new RoutedProps().withRoundRobinRouter().withLocalConnections(workers), "pi");
|
||||
// }
|
||||
|
||||
// // message handler
|
||||
|
|
@ -167,7 +167,7 @@
|
|||
// final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// // create the master
|
||||
// ActorRef master = app.actorOf(new UntypedActorFactory() {
|
||||
// ActorRef master = system.actorOf(new UntypedActorFactory() {
|
||||
// public UntypedActor create() {
|
||||
// return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
|
||||
// }
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
|
||||
// object Pi extends App {
|
||||
|
||||
// val app = ActorSystem()
|
||||
// val system = ActorSystem()
|
||||
|
||||
// calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
|
||||
|
||||
|
|
@ -56,10 +56,10 @@
|
|||
// var start: Long = _
|
||||
|
||||
// // create the workers
|
||||
// val workers = Vector.fill(nrOfWorkers)(app.actorOf[Worker])
|
||||
// val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker])
|
||||
|
||||
// // wrap them with a load-balancing router
|
||||
// val router = app.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi")
|
||||
// val router = system.actorOf(RoutedProps().withRoundRobinRouter.withLocalConnections(workers), "pi")
|
||||
|
||||
// // message handler
|
||||
// def receive = {
|
||||
|
|
@ -102,7 +102,7 @@
|
|||
// val latch = new CountDownLatch(1)
|
||||
|
||||
// // create the master
|
||||
// val master = app.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch))
|
||||
// val master = system.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch))
|
||||
|
||||
// // start the calculation
|
||||
// master ! Calculate
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue