Cleanup of methods in Actor and ActorContext trait. See #1377

* Added JavaActorContext, UntypedActor.getContext
* implicit val context in Actor needs to be implicit to support forward,
it would be nice if it wasn't implicit because now I can't override context
in UntypedActor
* Removed implicit def system in Actor
* Removed implicit def defaultTimeout in Actor
* Removed receiveTimeout, children, dispatcher, become, unbecome, watch,
unwatch in Actor
* Removed corresponding as above from UntypedActor
* Removed implicit from dispatcher in ActorSystem
* Removed implicit def timeout in TypedActor
* Changed receiveTimeout to use Duration (in api)
* Changed many tests and samples to match new api
This commit is contained in:
Patrik Nordwall 2011-12-05 20:01:42 +01:00
parent 5530c4cbdb
commit 3204269f6a
56 changed files with 251 additions and 196 deletions

View file

@ -3,5 +3,6 @@ package akka.actor;
public class JavaAPITestActor extends UntypedActor {
public void onReceive(Object msg) {
getSender().tell("got it!");
getContext().getChildren();
}
}

View file

@ -21,6 +21,7 @@ object ActorFireForgetRequestReplySpec {
}
class CrashingActor extends Actor {
implicit val system = context.system
def receive = {
case "Die"
state.finished.await
@ -29,6 +30,7 @@ object ActorFireForgetRequestReplySpec {
}
class SenderActor(replyActor: ActorRef) extends Actor {
implicit val system = context.system
def receive = {
case "Init"
replyActor ! "Send"
@ -51,7 +53,7 @@ object ActorFireForgetRequestReplySpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach {
class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
import ActorFireForgetRequestReplySpec._
override def beforeEach() = {

View file

@ -26,7 +26,7 @@ object ActorLifeCycleSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
import ActorLifeCycleSpec._
"An Actor" must {

View file

@ -37,6 +37,7 @@ object ActorRefSpec {
}
class WorkerActor() extends Actor {
implicit val system = context.system
def receive = {
case "work" {
work
@ -111,7 +112,7 @@ object ActorRefSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorRefSpec extends AkkaSpec {
class ActorRefSpec extends AkkaSpec with DefaultTimeout {
import akka.actor.ActorRefSpec._
def promiseIntercept(f: Actor)(to: Promise[Actor]): Actor = try {

View file

@ -7,9 +7,10 @@ import org.scalatest.BeforeAndAfterAll
import akka.dispatch.FutureTimeoutException
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll {
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {
def actorWithTimeout(t: Timeout): ActorRef = actorOf(Props(creator = () new Actor {
def receive = {

View file

@ -10,9 +10,9 @@ import akka.util.duration._
import java.util.concurrent.atomic._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
def startWatching(target: ActorRef) = actorOf(Props(new Actor {
watch(target)
context.startsWatching(target)
def receive = { case x testActor forward x }
}))
@ -52,8 +52,8 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val terminal = actorOf(Props(context { case _ }))
val monitor1, monitor3 = startWatching(terminal)
val monitor2 = actorOf(Props(new Actor {
watch(terminal)
unwatch(terminal)
context.startsWatching(terminal)
context.stopsWatching(terminal)
def receive = {
case "ping" sender ! "pong"
case t: Terminated testActor ! t
@ -107,7 +107,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val failed = (supervisor ? Props.empty).as[ActorRef].get
val brother = (supervisor ? Props(new Actor {
watch(failed)
context.startsWatching(failed)
def receive = Actor.emptyBehavior
})).as[ActorRef].get

View file

@ -58,7 +58,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
val forward = actorOf(new Forwarder(testActor))
val fsm = actorOf(new MyFSM(testActor))
val sup = actorOf(Props(new Actor {
watch(fsm)
context.startsWatching(fsm)
def receive = { case _ }
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))

View file

@ -37,7 +37,7 @@ class HotSwapSpec extends AkkaSpec {
case "init"
_log += "init"
barrier.await
case "swap" become({
case "swap" context.become({
case _
_log += "swapped"
barrier.await
@ -113,12 +113,12 @@ class HotSwapSpec extends AkkaSpec {
_log += "init"
barrier.await
case "swap"
become({
context.become({
case "swapped"
_log += "swapped"
barrier.await
case "revert"
unbecome()
context.unbecome()
})
barrier.await
}

View file

@ -17,6 +17,9 @@ object IOActorSpec {
class SimpleEchoServer(host: String, port: Int, ioManager: ActorRef, started: TestLatch) extends Actor {
implicit val timeout = context.system.settings.ActorTimeout
implicit val dispatcher = context.dispatcher
override def preStart = {
listen(ioManager, host, port)
started.open()
@ -63,6 +66,9 @@ object IOActorSpec {
// Basic Redis-style protocol
class KVStore(host: String, port: Int, ioManager: ActorRef, started: TestLatch) extends Actor {
implicit val timeout = context.system.settings.ActorTimeout
implicit val dispatcher = context.dispatcher
var kvs: Map[String, ByteString] = Map.empty
override def preStart = {
@ -117,6 +123,9 @@ object IOActorSpec {
class KVClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO {
implicit val timeout = context.system.settings.ActorTimeout
implicit val dispatcher = context.dispatcher
var socket: SocketHandle = _
override def preStart {
@ -171,7 +180,7 @@ object IOActorSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class IOActorSpec extends AkkaSpec with BeforeAndAfterEach {
class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
import IOActorSpec._
"an IO Actor" must {

View file

@ -18,7 +18,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
val timeoutLatch = TestLatch()
val timeoutActor = actorOf(new Actor {
receiveTimeout = Some(500L)
context.receiveTimeout = Some(500 milliseconds)
protected def receive = {
case ReceiveTimeout timeoutLatch.open
@ -33,7 +33,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
val timeoutLatch = TestLatch()
val timeoutActor = actorOf(new Actor {
receiveTimeout = Some(500L)
context.receiveTimeout = Some(500 milliseconds)
protected def receive = {
case ReceiveTimeout timeoutLatch.open
@ -57,7 +57,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
case object Tick
val timeoutActor = actorOf(new Actor {
receiveTimeout = Some(500L)
context.receiveTimeout = Some(500 milliseconds)
protected def receive = {
case Tick ()
@ -77,14 +77,14 @@ class ReceiveTimeoutSpec extends AkkaSpec {
case object Tick
val timeoutActor = actorOf(new Actor {
receiveTimeout = Some(500L)
context.receiveTimeout = Some(500 milliseconds)
protected def receive = {
case Tick ()
case ReceiveTimeout
count.incrementAndGet
timeoutLatch.open
receiveTimeout = None
context.receiveTimeout = None
}
})
@ -109,7 +109,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
}
"have ReceiveTimeout eq to Actors ReceiveTimeout" in {
akka.actor.Actors.receiveTimeout() must be theSameInstanceAs (ReceiveTimeout)
akka.actor.Actors.receiveTimeout must be theSameInstanceAs (ReceiveTimeout)
}
}
}

View file

@ -11,9 +11,10 @@ import akka.testkit.EventFilter
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import org.multiverse.api.latches.StandardLatch
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RestartStrategySpec extends AkkaSpec {
class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
override def atStartup {
system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
@ -206,7 +207,7 @@ class RestartStrategySpec extends AkkaSpec {
val boss = actorOf(Props(new Actor {
def receive = {
case p: Props sender ! watch(context.actorOf(p))
case p: Props sender ! context.startsWatching(context.actorOf(p))
case t: Terminated maxNoOfRestartsLatch.open
}
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000))))

View file

@ -6,9 +6,10 @@ import akka.testkit.AkkaSpec
import akka.testkit.EventFilter
import akka.util.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
private val cancellables = new ConcurrentLinkedQueue[Cancellable]()
def collectCancellable(c: Cancellable): Cancellable = {
@ -96,6 +97,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
* ticket #307
*/
"pick up schedule after actor restart" in {
object Ping
object Crash

View file

@ -22,7 +22,7 @@ object SupervisorHierarchySpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorHierarchySpec extends AkkaSpec {
class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
import SupervisorHierarchySpec._
"A Supervisor Hierarchy" must {
@ -52,7 +52,7 @@ class SupervisorHierarchySpec extends AkkaSpec {
val countDownMessages = new CountDownLatch(1)
val countDownMax = new CountDownLatch(1)
val boss = actorOf(Props(new Actor {
val crasher = watch(context.actorOf(Props(new CountDownActor(countDownMessages))))
val crasher = context.startsWatching(context.actorOf(Props(new CountDownActor(countDownMessages))))
protected def receive = {
case "killCrasher" crasher ! Kill

View file

@ -7,9 +7,10 @@ import akka.testkit.{ filterEvents, EventFilter }
import akka.dispatch.{ PinnedDispatcher, Dispatchers }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorMiscSpec extends AkkaSpec {
class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout {
"A Supervisor" must {

View file

@ -50,7 +50,7 @@ object SupervisorSpec {
}
class Master(sendTo: ActorRef) extends Actor {
val temp = watch(context.actorOf(Props(new PingPongActor(sendTo))))
val temp = context.startsWatching(context.actorOf(Props(new PingPongActor(sendTo))))
var s: ActorRef = _
@ -63,7 +63,7 @@ object SupervisorSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
import SupervisorSpec._

View file

@ -11,9 +11,10 @@ import akka.actor.Actor._
import akka.testkit.{ TestKit, EventFilter, filterEvents, filterException }
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorTreeSpec extends AkkaSpec with ImplicitSender {
class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout {
"In a 3 levels deep supervisor tree (linked in the constructor) we" must {

View file

@ -9,9 +9,10 @@ import org.scalatest.BeforeAndAfterAll
import akka.testkit.{ TestKit, filterEvents, EventFilter }
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender {
class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout {
import Ticket669Spec._
// TODO: does this really make sense?

View file

@ -16,6 +16,7 @@ import akka.serialization.SerializationExtension
import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.japi.{ Creator, Option JOption }
import akka.testkit.DefaultTimeout
object TypedActorSpec {
@ -80,7 +81,7 @@ object TypedActorSpec {
class Bar extends Foo with Serializable {
import TypedActor.{ dispatcher, timeout }
import TypedActor.dispatcher
def pigdog = "Pigdog"
@ -96,8 +97,10 @@ object TypedActorSpec {
new KeptPromise(Right(pigdog + numbered))
}
def futureComposePigdogFrom(foo: Foo): Future[String] =
def futureComposePigdogFrom(foo: Foo): Future[String] = {
implicit val timeout = TypedActor.system.settings.ActorTimeout
foo.futurePigdog(500).map(_.toUpperCase)
}
def optionPigdog(): Option[String] = Some(pigdog)
@ -157,7 +160,7 @@ object TypedActorSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfterAll {
class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfterAll with DefaultTimeout {
import TypedActorSpec._

View file

@ -53,7 +53,7 @@ object ActorModelSpec {
class DispatcherActor extends Actor {
private val busy = new Switch(false)
def interceptor = dispatcher.asInstanceOf[MessageDispatcherInterceptor]
def interceptor = context.dispatcher.asInstanceOf[MessageDispatcherInterceptor]
def ack {
if (!busy.switchOn()) {
@ -223,7 +223,7 @@ object ActorModelSpec {
}
}
abstract class ActorModelSpec extends AkkaSpec {
abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout {
import ActorModelSpec._
@ -343,7 +343,7 @@ abstract class ActorModelSpec extends AkkaSpec {
val waitTime = (30 seconds).dilated.toMillis
val boss = actorOf(Props(new Actor {
def receive = {
case "run" for (_ 1 to num) (watch(context.actorOf(props))) ! cachedMessage
case "run" for (_ 1 to num) (context.startsWatching(context.actorOf(props))) ! cachedMessage
case Terminated(child) stopLatch.countDown()
}
}).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("boss")))

View file

@ -7,6 +7,7 @@ import akka.dispatch.{ PinnedDispatcher, Dispatchers, Dispatcher }
import akka.actor.{ Props, Actor }
import akka.util.Duration
import akka.util.duration._
import akka.testkit.DefaultTimeout
object DispatcherActorSpec {
class TestActor extends Actor {
@ -27,7 +28,7 @@ object DispatcherActorSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DispatcherActorSpec extends AkkaSpec {
class DispatcherActorSpec extends AkkaSpec with DefaultTimeout {
import DispatcherActorSpec._
private val unit = TimeUnit.MILLISECONDS

View file

@ -18,7 +18,7 @@ object PinnedActorSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach {
class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
import PinnedActorSpec._
private val unit = TimeUnit.MILLISECONDS

View file

@ -8,8 +8,9 @@ import akka.dispatch.Future
import akka.actor.future2actor
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
class Future2ActorSpec extends AkkaSpec {
class Future2ActorSpec extends AkkaSpec with DefaultTimeout {
"The Future2Actor bridge" must {

View file

@ -14,6 +14,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import org.scalatest.junit.JUnitSuite
import java.lang.ArithmeticException
import akka.testkit.DefaultTimeout
object FutureSpec {
class TestActor extends Actor {
@ -39,7 +40,7 @@ object FutureSpec {
class JavaFutureSpec extends JavaFutureTests with JUnitSuite
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll {
class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with DefaultTimeout {
import FutureSpec._
"A Promise" when {

View file

@ -3,9 +3,10 @@ package akka.dispatch
import akka.actor.{ Props, LocalActorRef, Actor }
import akka.testkit.AkkaSpec
import akka.util.Duration
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PriorityDispatcherSpec extends AkkaSpec {
class PriorityDispatcherSpec extends AkkaSpec with DefaultTimeout {
"A PriorityDispatcher" must {
"Order it's messages according to the specified comparator using an unbounded mailbox" in {

View file

@ -5,9 +5,10 @@ import akka.util.cps._
import akka.actor.Timeout
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class PromiseStreamSpec extends AkkaSpec {
class PromiseStreamSpec extends AkkaSpec with DefaultTimeout {
"A PromiseStream" must {

View file

@ -25,7 +25,7 @@ object EventStreamSpec {
case class SetTarget(ref: ActorRef)
class MyLog extends Actor {
var dst: ActorRef = system.deadLetters
var dst: ActorRef = context.system.deadLetters
def receive = {
case Logging.InitializeLogger(bus) bus.subscribe(context.self, classOf[SetTarget]); sender ! Logging.LoggerInitialized
case SetTarget(ref) dst = ref; dst ! "OK"

View file

@ -25,7 +25,7 @@ object ActorPoolSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TypedActorPoolSpec extends AkkaSpec {
class TypedActorPoolSpec extends AkkaSpec with DefaultTimeout {
import ActorPoolSpec._
"Actor Pool (2)" must {
"support typed actors" in {
@ -55,7 +55,7 @@ class TypedActorPoolSpec extends AkkaSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorPoolSpec extends AkkaSpec {
class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
import ActorPoolSpec._
"Actor Pool" must {

View file

@ -7,9 +7,10 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit.AkkaSpec
import akka.actor.DeploymentConfig._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConfiguredLocalRoutingSpec extends AkkaSpec {
class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout {
val deployer = system.asInstanceOf[ActorSystemImpl].provider.deployer

View file

@ -20,7 +20,7 @@ object RoutingSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RoutingSpec extends AkkaSpec {
class RoutingSpec extends AkkaSpec with DefaultTimeout {
val impl = system.asInstanceOf[ActorSystemImpl]

View file

@ -7,8 +7,9 @@ import org.scalatest.matchers.MustMatchers
import akka.dispatch.Future
import akka.testkit.AkkaSpec
import scala.util.Random
import akka.testkit.DefaultTimeout
class IndexSpec extends AkkaSpec with MustMatchers {
class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout {
private def emptyIndex = new Index[String, Int](100, _ compareTo _)

View file

@ -149,11 +149,10 @@ object Timeout {
implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
implicit def defaultTimeout(implicit system: ActorSystem) = system.settings.ActorTimeout
}
trait ActorLogging { this: Actor
val log = akka.event.Logging(system.eventStream, context.self)
val log = akka.event.Logging(context.system.eventStream, context.self)
}
object Actor {
@ -190,6 +189,7 @@ trait Actor {
/**
* Stores the context for this actor, including self, sender, and hotswap.
* It is implicit to support operations such as `forward`.
*/
@transient
protected[akka] implicit val context: ActorContext = {
@ -211,13 +211,6 @@ trait Actor {
c
}
implicit def system = context.system
/**
* The default timeout, based on the config setting 'akka.actor.timeout'
*/
implicit def defaultTimeout = system.settings.ActorTimeout
/**
* The 'self' field holds the ActorRef for this actor.
* <p/>
@ -230,34 +223,11 @@ trait Actor {
/**
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
* Is defined if the message was sent from another Actor,
* else `deadLetters` in [[akka.actor.ActorSystem]].
*/
final def sender: ActorRef = context.sender
/**
* Gets the current receive timeout
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def receiveTimeout: Option[Long] = context.receiveTimeout
/**
* User overridable callback/setting.
* <p/>
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
def receiveTimeout_=(timeout: Option[Long]) = context.receiveTimeout = timeout
/**
* Same as ActorContext.children
*/
def children: Iterable[ActorRef] = context.children
/**
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
*/
def dispatcher: MessageDispatcher = context.dispatcher
/**
* User overridable callback/setting.
* <p/>
@ -325,30 +295,6 @@ trait Actor {
}
}
/**
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
* Puts the behavior on top of the hotswap stack.
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
*/
final def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) }
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
*/
final def unbecome() { context.unbecome() }
/**
* Registers this actor as a Monitor for the provided ActorRef
* @return the provided ActorRef
*/
final def watch(subject: ActorRef): ActorRef = context startsWatching subject
/**
* Unregisters this actor as Monitor for the provided ActorRef
* @return the provided ActorRef
*/
final def unwatch(subject: ActorRef): ActorRef = context stopsWatching subject
// =========================================
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================

View file

@ -8,8 +8,10 @@ import akka.dispatch._
import scala.annotation.tailrec
import scala.collection.immutable.{ Stack, TreeMap }
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.event.Logging.{ Debug, Warning, Error }
import akka.util.{ Duration, Helpers }
import akka.japi.Procedure
/**
* The actor context - the view of the actor cell from the actor.
@ -20,14 +22,30 @@ trait ActorContext extends ActorRefFactory {
def self: ActorRef
def receiveTimeout: Option[Long]
/**
* Gets the current receive timeout
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def receiveTimeout: Option[Duration]
def receiveTimeout_=(timeout: Option[Long]): Unit
/**
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
def receiveTimeout_=(timeout: Option[Duration]): Unit
def become(behavior: Actor.Receive, discardOld: Boolean): Unit
/**
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
* Puts the behavior on top of the hotswap stack.
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
*/
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit
def hotswap: Stack[PartialFunction[Any, Unit]]
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
*/
def unbecome(): Unit
def currentMessage: Envelope
@ -38,6 +56,9 @@ trait ActorContext extends ActorRefFactory {
def children: Iterable[ActorRef]
/**
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
*/
def dispatcher: MessageDispatcher
def handleFailure(child: ActorRef, cause: Throwable): Unit
@ -48,11 +69,53 @@ trait ActorContext extends ActorRefFactory {
def parent: ActorRef
/**
* Registers this actor as a Monitor for the provided ActorRef
* @return the provided ActorRef
*/
def startsWatching(subject: ActorRef): ActorRef
/**
* Unregisters this actor as Monitor for the provided ActorRef
* @return the provided ActorRef
*/
def stopsWatching(subject: ActorRef): ActorRef
}
trait JavaActorContext extends ActorContext {
/**
* Returns an unmodifiable Java Collection containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def getChildren(): java.lang.Iterable[ActorRef]
/**
* Gets the current receive timeout
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def getReceiveTimeout: Option[Duration]
/**
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
def setReceiveTimeout(timeout: Duration): Unit
/**
* Changes the Actor's behavior to become the new 'Procedure' handler.
* Puts the behavior on top of the hotswap stack.
*/
def become(behavior: Procedure[Any]): Unit
/**
* Changes the Actor's behavior to become the new 'Procedure' handler.
* Puts the behavior on top of the hotswap stack.
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
*/
def become(behavior: Procedure[Any], discardOld: Boolean): Unit
}
private[akka] object ActorCell {
val contextStack = new ThreadLocal[Stack[ActorContext]] {
override def initialValue = Stack[ActorContext]()
@ -76,8 +139,8 @@ private[akka] class ActorCell(
val self: ActorRef with ScalaActorRef,
val props: Props,
val parent: ActorRef,
/*no member*/ _receiveTimeout: Option[Long],
var hotswap: Stack[PartialFunction[Any, Unit]]) extends ActorContext {
/*no member*/ _receiveTimeout: Option[Duration],
var hotswap: Stack[PartialFunction[Any, Unit]]) extends JavaActorContext {
import ActorCell._
@ -87,15 +150,28 @@ private[akka] class ActorCell(
final def provider = system.provider
override def receiveTimeout: Option[Long] = if (receiveTimeoutData._1 > 0) Some(receiveTimeoutData._1) else None
override def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None
override def receiveTimeout_=(timeout: Option[Long]): Unit = {
val timeoutMs = if (timeout.isDefined && timeout.get > 0) timeout.get else -1
override def receiveTimeout_=(timeout: Option[Duration]): Unit = {
val timeoutMs = if (timeout.isDefined && timeout.get.toMillis > 0) timeout.get.toMillis else -1
receiveTimeoutData = (timeoutMs, receiveTimeoutData._2)
}
/**
* In milliseconds
*/
var receiveTimeoutData: (Long, Cancellable) =
if (_receiveTimeout.isDefined) (_receiveTimeout.get, emptyCancellable) else emptyReceiveTimeoutData
if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData
/**
* JavaActorContext impl
*/
def getReceiveTimeout: Option[Duration] = receiveTimeout
/**
* JavaActorContext impl
*/
def setReceiveTimeout(timeout: Duration): Unit = receiveTimeout = Some(timeout)
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
@ -120,6 +196,11 @@ private[akka] class ActorCell(
@inline
final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
/**
* JavaActorContext impl
*/
def getDispatcher(): MessageDispatcher = dispatcher
final def isTerminated: Boolean = mailbox.isClosed
final def start(): Unit = {
@ -154,6 +235,14 @@ private[akka] class ActorCell(
final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child)
/**
* Impl JavaActorContext
*/
def getChildren(): java.lang.Iterable[ActorRef] = {
import scala.collection.JavaConverters.asJavaIterableConverter
asJavaIterableConverter(children).asJava
}
final def getChild(name: String): Option[ActorRef] =
if (isTerminated) None else childrenRefs.get(name).map(_.child)
@ -341,6 +430,19 @@ private[akka] class ActorCell(
hotswap = hotswap.push(behavior)
}
/**
* JavaActorContext impl
*/
def become(behavior: Procedure[Any]): Unit = become(behavior, false)
/*
* JavaActorContext impl
*/
def become(behavior: Procedure[Any], discardOld: Boolean): Unit = {
def newReceive: Actor.Receive = { case msg behavior.apply(msg) }
become(newReceive, discardOld)
}
def unbecome() {
val h = hotswap
if (h.nonEmpty) hotswap = h.pop

View file

@ -139,7 +139,7 @@ class LocalActorRef private[akka] (
_supervisor: ActorRef,
val path: ActorPath,
val systemService: Boolean = false,
_receiveTimeout: Option[Long] = None,
_receiveTimeout: Option[Duration] = None,
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
extends ActorRef with ScalaActorRef with RefInternals {
@ -216,7 +216,7 @@ class LocalActorRef private[akka] (
/**
* This trait represents the Scala Actor API
* There are implicit conversions in ../actor/Implicits.scala
* There are implicit conversions in [[akka.actor]] package object
* from ActorRef -> ScalaActorRef and back
*/
trait ScalaActorRef { ref: ActorRef

View file

@ -336,8 +336,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
}
val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))
// TODO why implicit val dispatcher?
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
val dispatcher = dispatcherFactory.defaultGlobalDispatcher
def terminationFuture: Future[Unit] = provider.terminationFuture
def guardian: ActorRef = provider.guardian

View file

@ -189,7 +189,7 @@ trait FSM[S, D] extends ListenerManagement {
type Timeout = Option[Duration]
type TransitionHandler = PartialFunction[(S, S), Unit]
val log = Logging(system, context.self)
val log = Logging(context.system, context.self)
/**
* ****************************************
@ -279,7 +279,7 @@ trait FSM[S, D] extends ListenerManagement {
if (timers contains name) {
timers(name).cancel
}
val timer = Timer(name, msg, repeat, timerGen.next)
val timer = Timer(name, msg, repeat, timerGen.next)(context.system)
timer.schedule(self, timeout)
timers(name) = timer
stay
@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement {
if (timeout.isDefined) {
val t = timeout.get
if (t.finite_? && t.length >= 0) {
timeoutFuture = Some(system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation)))
timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation)))
}
}
}
@ -566,7 +566,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
def logDepth: Int = 0
private val debugEvent = system.settings.FsmDebugEvent
private val debugEvent = context.system.settings.FsmDebugEvent
private val events = new Array[Event](logDepth)
private val states = new Array[AnyRef](logDepth)

View file

@ -239,7 +239,7 @@ class IOManager(bufferSize: Int = 8192) extends Actor {
var worker: IOWorker = _
override def preStart {
worker = new IOWorker(system, self, bufferSize)
worker = new IOWorker(context.system, self, bufferSize)
worker.start()
}

View file

@ -292,11 +292,6 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
*/
implicit def dispatcher = system.dispatcher
/**
* Returns the default timeout (for a TypedActor) when inside a method call in a TypedActor.
*/
implicit def timeout = system.settings.ActorTimeout
/**
* Implementation of TypedActor as an Actor
*/
@ -326,7 +321,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
def receive = {
case m: MethodCall
TypedActor.selfReference set proxyVar.get
TypedActor.currentSystem set system
TypedActor.currentSystem set context.system
try {
if (m.isOneWay) m(me)
else {

View file

@ -58,6 +58,8 @@ abstract class UntypedActor extends Actor {
@throws(classOf[Exception])
def onReceive(message: Any): Unit
def getContext(): JavaActorContext = context.asInstanceOf[JavaActorContext]
/**
* Returns the 'self' reference.
*/
@ -69,43 +71,6 @@ abstract class UntypedActor extends Actor {
*/
def getSender(): ActorRef = sender
/**
* Gets the current receive timeout
* When specified, the receive method should be able to handle a 'ReceiveTimeout' message.
*/
def getReceiveTimeout: Option[Long] = receiveTimeout
/**
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
def setReceiveTimeout(timeout: Long): Unit = receiveTimeout = Some(timeout)
/**
* Returns an unmodifiable Java Collection containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def getChildren(): java.lang.Iterable[ActorRef] = {
import scala.collection.JavaConverters.asJavaIterableConverter
asJavaIterableConverter(context.children).asJava
}
/**
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
*/
def getDispatcher(): MessageDispatcher = dispatcher
/**
* Java API for become
*/
def become(behavior: Procedure[Any]): Unit = become(behavior, false)
/*
* Java API for become with optional discardOld
*/
def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
super.become({ case msg behavior.apply(msg) }, discardOld)
/**
* User overridable callback.
* <p/>

View file

@ -49,7 +49,7 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas
def start(system: ActorSystemImpl) {
reaper = system.systemActorOf(Props(new Actor {
def receive = {
case ref: ActorRef watch(ref)
case ref: ActorRef context.startsWatching(ref)
case Terminated(ref) unsubscribe(ref)
}
}), "MainBusReaper")

View file

@ -120,7 +120,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
val requestedCapacity = capacity(_delegates)
val newDelegates = requestedCapacity match {
case qty if qty > 0
_delegates ++ Vector.fill(requestedCapacity)(watch(instance(defaultProps)))
_delegates ++ Vector.fill(requestedCapacity)(context.startsWatching(instance(defaultProps)))
case qty if qty < 0
_delegates.splitAt(_delegates.length + requestedCapacity) match {

View file

@ -15,7 +15,7 @@ class ConfigDocSpec extends WordSpec with MustMatchers {
//#custom-config
val customConf = ConfigFactory.parseString("""
akka.actor.deployment {
/app/my-service {
/user/my-service {
router = round-robin
nr-of-instances = 3
}

View file

@ -13,7 +13,7 @@ import akka.event.Logging
//#my-actor
class MyActor extends Actor {
val log = Logging(system, this)
val log = Logging(context.system, this)
def receive = {
case "test" log.info("received test")
case _ log.info("received unknown message")

View file

@ -10,7 +10,7 @@ object DirectRoutedRemoteActorMultiJvmSpec {
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" sender ! system.nodename
case "identify" sender ! context.system.nodename
}
}
}
@ -33,7 +33,7 @@ class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec {
}
}
class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec {
class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec with DefaultTimeout {
import DirectRoutedRemoteActorMultiJvmSpec._

View file

@ -2,13 +2,14 @@ package akka.remote.new_remote_actor
import akka.actor.Actor
import akka.remote._
import akka.testkit.DefaultTimeout
object NewRemoteActorMultiJvmSpec {
val NrOfNodes = 2
class SomeActor extends Actor with Serializable {
def receive = {
case "identify" sender ! system.nodename
case "identify" sender ! context.system.nodename
}
}
}
@ -32,7 +33,7 @@ class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec {
}
}
class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec {
class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec with DefaultTimeout {
import NewRemoteActorMultiJvmSpec._

View file

@ -4,12 +4,13 @@ import akka.actor.Actor
import akka.remote._
import akka.routing._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
object RandomRoutedRemoteActorMultiJvmSpec {
val NrOfNodes = 4
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! system.nodename
case "hit" sender ! context.system.nodename
case "end" self.stop()
}
}
@ -60,7 +61,7 @@ class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec {
}
}
class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec {
class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTimeout {
import RandomRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"A new remote actor configured with a Random router" must {

View file

@ -4,12 +4,13 @@ import akka.actor.Actor
import akka.remote._
import akka.routing._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
object RoundRobinRoutedRemoteActorMultiJvmSpec {
val NrOfNodes = 4
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! system.nodename
case "hit" sender ! context.system.nodename
case "end" self.stop()
}
}
@ -60,7 +61,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec {
}
}
class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec {
class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTimeout {
import RoundRobinRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"A new remote actor configured with a RoundRobin router" must {

View file

@ -4,12 +4,13 @@ import akka.actor.Actor
import akka.remote._
import akka.routing._
import akka.routing.Routing.Broadcast
import akka.testkit.DefaultTimeout
object ScatterGatherRoutedRemoteActorMultiJvmSpec {
val NrOfNodes = 4
class SomeActor extends Actor with Serializable {
def receive = {
case "hit" sender ! system.nodename
case "hit" sender ! context.system.nodename
case "end" self.stop()
}
}
@ -60,7 +61,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec {
}
}
class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec {
class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTimeout {
import ScatterGatherRoutedRemoteActorMultiJvmSpec._
val nodes = NrOfNodes
"A new remote actor configured with a ScatterGather router" must {

View file

@ -9,12 +9,13 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.remote.netty.NettyRemoteSupport
import akka.actor.Actor
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.dispatch.Future
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import java.util.concurrent.atomic.AtomicBoolean
trait NetworkFailureSpec { self: AkkaSpec
trait NetworkFailureSpec extends DefaultTimeout { self: AkkaSpec
import Actor._
import akka.util.Duration

View file

@ -25,6 +25,8 @@ object Think extends DiningHakkerMessage
*/
class Chopstick extends Actor {
import context._
//When a Chopstick is taken by a hakker
//It will refuse to be taken by other hakkers
//But the owning hakker can put it back
@ -51,6 +53,8 @@ class Chopstick extends Actor {
*/
class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
import context._
//When a hakker is thinking it can become hungry
//and try to pick up its chopsticks and eat
def thinking: Receive = {

View file

@ -33,6 +33,7 @@ case class TakenBy(hakker: ActorRef)
* A chopstick is an actor, it can be taken, and put back
*/
class Chopstick extends Actor with FSM[ChopstickState, TakenBy] {
import context._
// A chopstick begins its existence as available and taken by no one
startWith(Available, TakenBy(system.deadLetters))

View file

@ -15,12 +15,12 @@ object Main {
}
class HelloActor extends Actor {
val worldActor = system.actorOf[WorldActor]
val worldActor = context.actorOf[WorldActor]
def receive = {
case Start worldActor ! "Hello"
case s: String
println("Received message: %s".format(s))
system.stop()
context.system.stop()
}
}

View file

@ -613,3 +613,7 @@ object TestProbe {
trait ImplicitSender { this: TestKit
implicit def self = testActor
}
trait DefaultTimeout { this: TestKit
implicit val timeout = system.settings.ActorTimeout
}

View file

@ -37,6 +37,7 @@ object TestActorRefSpec {
}
class ReplyActor extends TActor {
implicit val system = context.system
var replyTo: ActorRef = null
def receiveT = {
@ -87,7 +88,7 @@ object TestActorRefSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
import TestActorRefSpec._
@ -156,7 +157,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
EventFilter[ActorKilledException]() intercept {
val a = TestActorRef(Props[WorkerActor])
val forwarder = actorOf(Props(new Actor {
watch(a)
context.startsWatching(a)
def receive = { case x testActor forward x }
}))
a.!(PoisonPill)(testActor)
@ -216,7 +217,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
"set receiveTimeout to None" in {
val a = TestActorRef[WorkerActor]
a.underlyingActor.receiveTimeout must be(None)
a.underlyingActor.context.receiveTimeout must be(None)
}
"set CallingThreadDispatcher" in {

View file

@ -8,7 +8,7 @@ import akka.dispatch.Future
import akka.util.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TestProbeSpec extends AkkaSpec {
class TestProbeSpec extends AkkaSpec with DefaultTimeout {
"A TestProbe" must {

View file

@ -106,16 +106,16 @@ public class Pi {
this.latch = latch;
Creator<Router> routerCreator = new Creator<Router>() {
public Router create() {
return new RoundRobinRouter(dispatcher(), new akka.actor.Timeout(-1));
return new RoundRobinRouter(getContext().dispatcher(), new akka.actor.Timeout(-1));
}
};
LinkedList<ActorRef> actors = new LinkedList<ActorRef>() {
{
for (int i = 0; i < nrOfWorkers; i++) add(context().actorOf(Worker.class));
for (int i = 0; i < nrOfWorkers; i++) add(getContext().actorOf(Worker.class));
}
};
RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true);
router = new RoutedActorRef(system(), props, getSelf(), "pi");
router = new RoutedActorRef(getContext().system(), props, getSelf(), "pi");
}
// message handler

View file

@ -52,11 +52,14 @@ object Pi extends App {
var start: Long = _
// create the workers
val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker])
val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker])
// wrap them with a load-balancing router
// FIXME routers are intended to be used like this
implicit val timout = context.system.settings.ActorTimeout
implicit val dispatcher = context.dispatcher
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers))
val router = new RoutedActorRef(system, props, self, "pi")
val router = new RoutedActorRef(context.system, props, self, "pi")
// message handler
def receive = {