diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index b77283cefe..8a0d665d4d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -129,7 +129,7 @@ class ActorRefSpec extends WordSpec with MustMatchers { }) } - def refStackMustBeEmpty = Actor.actorRefInCreation.get.headOption must be === None + def refStackMustBeEmpty = ActorInstance.refStack.get.headOption must be === None refStackMustBeEmpty diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala index 6718a83a51..10db8c377b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRegistrySpec.scala @@ -62,9 +62,9 @@ class ActorRegistrySpec extends JUnitSuite with BeforeAndAfterAll { def shouldFindThingsFromLocalActorRegistry { Actor.registry.local.shutdownAll val actor = actorOf[TestActor]("test-actor-1") - val found: Option[LocalActorRef] = Actor.registry.local.find({ case a: LocalActorRef if a.actorInstance.get().isInstanceOf[TestActor] ⇒ a }) + val found: Option[LocalActorRef] = Actor.registry.local.find({ case a: LocalActorRef if a.underlyingActorInstance.isInstanceOf[TestActor] ⇒ a }) assert(found.isDefined) - assert(found.get.actorInstance.get().isInstanceOf[TestActor]) + assert(found.get.underlyingActorInstance.isInstanceOf[TestActor]) assert(found.get.address === "test-actor-1") actor.stop } @@ -76,8 +76,8 @@ class ActorRegistrySpec extends JUnitSuite with BeforeAndAfterAll { val actor2 = actorOf[TestActor]("test-actor-2") val actors = Actor.registry.local.actors assert(actors.size === 2) - assert(actors.find(_.address == "test-actor-2").get.asInstanceOf[LocalActorRef].actorInstance.get().isInstanceOf[TestActor]) - assert(actors.find(_.address == "test-actor-1").get.asInstanceOf[LocalActorRef].actorInstance.get().isInstanceOf[TestActor]) + assert(actors.find(_.address == "test-actor-2").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor]) + assert(actors.find(_.address == "test-actor-1").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor]) actor1.stop actor2.stop } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 75fa689894..8acdf90075 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -120,12 +120,12 @@ class FSMTimingSpec extends WordSpec with MustMatchers with TestKit { object FSMTimingSpec { def suspend(actorRef: ActorRef): Unit = actorRef match { - case l: LocalActorRef ⇒ l.dispatcher.suspend(l) + case l: LocalActorRef ⇒ l.suspend() case _ ⇒ } def resume(actorRef: ActorRef): Unit = actorRef match { - case l: LocalActorRef ⇒ l.dispatcher.resume(l) + case l: LocalActorRef ⇒ l.resume() case _ ⇒ } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index c28d0afa29..5a5de4393d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -103,28 +103,28 @@ object ActorModelSpec { stats.get(actorRef) } - abstract override def suspend(actorRef: LocalActorRef) { - super.suspend(actorRef) - getStats(actorRef).suspensions.incrementAndGet() + abstract override def suspend(actor: ActorInstance) { + super.suspend(actor) + getStats(actor.ref).suspensions.incrementAndGet() } - abstract override def resume(actorRef: LocalActorRef) { - super.resume(actorRef) - getStats(actorRef).resumes.incrementAndGet() + abstract override def resume(actor: ActorInstance) { + super.resume(actor) + getStats(actor.ref).resumes.incrementAndGet() } - protected[akka] abstract override def register(actorRef: LocalActorRef) { - super.register(actorRef) - getStats(actorRef).registers.incrementAndGet() + protected[akka] abstract override def register(actor: ActorInstance) { + super.register(actor) + getStats(actor.ref).registers.incrementAndGet() } - protected[akka] abstract override def unregister(actorRef: LocalActorRef) { - super.unregister(actorRef) - getStats(actorRef).unregisters.incrementAndGet() + protected[akka] abstract override def unregister(actor: ActorInstance) { + super.unregister(actor) + getStats(actor.ref).unregisters.incrementAndGet() } protected[akka] abstract override def dispatch(invocation: MessageInvocation) { - getStats(invocation.receiver).msgsReceived.incrementAndGet() + getStats(invocation.receiver.ref).msgsReceived.incrementAndGet() super.dispatch(invocation) } @@ -342,12 +342,12 @@ abstract class ActorModelSpec extends JUnitSuite { implicit val dispatcher = newInterceptedDispatcher val a = newTestActor.asInstanceOf[LocalActorRef] val done = new CountDownLatch(1) - dispatcher.suspend(a) + a.suspend a ! CountDown(done) assertNoCountDown(done, 1000, "Should not process messages while suspended") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1) - dispatcher.resume(a) + a.resume assertCountDown(done, Testing.testTime(3000), "Should resume processing of messages when resumed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) @@ -379,11 +379,11 @@ abstract class ActorModelSpec extends JUnitSuite { def dispatcherShouldCompleteAllUncompletedSenderFuturesOnDeregister { implicit val dispatcher = newInterceptedDispatcher val a = newTestActor.asInstanceOf[LocalActorRef] - dispatcher.suspend(a) + a.suspend val f1: Future[String] = a ? Reply("foo") mapTo manifest[String] val stopped = a ? PoisonPill val shouldBeCompleted = for (i ← 1 to 10) yield a ? Reply(i) - dispatcher.resume(a) + a.resume assert(f1.get === "foo") stopped.await for (each ← shouldBeCompleted) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index b587719b32..892fcb992e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -80,11 +80,11 @@ class BalancingDispatcherSpec extends JUnitSuite with MustMatchers { } finishedCounter.await(5, TimeUnit.SECONDS) - fast.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true) - slow.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true) - fast.actorInstance.get().asInstanceOf[DelayableActor].invocationCount must be > sentToFast - fast.actorInstance.get().asInstanceOf[DelayableActor].invocationCount must be > - (slow.actorInstance.get().asInstanceOf[DelayableActor].invocationCount) + fast.underlying.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true) + slow.underlying.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true) + fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > sentToFast + fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > + (slow.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount) slow.stop() fast.stop() } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index b861e4a68d..c9ceab557e 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -84,7 +84,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte new MessageInvocation( actorOf(new Actor { //Dummy actor def receive = { case _ ⇒ } - }).asInstanceOf[LocalActorRef], msg, NullChannel) + }).asInstanceOf[LocalActorRef].underlying, msg, NullChannel) } def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index d6b490d00e..57b60d650a 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -35,12 +35,12 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers { } }).withDispatcher(dispatcher)).asInstanceOf[LocalActorRef] - dispatcher.suspend(actor) //Make sure the actor isn't treating any messages, let it buffer the incoming messages + actor.suspend //Make sure the actor isn't treating any messages, let it buffer the incoming messages val msgs = (1 to 100).toList for (m ← msgs) actor ! m - dispatcher.resume(actor) //Signal the actor to start treating it's message backlog + actor.resume //Signal the actor to start treating it's message backlog actor.?('Result).as[List[Int]].get must be === (msgs.reverse) } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index dca202fb71..bc197fecb5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -19,7 +19,6 @@ import akka.event.EventHandler import akka.experimental import akka.AkkaException -import scala.collection.immutable.Stack import scala.reflect.BeanProperty import com.eaio.uuid.UUID @@ -174,10 +173,6 @@ object Actor { */ type Receive = PartialFunction[Any, Unit] - private[actor] val actorRefInCreation = new ThreadLocal[Stack[ScalaActorRef with SelfActorRef]] { - override def initialValue = Stack[ScalaActorRef with SelfActorRef]() - } - private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) @@ -467,7 +462,7 @@ trait Actor { */ @transient val someSelf: Some[ScalaActorRef with SelfActorRef] = { - val refStack = Actor.actorRefInCreation.get + val refStack = ActorInstance.refStack.get if (refStack.isEmpty) throw new ActorInitializationException( "\n\tYou can not create an instance of an " + getClass.getName + " explicitly using 'new MyActor'." + "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + @@ -481,7 +476,7 @@ trait Actor { throw new ActorInitializationException("Trying to create an instance of " + getClass.getName + " outside of a wrapping 'actorOf'") else { // Push a null marker so any subsequent calls to new Actor doesn't reuse this actor ref - Actor.actorRefInCreation.set(refStack.push(null)) + ActorInstance.refStack.set(refStack.push(null)) Some(ref) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorInstance.scala b/akka-actor/src/main/scala/akka/actor/ActorInstance.scala new file mode 100644 index 0000000000..6864486989 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/ActorInstance.scala @@ -0,0 +1,433 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.actor + +import akka.event.EventHandler +import akka.config.Supervision._ +import akka.dispatch._ +import akka.util._ +import java.util.{ Collection ⇒ JCollection } +import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec +import scala.collection.immutable.Stack + +private[akka] object ActorInstance { + sealed trait Status + object Status { + object Unstarted extends Status + object Running extends Status + object BeingRestarted extends Status + object Shutdown extends Status + } + + val refStack = new ThreadLocal[Stack[ScalaActorRef with SelfActorRef]] { + override def initialValue = Stack[ScalaActorRef with SelfActorRef]() + } +} + +private[akka] class ActorInstance(props: Props, self: LocalActorRef) { + import ActorInstance._ + + val guard = new ReentrantGuard // TODO: remove this last synchronization point + + @volatile + var status: Status = Status.Unstarted + + @volatile + var mailbox: AnyRef = _ + + @volatile + var futureTimeout: Option[ScheduledFuture[AnyRef]] = None + + @volatile + var _supervisor: Option[ActorRef] = None + + @volatile + var maxNrOfRetriesCount: Int = 0 + + @volatile + var restartTimeWindowStartNanos: Long = 0L + + @volatile + lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] + + val actor: AtomicReference[Actor] = new AtomicReference[Actor]() + + def ref: ActorRef = self + + def uuid: Uuid = self.uuid + + def actorClass: Class[_] = actor.get.getClass + + def dispatcher: MessageDispatcher = props.dispatcher + + def isRunning: Boolean = status match { + case Status.BeingRestarted | Status.Running ⇒ true + case _ ⇒ false + } + + def isShutdown: Boolean = status == Status.Shutdown + + def start(): Unit = guard.withGuard { + if (isShutdown) throw new ActorStartException("Can't start an actor that has been stopped") + if (!isRunning) { + if (props.supervisor.isDefined) props.supervisor.get.link(self) + actor.set(newActor) + dispatcher.attach(this) + status = Status.Running + try { + val a = actor.get + if (Actor.debugLifecycle) EventHandler.debug(a, "started") + a.preStart() + Actor.registry.register(self) + checkReceiveTimeout // schedule the initial receive timeout + } catch { + case e ⇒ + status = Status.Unstarted + throw e + } + } + } + + def newActor: Actor = { + val stackBefore = refStack.get + refStack.set(stackBefore.push(self)) + try { + if (status == Status.BeingRestarted) { + val a = actor.get() + val fresh = try a.freshInstance catch { + case e ⇒ + EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory") + None + } + fresh match { + case Some(actor) ⇒ actor + case None ⇒ props.creator() + } + } else { + props.creator() + } + } finally { + val stackAfter = refStack.get + if (stackAfter.nonEmpty) + refStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) //pop null marker plus self + } + } match { + case null ⇒ throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'") + case valid ⇒ valid + } + + def suspend(): Unit = dispatcher.suspend(this) + + def resume(): Unit = dispatcher.resume(this) + + def stop(): Unit = guard.withGuard { + if (isRunning) { + self.receiveTimeout = None + cancelReceiveTimeout + Actor.registry.unregister(self) + status = Status.Shutdown + dispatcher.detach(this) + try { + val a = actor.get + if (Actor.debugLifecycle) EventHandler.debug(a, "stopping") + a.postStop() + stopSupervisedActors() + } finally { + self.currentMessage = null + setActorSelf(null) + } + } + } + + def stopSupervisedActors(): Unit = guard.withGuard { + val i = _linkedActors.values.iterator + while (i.hasNext) { + i.next.stop() + i.remove() + } + } + + def link(actorRef: ActorRef): ActorRef = { + guard.withGuard { + val actorRefSupervisor = actorRef.supervisor + val hasSupervisorAlready = actorRefSupervisor.isDefined + if (hasSupervisorAlready && actorRefSupervisor.get.uuid == self.uuid) return actorRef // we already supervise this guy + else if (hasSupervisorAlready) throw new IllegalActorStateException( + "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") + else { + _linkedActors.put(actorRef.uuid, actorRef) + actorRef.supervisor = Some(self) + } + } + if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now supervising " + actorRef) + actorRef + } + + def unlink(actorRef: ActorRef): ActorRef = { + guard.withGuard { + if (_linkedActors.remove(actorRef.uuid) eq null) + throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") + actorRef.supervisor = None + if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "stopped supervising " + actorRef) + } + actorRef + } + + def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors.values) + + def supervisor: Option[ActorRef] = _supervisor + + def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup + + def sender: Option[ActorRef] = { + val msg = self.currentMessage + if (msg eq null) None + else msg.channel match { + case ref: ActorRef ⇒ Some(ref) + case _ ⇒ None + } + } + + def senderFuture(): Option[Promise[Any]] = { + val msg = self.currentMessage + if (msg eq null) None + else msg.channel match { + case f: ActorPromise ⇒ Some(f) + case _ ⇒ None + } + } + + def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = + if (isRunning) dispatcher dispatchMessage new MessageInvocation(this, message, channel) + else throw new ActorInitializationException("Actor has not been started") + + def postMessageToMailboxAndCreateFutureResultWithTimeout( + message: Any, + timeout: Timeout, + channel: UntypedChannel): Future[Any] = if (isRunning) { + val future = channel match { + case f: ActorPromise ⇒ f + case _ ⇒ new ActorPromise(timeout)(dispatcher) + } + dispatcher dispatchMessage new MessageInvocation(this, message, future) + future + } else throw new ActorInitializationException("Actor has not been started") + + def invoke(messageHandle: MessageInvocation): Unit = { + guard.lock.lock() + try { + if (!isShutdown) { + self.currentMessage = messageHandle + try { + try { + cancelReceiveTimeout() // FIXME: leave this here? + actor.get().apply(messageHandle.message) + self.currentMessage = null // reset current message after successful invocation + } catch { + case e ⇒ + EventHandler.error(e, self, e.getMessage) + + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + + self.channel.sendException(e) + + if (supervisor.isDefined) supervisor.get ! Death(self, e, true) else dispatcher.resume(this) + + if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected + } finally { + checkReceiveTimeout // Reschedule receive timeout + } + } catch { + case e ⇒ + EventHandler.error(e, actor.get(), e.getMessage) + throw e + } + } else { + // throwing away message if actor is shut down, no use throwing an exception in receiving actor's thread, isShutdown is enforced on caller side + } + } finally { + guard.lock.unlock() + } + } + + def handleDeath(death: Death) { + props.faultHandler match { + case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + restartLinkedActors(death.cause, maxRetries, within) + + case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + restartLinkedActors(death.cause, None, None) + + case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + death.deceased.restart(death.cause, maxRetries, within) + + case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + unlink(death.deceased) + death.deceased.stop() + self ! MaximumNumberOfRestartsWithinTimeRangeReached(death.deceased, None, None, death.cause) + + case _ ⇒ + if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here + } + } + + def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + def performRestart() { + val failedActor = actor.get + if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") + val message = if (self.currentMessage ne null) Some(self.currentMessage.message) else None + failedActor.preRestart(reason, message) + val freshActor = newActor + setActorSelf(null) // only null out the references if we could instantiate the new actor + actor.set(freshActor) // assign it here so if preStart fails, we can null out the sef-refs next call + freshActor.postRestart(reason) + if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") + } + + @tailrec + def attemptRestart() { + val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) { + guard.withGuard[Boolean] { + status = Status.BeingRestarted + + val success = + try { + performRestart() + true + } catch { + case e ⇒ + EventHandler.error(e, self, "Exception in restart of Actor [%s]".format(toString)) + false // an error or exception here should trigger a retry + } finally { + self.currentMessage = null + } + + if (success) { + status = Status.Running + dispatcher.resume(this) + restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + } + success + } + } else { + // tooManyRestarts + if (supervisor.isDefined) + supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(self, maxNrOfRetries, withinTimeRange, reason) + stop() + true // done + } + + if (success) () // alles gut + else attemptRestart() + } + + attemptRestart() // recur + } + + def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { + val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { + // immortal + false + } else if (withinTimeRange.isEmpty) { + // restrict number of restarts + val retries = maxNrOfRetriesCount + 1 + maxNrOfRetriesCount = retries //Increment number of retries + retries > maxNrOfRetries.get + } else { + // cannot restart more than N within M timerange + val retries = maxNrOfRetriesCount + 1 + + val windowStart = restartTimeWindowStartNanos + val now = System.nanoTime + // we are within the time window if it isn't the first restart, or if the window hasn't closed + val insideWindow = if (windowStart == 0) true else (now - windowStart) <= TimeUnit.MILLISECONDS.toNanos(withinTimeRange.get) + + if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window + restartTimeWindowStartNanos = now + + // reset number of restarts if window has expired, otherwise, increment it + maxNrOfRetriesCount = if (windowStart != 0 && !insideWindow) 1 else retries // increment number of retries + + val restartCountLimit = if (maxNrOfRetries.isDefined) maxNrOfRetries.get else 1 + + // the actor is dead if it dies X times within the window of restart + insideWindow && retries > restartCountLimit + } + + denied == false // if we weren't denied, we have a go + } + + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { + props.faultHandler.lifeCycle match { + case Temporary ⇒ + val i = _linkedActors.values.iterator + while (i.hasNext) { + val actorRef = i.next() + + i.remove() + + actorRef.stop() + // when this comes down through the handleDeath path, we get here when the temp actor is restarted + if (supervisor.isDefined) { + supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(actorRef, Some(0), None, reason) + + //FIXME if last temporary actor is gone, then unlink me from supervisor <-- should this exist? + if (!i.hasNext) + supervisor.get ! UnlinkAndStop(self) + } + } + + case Permanent ⇒ + val i = _linkedActors.values.iterator + while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange) + } + } + + def checkReceiveTimeout() { + cancelReceiveTimeout() + val recvtimeout = self.receiveTimeout + if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { + //Only reschedule if desired and there are currently no more messages to be processed + futureTimeout = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS)) + } + } + + def cancelReceiveTimeout() { + if (futureTimeout.isDefined) { + futureTimeout.get.cancel(true) + futureTimeout = None + } + } + + def setActorSelf(value: ActorRef): Unit = { + @tailrec + def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, value: ActorRef): Boolean = { + val success = try { + val selfField = clazz.getDeclaredField("self") + val someSelfField = clazz.getDeclaredField("someSelf") + selfField.setAccessible(true) + someSelfField.setAccessible(true) + selfField.set(actor, value) + someSelfField.set(actor, if (value ne null) Some(value) else null) + true + } catch { + case e: NoSuchFieldException ⇒ false + } + + if (success) true + else { + val parent = clazz.getSuperclass + if (parent eq null) + throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") + lookupAndSetSelfFields(parent, actor, value) + } + } + + lookupAndSetSelfFields(actor.get.getClass, actor.get, value) + } +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bc0a3126f6..063394c4ee 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -4,169 +4,16 @@ package akka.actor -import akka.event.EventHandler import akka.dispatch._ -import akka.config._ import akka.config.Supervision._ import akka.util._ import akka.serialization.{ Serializer, Serialization } import ReflectiveAccess._ import ClusterModule._ import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.{ Collection ⇒ JCollection } - import scala.collection.immutable.Stack -import scala.annotation.tailrec import java.lang.{ UnsupportedOperationException, IllegalStateException } -import akka.japi.Creator - -private[akka] object ActorRefInternals { - - /** - * LifeCycles for ActorRefs. - */ - private[akka] sealed trait StatusType - - object UNSTARTED extends StatusType - - object RUNNING extends StatusType - - object BEING_RESTARTED extends StatusType - - object SHUTDOWN extends StatusType - -} - -/** - * ActorRef configuration object, this is threadsafe and fully sharable - * - * Props() returns default configuration - * FIXME document me - */ -object Props { - final val defaultCreator: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actor creator specified!") - final val defaultDeployId: String = "" - final val defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher - final val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis")) - final val defaultFaultHandler: FaultHandlingStrategy = AllForOnePermanentStrategy(classOf[Exception] :: Nil, None, None) - final val defaultSupervisor: Option[ActorRef] = None - - /** - * The default Props instance, uses the settings from the Props object starting with default* - */ - final val default = new Props() - - /** - * Returns a cached default implementation of Props - */ - def apply(): Props = default - - /** - * Returns a Props that has default values except for "creator" which will be a function that creates an instance - * of the supplied type using the default constructor - */ - def apply[T <: Actor: ClassManifest]: Props = - default.withCreator(implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[_ <: Actor]].newInstance) - - /** - * Returns a Props that has default values except for "creator" which will be a function that creates an instance - * of the supplied class using the default constructor - */ - def apply(actorClass: Class[_ <: Actor]): Props = - default.withCreator(actorClass.newInstance) - - /** - * Returns a Props that has default values except for "creator" which will be a function that creates an instance - * using the supplied thunk - */ - def apply(creator: ⇒ Actor): Props = default.withCreator(creator) - - /** - * Returns a Props that has default values except for "creator" which will be a function that creates an instance - * using the supplied thunk - */ - def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create) - - def apply(behavior: (ScalaActorRef with SelfActorRef) ⇒ Actor.Receive): Props = apply(new Actor { def receive = behavior(self) }) -} - -/** - * ActorRef configuration object, this is thread safe and fully sharable - */ -case class Props(creator: () ⇒ Actor = Props.defaultCreator, - deployId: String = Props.defaultDeployId, - @transient dispatcher: MessageDispatcher = Props.defaultDispatcher, - timeout: Timeout = Props.defaultTimeout, - faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, - supervisor: Option[ActorRef] = Props.defaultSupervisor) { - /** - * No-args constructor that sets all the default values - * Java API - */ - def this() = this( - creator = Props.defaultCreator, - deployId = Props.defaultDeployId, - dispatcher = Props.defaultDispatcher, - timeout = Props.defaultTimeout, - faultHandler = Props.defaultFaultHandler, - supervisor = Props.defaultSupervisor) - - /** - * Returns a new Props with the specified creator set - * Scala API - */ - def withCreator(c: ⇒ Actor) = copy(creator = () ⇒ c) - - /** - * Returns a new Props with the specified creator set - * Java API - */ - def withCreator(c: Creator[Actor]) = copy(creator = () ⇒ c.create) - - /** - * Returns a new Props with the specified deployId set - * Java and Scala API - */ - def withDeployId(id: String) = copy(deployId = if (id eq null) "" else id) - - /** - * Returns a new Props with the specified dispatcher set - * Java API - */ - def withDispatcher(d: MessageDispatcher) = copy(dispatcher = d) - - /** - * Returns a new Props with the specified timeout set - * Java API - */ - def withTimeout(t: Timeout) = copy(timeout = t) - - /** - * Returns a new Props with the specified faulthandler set - * Java API - */ - def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f) - - /** - * Returns a new Props with the specified supervisor set, if null, it's equivalent to withSupervisor(Option.none()) - * Java API - */ - def withSupervisor(s: ActorRef) = copy(supervisor = Option(s)) - - /** - * Returns a new Props with the specified supervisor set - * Java API - */ - def withSupervisor(s: akka.japi.Option[ActorRef]) = copy(supervisor = s.asScala) - - /** - * Returns a new Props with the specified supervisor set - * Scala API - */ - def withSupervisor(s: scala.Option[ActorRef]) = copy(supervisor = s) -} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -204,17 +51,8 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha private[akka] val uuid = newUuid - @volatile - protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED - def address: String - /** - * This is a reference to the message currently being processed by the actor - */ - @volatile - protected[akka] var currentMessage: MessageInvocation = null - /** * Comparison only takes address into account. */ @@ -266,13 +104,21 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha else forward(message)(sender) } + /** + * Suspends the actor. It will not process messages while suspended. + */ + def suspend(): Unit + + /** + * Resumes a suspended actor. + */ + def resume(): Unit + /** * Shuts down the actor its dispatcher and message queue. * Alias for 'stop'. */ - def exit() { - stop() - } + def exit(): Unit = stop() /** * Shuts down the actor its dispatcher and message queue. @@ -282,15 +128,12 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha /** * Is the actor running? */ - def isRunning: Boolean = _status match { //TODO Remove this method - case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING ⇒ true - case _ ⇒ false - } + def isRunning: Boolean // TODO remove this method /** * Is the actor shut down? */ - def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN + def isShutdown: Boolean /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will @@ -347,6 +190,12 @@ abstract class SelfActorRef extends ActorRef with ForwardableChannel { self: Loc @volatile protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]() + /** + * This is a reference to the message currently being processed by the actor + */ + @volatile + protected[akka] var currentMessage: MessageInvocation = null + /** * User overridable callback/setting. *

@@ -480,65 +329,6 @@ class LocalActorRef private[akka] ( override private[akka] val uuid: Uuid = newUuid) extends SelfActorRef with ScalaActorRef { - protected[akka] val guard = new ReentrantGuard //TODO FIXME remove the last synchronization point - - @volatile - protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None - - @volatile - private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] - - @volatile - private[akka] var _supervisor: Option[ActorRef] = None - - @volatile - private var maxNrOfRetriesCount: Int = 0 - - @volatile - private var restartTimeWindowStartNanos: Long = 0L - - @volatile - protected[akka] var mailbox: AnyRef = _ - - protected[akka] val actorInstance = guard.withGuard { - new AtomicReference[Actor]({ - if (props.supervisor.isDefined) props.supervisor.get.link(this) - newActor - }) - } //TODO Why is the guard needed here? - - protected[akka] override def timeout: Long = props.timeout.duration.toMillis //TODO remove this if possible - - private def serializer: Serializer = //TODO Is this used or needed? - try { Serialization.serializerFor(this.getClass) } catch { - case e: Exception ⇒ throw new akka.config.ConfigurationException( - "Could not create Serializer object for [" + this.getClass.getName + "]") - } - - private lazy val hasReplicationStorage: Boolean = if (!systemService) { - import DeploymentConfig._ - isReplicated(replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient)) - } else false - - private lazy val replicationStorage: Option[TransactionLog] = if (!systemService) { - import DeploymentConfig._ - val replicationScheme = replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient) - if (isReplicated(replicationScheme)) { - if (isReplicatedWithTransactionLog(replicationScheme)) { - EventHandler.debug(this, "Creating a transaction log for Actor [%s] with replication strategy [%s]".format(address, replicationScheme)) - - Some(transactionLog.newLogFor(uuid.toString, isWriteBehindReplication(replicationScheme), replicationScheme)) //TODO FIXME @jboner shouldn't this be address? - } else if (isReplicatedWithDataGrid(replicationScheme)) { - throw new ConfigurationException("Replication storage type \"data-grid\" is not yet supported") - } else { - throw new ConfigurationException("Unknown replication storage type [" + replicationScheme + "]") - } - } else None - } else None - - // If it was started inside "newActor", initialize it - if (isRunning) initializeActorInstance - // used only for deserialization private[akka] def this( __uuid: Uuid, @@ -551,10 +341,22 @@ class LocalActorRef private[akka] ( hotswap = __hotswap receiveTimeout = __receiveTimeout - setActorSelfFields(actorInstance.get(), this) //TODO Why is this needed? + actorInstance.setActorSelf(this) // TODO: why is this needed? } - // ========= PUBLIC FUNCTIONS ========= + private[this] val actorInstance = new ActorInstance(props, this) + + actorInstance.start() + + /** + * Is the actor running? + */ + def isRunning: Boolean = actorInstance.isRunning + + /** + * Is the actor shut down? + */ + def isShutdown: Boolean = actorInstance.isShutdown /** * Returns the dispatcher (MessageDispatcher) that is used for this Actor @@ -562,61 +364,19 @@ class LocalActorRef private[akka] ( def dispatcher: MessageDispatcher = props.dispatcher /** - * Starts up the actor and its message queue. + * Suspends the actor. It will not process messages while suspended. */ - protected[akka] def startInternal(): this.type = guard.withGuard[this.type] { - if (isShutdown) throw new ActorStartException("Can't restart an actor that has been shut down with 'stop' or 'exit'") - if (!isRunning) { - dispatcher.attach(this) - _status = ActorRefInternals.RUNNING - try { - // If we are not currently creating this ActorRef instance - if ((actorInstance ne null) && (actorInstance.get ne null)) - initializeActorInstance - - checkReceiveTimeout //Schedule the initial Receive timeout - } catch { - case e ⇒ - _status = ActorRefInternals.UNSTARTED - throw e - } - } - this - } + def suspend(): Unit = actorInstance.suspend() /** - * Shuts down the actor its dispatcher and message queue. + * Resumes a suspended actor. */ - def stop() { - guard.withGuard { - if (isRunning) { - receiveTimeout = None - cancelReceiveTimeout - Actor.registry.unregister(this) + def resume(): Unit = actorInstance.resume() - // This lines can trigger cluster start which makes cluster ZK client hang trying to reconnect indefinitely - //if (ClusterModule.isEnabled) Actor.remote.unregister(this) - _status = ActorRefInternals.SHUTDOWN - dispatcher.detach(this) - try { - val a = actorInstance.get() - if (Actor.debugLifecycle) EventHandler.debug(a, "stopping") - a.postStop() - } finally { - currentMessage = null - try { //When a supervisor is stopped, it's linked actors should also be stopped - val i = _linkedActors.values.iterator - while (i.hasNext) { - i.next.stop() - i.remove - } - } finally { setActorSelfFields(actorInstance.get, null) } - } - } - - if (hasReplicationStorage) replicationStorage.get.delete() //TODO shouldn't this be inside the if (isRunning?) - } - } + /** + * Shuts down the actor and its message queue + */ + def stop(): Unit = actorInstance.stop() /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will @@ -629,21 +389,7 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. * Returns the ref that was passed into it */ - def link(actorRef: ActorRef): ActorRef = { - guard.withGuard { - val actorRefSupervisor = actorRef.supervisor - val hasSupervisorAlready = actorRefSupervisor.isDefined - if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return actorRef // we already supervise this guy - else if (hasSupervisorAlready) throw new IllegalActorStateException( - "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - else { - _linkedActors.put(actorRef.uuid, actorRef) - actorRef.supervisor = Some(this) - } - } - if (Actor.debugLifecycle) EventHandler.debug(actorInstance.get(), "now supervising " + actorRef) - actorRef - } + def link(actorRef: ActorRef): ActorRef = actorInstance.link(actorRef) /** * Unlink the actor. @@ -651,50 +397,69 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. * Returns the ref that was passed into it */ - def unlink(actorRef: ActorRef): ActorRef = { - guard.withGuard { - if (_linkedActors.remove(actorRef.uuid) eq null) - throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") - actorRef.supervisor = None - if (Actor.debugLifecycle) EventHandler.debug(actorInstance.get(), "stopped supervising " + actorRef) - } - actorRef - } + def unlink(actorRef: ActorRef): ActorRef = actorInstance.unlink(actorRef) + + /** + * Returns an unmodifiable Java Collection containing the linked actors + */ + def linkedActors: JCollection[ActorRef] = actorInstance.linkedActors + + /** + * Returns the supervisor, if there is one. + */ + def supervisor: Option[ActorRef] = actorInstance.supervisor /** * The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ @deprecated("will be removed in 2.0, use channel instead", "1.2") - def sender: Option[ActorRef] = { - val msg = currentMessage - if (msg eq null) None - else msg.channel match { - case ref: ActorRef ⇒ Some(ref) - case _ ⇒ None - } - } + def sender: Option[ActorRef] = actorInstance.sender /** * The reference sender future of the last received message. * Is defined if the message was sent with sent with '?'/'ask', else None. */ @deprecated("will be removed in 2.0, use channel instead", "1.2") - def senderFuture(): Option[Promise[Any]] = { - val msg = currentMessage - if (msg eq null) None - else msg.channel match { - case f: ActorPromise ⇒ Some(f) - case _ ⇒ None - } - } - - /** - * Returns the supervisor, if there is one. - */ - def supervisor: Option[ActorRef] = _supervisor + def senderFuture(): Option[Promise[Any]] = actorInstance.senderFuture // ========= AKKA PROTECTED FUNCTIONS ========= + + protected[akka] def actorClass: Class[_] = actorInstance.actorClass + + protected[akka] def underlying: ActorInstance = actorInstance + + protected[akka] def underlyingActorInstance: Actor = actorInstance.actor.get + + protected[akka] override def timeout: Long = props.timeout.duration.toMillis // TODO: remove this if possible + + protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = { + actorInstance.supervisor = sup + } + + protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = { + actorInstance.postMessageToMailbox(message, channel) + } + + protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( + message: Any, + timeout: Timeout, + channel: UntypedChannel): Future[Any] = { + actorInstance.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel) + } + + protected[akka] def handleDeath(death: Death): Unit = actorInstance.handleDeath(death) + + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { + actorInstance.restart(reason, maxNrOfRetries, withinTimeRange) + } + + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { + actorInstance.restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + } + + // ========= PRIVATE FUNCTIONS ========= + @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = { val inetaddr = @@ -702,286 +467,6 @@ class LocalActorRef private[akka] ( else ReflectiveAccess.RemoteModule.configDefaultAddress SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort, timeout) } - - protected[akka] def supervisor_=(sup: Option[ActorRef]) { - _supervisor = sup - } - - protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = - if (isRunning) dispatcher dispatchMessage new MessageInvocation(this, message, channel) - else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor' before using it") - - protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout( - message: Any, - timeout: Timeout, - channel: UntypedChannel): Future[Any] = if (isRunning) { - val future = channel match { - case f: ActorPromise ⇒ f - case _ ⇒ new ActorPromise(timeout)(dispatcher) - } - dispatcher dispatchMessage new MessageInvocation(this, message, future) - future - } else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor' before using it") - - /** - * Callback for the dispatcher. This is the single entry point to the user Actor implementation. - */ - protected[akka] def invoke(messageHandle: MessageInvocation) { - guard.lock.lock() - try { - if (!isShutdown) { - currentMessage = messageHandle - try { - try { - cancelReceiveTimeout() // FIXME: leave this here? - actorInstance.get().apply(messageHandle.message) - currentMessage = null // reset current message after successful invocation - } catch { - case e ⇒ - EventHandler.error(e, this, e.getMessage) - - //Prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - - channel.sendException(e) - - if (supervisor.isDefined) supervisor.get ! Death(this, e, true) else dispatcher.resume(this) - if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected - } finally { - checkReceiveTimeout // Reschedule receive timeout - } - } catch { - case e ⇒ - EventHandler.error(e, actorInstance.get(), e.getMessage) - throw e - } - } else { - // throwing away message if actor is shut down, no use throwing an exception in receiving actor's thread, isShutdown is enforced on caller side - } - } finally { - guard.lock.unlock() - if (hasReplicationStorage) replicationStorage.get.recordEntry(messageHandle, this) - } - } - - protected[akka] def handleDeath(death: Death) { - props.faultHandler match { - case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ - restartLinkedActors(death.cause, maxRetries, within) - - case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ - restartLinkedActors(death.cause, None, None) - - case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ - death.deceased.restart(death.cause, maxRetries, within) - - case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ - unlink(death.deceased) - death.deceased.stop() - this ! MaximumNumberOfRestartsWithinTimeRangeReached(death.deceased, None, None, death.cause) - - case _ ⇒ - if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here - } - } - - private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { - - val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { - //Immortal - false - } else if (withinTimeRange.isEmpty) { - // restrict number of restarts - val retries = maxNrOfRetriesCount + 1 - maxNrOfRetriesCount = retries //Increment number of retries - retries > maxNrOfRetries.get - } else { - // cannot restart more than N within M timerange - val retries = maxNrOfRetriesCount + 1 - - val windowStart = restartTimeWindowStartNanos - val now = System.nanoTime - //We are within the time window if it isn't the first restart, or if the window hasn't closed - val insideWindow = if (windowStart == 0) true else (now - windowStart) <= TimeUnit.MILLISECONDS.toNanos(withinTimeRange.get) - - if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window - restartTimeWindowStartNanos = now - - //Reset number of restarts if window has expired, otherwise, increment it - maxNrOfRetriesCount = if (windowStart != 0 && !insideWindow) 1 else retries //Increment number of retries - - val restartCountLimit = if (maxNrOfRetries.isDefined) maxNrOfRetries.get else 1 - - //The actor is dead if it dies X times within the window of restart - insideWindow && retries > restartCountLimit - } - - denied == false //If we weren't denied, we have a go - } - - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - def performRestart() { - val failedActor = actorInstance.get - if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") - val message = if (currentMessage ne null) Some(currentMessage.message) else None - failedActor.preRestart(reason, message) - val freshActor = newActor - setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor - actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call - freshActor.postRestart(reason) - if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") - } - - @tailrec - def attemptRestart() { - val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) { - guard.withGuard[Boolean] { - _status = ActorRefInternals.BEING_RESTARTED - - val success = - try { - performRestart() - true - } catch { - case e ⇒ - EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString)) - false // an error or exception here should trigger a retry - } finally { - currentMessage = null - } - - if (success) { - _status = ActorRefInternals.RUNNING - dispatcher.resume(this) - restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) - } - success - } - } else { - // tooManyRestarts - if (supervisor.isDefined) - supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) - stop() - true // done - } - - if (success) () // alles gut - else attemptRestart() - } - - attemptRestart() // recur - } - - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = - props.faultHandler.lifeCycle match { - case Temporary ⇒ - val i = _linkedActors.values.iterator - while (i.hasNext) { - val actorRef = i.next() - - i.remove() - - actorRef.stop() - // when this comes down through the handleDeath path, we get here when the temp actor is restarted - if (supervisor.isDefined) { - supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(actorRef, Some(0), None, reason) - - //FIXME if last temporary actor is gone, then unlink me from supervisor <-- should this exist? - if (!i.hasNext) - supervisor.get ! UnlinkAndStop(this) - } - } - - case Permanent ⇒ - val i = _linkedActors.values.iterator - while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange) - } - - def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors.values) - - // ========= PRIVATE FUNCTIONS ========= - - private[this] def newActor: Actor = { - import Actor.{ actorRefInCreation ⇒ refStack } - val stackBefore = refStack.get - refStack.set(stackBefore.push(this)) - try { - if (_status == ActorRefInternals.BEING_RESTARTED) { - val a = actorInstance.get() - val fresh = try a.freshInstance catch { - case e ⇒ - EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory") - None - } - fresh match { - case Some(ref) ⇒ ref - case None ⇒ props.creator() - } - } else { - props.creator() - } - } finally { - val stackAfter = refStack.get - if (stackAfter.nonEmpty) - refStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) //pop null marker plus self - } - } match { - case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") - case valid ⇒ valid - } - - private def setActorSelfFields(actor: Actor, value: ActorRef) { - - @tailrec - def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, value: ActorRef): Boolean = { - val success = try { - val selfField = clazz.getDeclaredField("self") - val someSelfField = clazz.getDeclaredField("someSelf") - selfField.setAccessible(true) - someSelfField.setAccessible(true) - selfField.set(actor, value) - someSelfField.set(actor, if (value ne null) Some(value) else null) - true - } catch { - case e: NoSuchFieldException ⇒ false - } - - if (success) true - else { - val parent = clazz.getSuperclass - if (parent eq null) - throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") - lookupAndSetSelfFields(parent, actor, value) - } - } - - lookupAndSetSelfFields(actorInstance.get().getClass, actorInstance.get(), value) - } - - private def initializeActorInstance() { - val a = actorInstance.get() - if (Actor.debugLifecycle) EventHandler.debug(a, "started") - a.preStart() // run actor preStart - Actor.registry.register(this) - } - - protected[akka] def checkReceiveTimeout() { - cancelReceiveTimeout() - val recvtimeout = receiveTimeout - if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { - //Only reschedule if desired and there are currently no more messages to be processed - _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS)) - } - } - - protected[akka] def cancelReceiveTimeout() { - if (_futureTimeout.isDefined) { - _futureTimeout.get.cancel(true) - _futureTimeout = None - } - } - - startInternal() } /** @@ -1006,6 +491,13 @@ private[akka] case class RemoteActorRef private[akka] ( loader: Option[ClassLoader]) extends ActorRef with ScalaActorRef { + @volatile + private var running: Boolean = true + + def isRunning: Boolean = running + + def isShutdown: Boolean = !running + ClusterModule.ensureEnabled() protected[akka] override def timeout: Long = _timeout @@ -1028,10 +520,14 @@ private[akka] case class RemoteActorRef private[akka] ( else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } + def suspend(): Unit = unsupported + + def resume(): Unit = unsupported + def stop() { synchronized { - if (_status == ActorRefInternals.RUNNING) { - _status = ActorRefInternals.SHUTDOWN + if (running) { + running = false postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } } @@ -1057,10 +553,6 @@ private[akka] case class RemoteActorRef private[akka] ( } private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") - - /* If you start me up... */ - if (_status == ActorRefInternals.UNSTARTED) - _status = ActorRefInternals.RUNNING } /** @@ -1156,6 +648,10 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { unsupported } + def suspend(): Unit = unsupported + + def resume(): Unit = unsupported + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { unsupported } diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 95f08c1f1e..849aa42839 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -200,7 +200,7 @@ trait IO { if (!reinvoked && (_next eq Idle) && _messages.nonEmpty) { try { reinvoked = true - while ((_next eq Idle) && _messages.nonEmpty) self.asInstanceOf[LocalActorRef] invoke _messages.dequeue + while ((_next eq Idle) && _messages.nonEmpty) self.asInstanceOf[LocalActorRef].underlying invoke _messages.dequeue } finally { reinvoked = false } diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala new file mode 100644 index 0000000000..b35c46b5db --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -0,0 +1,139 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.actor + +import akka.config.Supervision._ +import akka.dispatch._ +import akka.japi.Creator +import akka.util._ + +/** + * ActorRef configuration object, this is threadsafe and fully sharable + * + * Props() returns default configuration + * FIXME document me + */ +object Props { + final val defaultCreator: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actor creator specified!") + final val defaultDeployId: String = "" + final val defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher + final val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis")) + final val defaultFaultHandler: FaultHandlingStrategy = AllForOnePermanentStrategy(classOf[Exception] :: Nil, None, None) + final val defaultSupervisor: Option[ActorRef] = None + + /** + * The default Props instance, uses the settings from the Props object starting with default* + */ + final val default = new Props() + + /** + * Returns a cached default implementation of Props + */ + def apply(): Props = default + + /** + * Returns a Props that has default values except for "creator" which will be a function that creates an instance + * of the supplied type using the default constructor + */ + def apply[T <: Actor: ClassManifest]: Props = + default.withCreator(implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[_ <: Actor]].newInstance) + + /** + * Returns a Props that has default values except for "creator" which will be a function that creates an instance + * of the supplied class using the default constructor + */ + def apply(actorClass: Class[_ <: Actor]): Props = + default.withCreator(actorClass.newInstance) + + /** + * Returns a Props that has default values except for "creator" which will be a function that creates an instance + * using the supplied thunk + */ + def apply(creator: ⇒ Actor): Props = default.withCreator(creator) + + /** + * Returns a Props that has default values except for "creator" which will be a function that creates an instance + * using the supplied thunk + */ + def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create) + + def apply(behavior: (ScalaActorRef with SelfActorRef) ⇒ Actor.Receive): Props = apply(new Actor { def receive = behavior(self) }) +} + +/** + * ActorRef configuration object, this is thread safe and fully sharable + */ +case class Props(creator: () ⇒ Actor = Props.defaultCreator, + deployId: String = Props.defaultDeployId, + @transient dispatcher: MessageDispatcher = Props.defaultDispatcher, + timeout: Timeout = Props.defaultTimeout, + faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, + supervisor: Option[ActorRef] = Props.defaultSupervisor) { + /** + * No-args constructor that sets all the default values + * Java API + */ + def this() = this( + creator = Props.defaultCreator, + deployId = Props.defaultDeployId, + dispatcher = Props.defaultDispatcher, + timeout = Props.defaultTimeout, + faultHandler = Props.defaultFaultHandler, + supervisor = Props.defaultSupervisor) + + /** + * Returns a new Props with the specified creator set + * Scala API + */ + def withCreator(c: ⇒ Actor) = copy(creator = () ⇒ c) + + /** + * Returns a new Props with the specified creator set + * Java API + */ + def withCreator(c: Creator[Actor]) = copy(creator = () ⇒ c.create) + + /** + * Returns a new Props with the specified deployId set + * Java and Scala API + */ + def withDeployId(id: String) = copy(deployId = if (id eq null) "" else id) + + /** + * Returns a new Props with the specified dispatcher set + * Java API + */ + def withDispatcher(d: MessageDispatcher) = copy(dispatcher = d) + + /** + * Returns a new Props with the specified timeout set + * Java API + */ + def withTimeout(t: Timeout) = copy(timeout = t) + + /** + * Returns a new Props with the specified faulthandler set + * Java API + */ + def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f) + + /** + * Returns a new Props with the specified supervisor set, if null, it's equivalent to withSupervisor(Option.none()) + * Java API + */ + def withSupervisor(s: ActorRef) = copy(supervisor = Option(s)) + + /** + * Returns a new Props with the specified supervisor set + * Java API + */ + def withSupervisor(s: akka.japi.Option[ActorRef]) = copy(supervisor = s.asScala) + + /** + * Returns a new Props with the specified supervisor set + * Scala API + */ + def withSupervisor(s: scala.Option[ActorRef]) = copy(supervisor = s) +} diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index e7b2f98c9b..c2d649bf6b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -5,7 +5,7 @@ package akka.dispatch import util.DynamicVariable -import akka.actor.{ LocalActorRef, ActorRef, Actor, IllegalActorStateException } +import akka.actor.{ ActorInstance, Actor, IllegalActorStateException } /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -52,27 +52,27 @@ class BalancingDispatcher( @volatile private var actorType: Option[Class[_]] = None @volatile - private var members = Vector[LocalActorRef]() + private var members = Vector[ActorInstance]() private val donationInProgress = new DynamicVariable(false) - protected[akka] override def register(actorRef: LocalActorRef) = { + protected[akka] override def register(actor: ActorInstance) = { //Verify actor type conformity actorType match { - case None ⇒ actorType = Some(actorRef.actorInstance.get().getClass) + case None ⇒ actorType = Some(actor.actorClass) case Some(aType) ⇒ - if (aType != actorRef.actorInstance.get().getClass) + if (aType != actor.actorClass) throw new IllegalActorStateException(String.format( "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", - actorRef, aType)) + actor, aType)) } - members :+= actorRef //Update members, doesn't need synchronized, is guarded in attach - super.register(actorRef) + members :+= actor //Update members, doesn't need synchronized, is guarded in attach + super.register(actor) } - protected[akka] override def unregister(actorRef: LocalActorRef) = { - members = members.filterNot(actorRef eq) //Update members, doesn't need synchronized, is guarded in detach - super.unregister(actorRef) + protected[akka] override def unregister(actor: ActorInstance) = { + members = members.filterNot(actor eq) //Update members, doesn't need synchronized, is guarded in detach + super.unregister(actor) } override protected[akka] def dispatch(invocation: MessageInvocation) = { @@ -126,7 +126,7 @@ class BalancingDispatcher( * Rewrites the message and adds that message to the recipients mailbox * returns true if the message is non-null */ - protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = { + protected def donate(organ: MessageInvocation, recipient: ActorInstance): Boolean = { if (organ ne null) { recipient.postMessageToMailbox(organ.message, organ.channel) true @@ -136,10 +136,10 @@ class BalancingDispatcher( /** * Returns an available recipient for the message, if any */ - protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[LocalActorRef], startIndex: Int): ActorRef = { + protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorInstance], startIndex: Int): ActorInstance = { val prSz = potentialRecipients.size var i = 0 - var recipient: ActorRef = null + var recipient: ActorInstance = null while ((i < prSz) && (recipient eq null)) { val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index eaa5fc613b..49aecc70a4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -7,7 +7,7 @@ package akka.dispatch import akka.event.EventHandler import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue } -import akka.actor.{ LocalActorRef, ActorKilledException, ActorRef } +import akka.actor.{ ActorInstance, ActorKilledException } /** * Default settings are: @@ -108,13 +108,13 @@ class Dispatcher( /** * @return the mailbox associated with the actor */ - protected def getMailbox(receiver: LocalActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + protected def getMailbox(receiver: ActorInstance) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] - override def mailboxIsEmpty(actorRef: LocalActorRef): Boolean = getMailbox(actorRef).isEmpty + override def mailboxIsEmpty(actor: ActorInstance): Boolean = getMailbox(actor).isEmpty - override def mailboxSize(actorRef: LocalActorRef): Int = getMailbox(actorRef).size + override def mailboxSize(actor: ActorInstance): Int = getMailbox(actor).size - def createMailbox(actorRef: LocalActorRef): AnyRef = mailboxType match { + def createMailbox(actor: ActorInstance): AnyRef = mailboxType match { case b: UnboundedMailbox ⇒ new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { @inline @@ -160,8 +160,8 @@ class Dispatcher( protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = registerForExecution(mbox) - protected override def cleanUpMailboxFor(actorRef: LocalActorRef) { - val m = getMailbox(actorRef) + protected override def cleanUpMailboxFor(actor: ActorInstance) { + val m = getMailbox(actor) if (!m.isEmpty) { var invocation = m.dequeue lazy val exception = new ActorKilledException("Actor has been stopped") @@ -174,11 +174,11 @@ class Dispatcher( override val toString = getClass.getSimpleName + "[" + name + "]" - def suspend(actorRef: LocalActorRef): Unit = - getMailbox(actorRef).suspended.tryLock + def suspend(actor: ActorInstance): Unit = + getMailbox(actor).suspended.tryLock - def resume(actorRef: LocalActorRef): Unit = { - val mbox = getMailbox(actorRef) + def resume(actor: ActorInstance): Unit = { + val mbox = getMailbox(actor) mbox.suspended.tryUnlock reRegisterForExecution(mbox) } @@ -296,7 +296,7 @@ class PriorityDispatcher( trait PriorityMailbox { self: Dispatcher ⇒ def comparator: java.util.Comparator[MessageInvocation] - override def createMailbox(actorRef: LocalActorRef): AnyRef = self.mailboxType match { + override def createMailbox(actor: ActorInstance): AnyRef = self.mailboxType match { case b: UnboundedMailbox ⇒ new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox { @inline diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index da666fa038..ec73f9af31 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -4,7 +4,7 @@ package akka.dispatch -import akka.actor.ActorRef +import akka.actor.LocalActorRef import akka.actor.newUuid import akka.config.Config._ import akka.util.{ Duration, ReflectiveAccess } @@ -65,9 +65,9 @@ object Dispatchers { *

* E.g. each actor consumes its own thread. */ - def newPinnedDispatcher(actor: ActorRef) = actor match { + def newPinnedDispatcher(actor: LocalActorRef) = actor match { case null ⇒ new PinnedDispatcher() - case some ⇒ new PinnedDispatcher(some) + case some ⇒ new PinnedDispatcher(some.underlying) } /** @@ -76,9 +76,9 @@ object Dispatchers { *

* E.g. each actor consumes its own thread. */ - def newPinnedDispatcher(actor: ActorRef, mailboxType: MailboxType) = actor match { + def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match { case null ⇒ new PinnedDispatcher(mailboxType) - case some ⇒ new PinnedDispatcher(some, mailboxType) + case some ⇒ new PinnedDispatcher(some.underlying, mailboxType) } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 164d22275c..eec2f558de 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -16,7 +16,7 @@ import akka.actor._ /** * @author Jonas Bonér */ -final case class MessageInvocation(val receiver: LocalActorRef, +final case class MessageInvocation(val receiver: ActorInstance, val message: Any, val channel: UntypedChannel) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") @@ -68,7 +68,7 @@ abstract class MessageDispatcher extends Serializable { /** * Creates and returns a mailbox for the given actor. */ - protected[akka] def createMailbox(actorRef: LocalActorRef): AnyRef + protected[akka] def createMailbox(actor: ActorInstance): AnyRef /** * Name of this dispatcher. @@ -76,20 +76,20 @@ abstract class MessageDispatcher extends Serializable { def name: String /** - * Attaches the specified actorRef to this dispatcher + * Attaches the specified actor instance to this dispatcher */ - final def attach(actorRef: LocalActorRef) { + final def attach(actor: ActorInstance) { guard withGuard { - register(actorRef) + register(actor) } } /** - * Detaches the specified actorRef from this dispatcher + * Detaches the specified actor instance from this dispatcher */ - final def detach(actorRef: LocalActorRef) { + final def detach(actor: ActorInstance) { guard withGuard { - unregister(actorRef) + unregister(actor) } } @@ -132,11 +132,11 @@ abstract class MessageDispatcher extends Serializable { * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER, * and only call it under the dispatcher-guard, see "attach" for the only invocation */ - protected[akka] def register(actorRef: LocalActorRef) { - if (actorRef.mailbox eq null) - actorRef.mailbox = createMailbox(actorRef) + protected[akka] def register(actor: ActorInstance) { + if (actor.mailbox eq null) + actor.mailbox = createMailbox(actor) - uuids add actorRef.uuid + uuids add actor.uuid if (active.isOff) { active.switchOn { start() @@ -148,10 +148,10 @@ abstract class MessageDispatcher extends Serializable { * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER, * and only call it under the dispatcher-guard, see "detach" for the only invocation */ - protected[akka] def unregister(actorRef: LocalActorRef) = { - if (uuids remove actorRef.uuid) { - cleanUpMailboxFor(actorRef) - actorRef.mailbox = null + protected[akka] def unregister(actor: ActorInstance) = { + if (uuids remove actor.uuid) { + cleanUpMailboxFor(actor) + actor.mailbox = null if (uuids.isEmpty && _tasks.get == 0) { shutdownSchedule match { case UNSCHEDULED ⇒ @@ -169,7 +169,7 @@ abstract class MessageDispatcher extends Serializable { * Overridable callback to clean up the mailbox for a given actor, * called when an actor is unregistered. */ - protected def cleanUpMailboxFor(actorRef: LocalActorRef) {} + protected def cleanUpMailboxFor(actor: ActorInstance) {} /** * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors @@ -214,12 +214,12 @@ abstract class MessageDispatcher extends Serializable { /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference */ - def suspend(actorRef: LocalActorRef) + def suspend(actor: ActorInstance) /* * After the call to this method, the dispatcher must begin any new message processing for the specified reference */ - def resume(actorRef: LocalActorRef) + def resume(actor: ActorInstance) /** * Will be called when the dispatcher is to queue an invocation for execution @@ -241,12 +241,12 @@ abstract class MessageDispatcher extends Serializable { /** * Returns the size of the mailbox for the specified actor */ - def mailboxSize(actorRef: LocalActorRef): Int + def mailboxSize(actor: ActorInstance): Int /** * Returns the "current" emptiness status of the mailbox for the specified actor */ - def mailboxIsEmpty(actorRef: LocalActorRef): Boolean + def mailboxIsEmpty(actor: ActorInstance): Boolean /** * Returns the amount of tasks queued for execution diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index b2922469a3..befc1cb1ec 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -5,44 +5,44 @@ package akka.dispatch import java.util.concurrent.atomic.AtomicReference -import akka.actor.{ LocalActorRef, ActorRef } +import akka.actor.ActorInstance /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class PinnedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxType) +class PinnedDispatcher(_actor: ActorInstance, _name: String, _mailboxType: MailboxType) extends Dispatcher( _name, Dispatchers.THROUGHPUT, -1, _mailboxType, PinnedDispatcher.oneThread) { def this(_name: String, _mailboxType: MailboxType) = this(null, _name, _mailboxType) - def this(_actor: ActorRef, _name: String) = this(_actor, _name, Dispatchers.MAILBOX_TYPE) + def this(_actor: ActorInstance, _name: String) = this(_actor, _name, Dispatchers.MAILBOX_TYPE) def this(_name: String) = this(null, _name, Dispatchers.MAILBOX_TYPE) def this(_mailboxType: MailboxType) = this(null, "anon", _mailboxType) - def this(_actor: ActorRef, _mailboxType: MailboxType) = this(_actor, _actor.uuid.toString, _mailboxType) + def this(_actor: ActorInstance, _mailboxType: MailboxType) = this(_actor, _actor.uuid.toString, _mailboxType) - def this(_actor: ActorRef) = this(_actor, _actor.uuid.toString, Dispatchers.MAILBOX_TYPE) + def this(_actor: ActorInstance) = this(_actor, _actor.uuid.toString, Dispatchers.MAILBOX_TYPE) def this() = this(Dispatchers.MAILBOX_TYPE) - protected[akka] val owner = new AtomicReference[ActorRef](_actor) + protected[akka] val owner = new AtomicReference[ActorInstance](_actor) //Relies on an external lock provided by MessageDispatcher.attach - protected[akka] override def register(actorRef: LocalActorRef) = { + protected[akka] override def register(actorInstance: ActorInstance) = { val actor = owner.get() - if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) - owner.compareAndSet(null, actorRef) //Register if unregistered - super.register(actorRef) + if ((actor ne null) && actorInstance != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) + owner.compareAndSet(null, actorInstance) //Register if unregistered + super.register(actorInstance) } //Relies on an external lock provided by MessageDispatcher.detach - protected[akka] override def unregister(actorRef: LocalActorRef) = { - super.unregister(actorRef) - owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak) + protected[akka] override def unregister(actor: ActorInstance) = { + super.unregister(actor) + owner.compareAndSet(actor, null) //Unregister (prevent memory leak) } } diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala index 95a8d9ab86..a2ba6c3670 100644 --- a/akka-actor/src/main/scala/akka/event/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/event/DeathWatch.scala @@ -123,4 +123,4 @@ trait Supervision { self: DeathWatch => * Default implementation of preRestart == postStop * Default implementation of postRestart == preStart * -* */ \ No newline at end of file +* */ \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 8b8cc31c0d..c733550170 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -152,7 +152,7 @@ trait SmallestMailboxSelector { var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount def mailboxSize(a: ActorRef): Int = a match { - case l: LocalActorRef ⇒ l.dispatcher.mailboxSize(l) + case l: LocalActorRef ⇒ l.dispatcher.mailboxSize(l.underlying) case _ ⇒ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority } @@ -238,7 +238,7 @@ trait MailboxPressureCapacitor { def pressureThreshold: Int def pressure(delegates: Seq[ActorRef]): Int = delegates count { - case a: LocalActorRef ⇒ a.dispatcher.mailboxSize(a) > pressureThreshold + case a: LocalActorRef ⇒ a.dispatcher.mailboxSize(a.underlying) > pressureThreshold case _ ⇒ false } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b7c5eddeaa..32dcfc8438 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -227,20 +227,23 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte */ private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends AbstractRoutedActorRef(routedProps) { - router.init(new RemoveConnectionOnFirstFailureLocalFailureDetector(routedProps.connections)) + @volatile + private var running: Boolean = true + + def isRunning: Boolean = running + + def isShutdown: Boolean = !running def stop() { synchronized { - if (_status == ActorRefInternals.RUNNING) { - _status = ActorRefInternals.SHUTDOWN + if (running) { + running = false postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } } } - /*If you start me up*/ - if (_status == ActorRefInternals.UNSTARTED) - _status = ActorRefInternals.RUNNING + router.init(new RemoveConnectionOnFirstFailureLocalFailureDetector(routedProps.connections)) } /** diff --git a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala index 06ba255c4d..74d63401ed 100644 --- a/akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala +++ b/akka-camel-typed/src/main/scala/akka/camel/TypedConsumer.scala @@ -44,7 +44,7 @@ private[camel] object TypedConsumer { private def withConsumeAnnotatedMethodsonImplClass[T](tc: AnyRef, actorRef: ActorRef, f: (AnyRef, Method) ⇒ T): List[T] = actorRef match { case l: LocalActorRef ⇒ - val implClass = l.actorInstance.get().asInstanceOf[TypedActor.TypedActor[AnyRef, AnyRef]].me.getClass + val implClass = l.underlyingActorInstance.asInstanceOf[TypedActor.TypedActor[AnyRef, AnyRef]].me.getClass for (m ← implClass.getDeclaredMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(tc, m) case _ ⇒ Nil } diff --git a/akka-camel/src/main/scala/akka/camel/Consumer.scala b/akka-camel/src/main/scala/akka/camel/Consumer.scala index d5843ebe0b..77d04eb7d1 100644 --- a/akka-camel/src/main/scala/akka/camel/Consumer.scala +++ b/akka-camel/src/main/scala/akka/camel/Consumer.scala @@ -141,7 +141,7 @@ private[camel] object Consumer { */ def withConsumer[T](actorRef: ActorRef)(f: Consumer ⇒ T): Option[T] = actorRef match { case l: LocalActorRef ⇒ - l.actorInstance.get() match { + l.underlyingActorInstance match { case c: Consumer ⇒ Some(f(c)) case _ ⇒ None } diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index 891e809423..b2f7100ed6 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -265,14 +265,24 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall val address = exchange.getExchangeId + @volatile + private var running: Boolean = false + + def isRunning: Boolean = running + + def isShutdown: Boolean = !running + def start = { - if (_status == ActorRefInternals.UNSTARTED) - _status = ActorRefInternals.RUNNING + running = true this } - def stop() = { - _status = ActorRefInternals.SHUTDOWN + def suspend(): Unit = () + + def resume(): Unit = () + + def stop(): Unit = { + running = false } /** diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala index 1fe5728327..45ab3a2ccb 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerPublishRequestorTest.scala @@ -39,7 +39,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite { requestor ! ActorRegistered(consumer.address, consumer, None) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher ? GetRetainedMessage).get === - ConsumerActorRegistered(consumer, consumer.actorInstance.get.asInstanceOf[Consumer])) + ConsumerActorRegistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer])) } @Test @@ -48,7 +48,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite { requestor ! ActorUnregistered(consumer.address, consumer, None) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher ? GetRetainedMessage).get === - ConsumerActorUnregistered(consumer, consumer.actorInstance.get.asInstanceOf[Consumer])) + ConsumerActorUnregistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer])) } } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerRegisteredTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerRegisteredTest.scala index be91f3ac2e..5003413d42 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerRegisteredTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerRegisteredTest.scala @@ -49,7 +49,7 @@ class ConsumerRegisteredTest extends JUnitSuite { } private def consumerOf(ref: ActorRef) = ref match { - case l: LocalActorRef ⇒ l.actorInstance.get.asInstanceOf[Consumer] + case l: LocalActorRef ⇒ l.underlyingActorInstance.asInstanceOf[Consumer] case _ ⇒ null: Consumer } } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 9837cf2d5d..84f43f7dd1 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -79,7 +79,7 @@ object ActorSerialization { .setTimeout(actorRef.timeout) if (localRef.isDefined) - builder.setActorClassname(localRef.get.actorInstance.get.getClass.getName) //TODO FIXME Why is the classname needed anymore? + builder.setActorClassname(localRef.get.actorClass.getName) //TODO FIXME Why is the classname needed anymore? replicationScheme match { case _: Transient | Transient ⇒ @@ -104,7 +104,7 @@ object ActorSerialization { localRef foreach { l ⇒ if (serializeMailBox) { - l.mailbox match { + l.underlying.mailbox match { case null ⇒ throw new IllegalActorStateException("Can't serialize an actor that has not been started.") case q: java.util.Queue[_] ⇒ val l = new scala.collection.mutable.ListBuffer[MessageInvocation] @@ -113,7 +113,7 @@ object ActorSerialization { l map { m ⇒ RemoteActorSerialization.createRemoteMessageProtocolBuilder( - Option(m.receiver), + Option(m.receiver.ref), Left(actorRef.uuid), actorRef.address, actorRef.timeout, @@ -130,7 +130,7 @@ object ActorSerialization { } l.receiveTimeout.foreach(builder.setReceiveTimeout(_)) - val actorInstance = l.actorInstance.get + val actorInstance = l.underlyingActorInstance Serialization.serialize(actorInstance.asInstanceOf[T]) match { case Right(bytes) ⇒ builder.setActorInstance(ByteString.copyFrom(bytes)) case Left(exception) ⇒ throw new Exception("Error serializing : " + actorInstance.getClass.getName) diff --git a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala index b6067833ba..732c86389a 100644 --- a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala +++ b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala @@ -39,15 +39,15 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll val actor1 = new LocalActorRef(Props[MyStatelessActorWithMessagesInMailbox], newUuid.toString, systemService = true) for (i ← 1 to 10) actor1 ! "hello" - actor1.getDispatcher.mailboxSize(actor1) should be > (0) + actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0) val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor2.getDispatcher.mailboxSize(actor1) should be > (0) + actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0) (actor2 ? "hello-reply").get should equal("world") val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor3.getDispatcher.mailboxSize(actor1) should equal(0) + actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0) (actor3 ? "hello-reply").get should equal("world") } @@ -65,15 +65,15 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll (actor1 ! p1) (actor1 ! p1) (actor1 ! p1) - actor1.getDispatcher.mailboxSize(actor1) should be > (0) + actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0) val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor2.getDispatcher.mailboxSize(actor1) should be > (0) + actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0) (actor2 ? "hello-reply").get should equal("hello") val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor3.getDispatcher.mailboxSize(actor1) should equal(0) + actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0) (actor3 ? "hello-reply").get should equal("hello") } } @@ -102,15 +102,15 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll val msg = MyMessage(123, "debasish ghosh", true) val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build for (i ← 1 to 10) actor1 ! b - actor1.getDispatcher.mailboxSize(actor1) should be > (0) + actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0) val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor2.getDispatcher.mailboxSize(actor1) should be > (0) + actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0) (actor2 ? "hello-reply").get should equal("world") val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef] Thread.sleep(1000) - actor3.getDispatcher.mailboxSize(actor1) should equal(0) + actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0) (actor3 ? "hello-reply").get should equal("world") } } diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 9d6b5b6df9..2d044f1e13 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -151,7 +151,7 @@ class Agent[T](initialValue: T) { * still be executed in order. */ def sendOff(f: T ⇒ T): Unit = send((value: T) ⇒ { - suspend + suspend() val threadBased = actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(new PinnedDispatcher())) threadBased ! Update(f) value @@ -167,7 +167,7 @@ class Agent[T](initialValue: T) { def alterOff(f: T ⇒ T)(timeout: Long): Future[T] = { val result = new DefaultPromise[T](timeout) send((value: T) ⇒ { - suspend + suspend() val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)) result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] value @@ -206,12 +206,12 @@ class Agent[T](initialValue: T) { /** * Suspends processing of `send` actions for the agent. */ - def suspend() = updater.dispatcher.suspend(updater) + def suspend() = updater.suspend() /** * Resumes processing of `send` actions for the agent. */ - def resume() = updater.dispatcher.resume(updater) + def resume() = updater.resume() /** * Closes the agents and makes it eligible for garbage collection. @@ -300,7 +300,7 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { case update: Update[_] ⇒ try { self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) } finally { - agent.resume + agent.resume() self.stop() } case _ ⇒ self.stop() diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 2657faf868..780e56abe6 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -11,7 +11,7 @@ import java.util.concurrent.RejectedExecutionException import akka.util.Switch import java.lang.ref.WeakReference import scala.annotation.tailrec -import akka.actor.{ LocalActorRef, ActorRef } +import akka.actor.ActorInstance /* * Locking rules: @@ -107,9 +107,9 @@ object CallingThreadDispatcher { class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher { import CallingThreadDispatcher._ - protected[akka] override def createMailbox(actor: LocalActorRef) = new CallingThreadMailbox + protected[akka] override def createMailbox(actor: ActorInstance) = new CallingThreadMailbox - private def getMailbox(actor: LocalActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] + private def getMailbox(actor: ActorInstance) = actor.mailbox.asInstanceOf[CallingThreadMailbox] protected[akka] override def start() {} @@ -117,11 +117,11 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: protected[akka] override def timeoutMs = 100L - override def suspend(actor: LocalActorRef) { + override def suspend(actor: ActorInstance) { getMailbox(actor).suspended.switchOn } - override def resume(actor: LocalActorRef) { + override def resume(actor: ActorInstance) { val mbox = getMailbox(actor) val queue = mbox.queue val wasActive = queue.isActive @@ -133,9 +133,9 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: } } - override def mailboxSize(actor: LocalActorRef) = getMailbox(actor).queue.size + override def mailboxSize(actor: ActorInstance) = getMailbox(actor).queue.size - override def mailboxIsEmpty(actorRef: LocalActorRef): Boolean = getMailbox(actorRef).queue.isEmpty + override def mailboxIsEmpty(actor: ActorInstance): Boolean = getMailbox(actor).queue.isEmpty protected[akka] override def dispatch(handle: MessageInvocation) { val mbox = getMailbox(handle.receiver) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index a3f372b5d5..01bf530c2b 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -25,14 +25,14 @@ class TestActorRef[T <: Actor](props: Props, address: String) extends LocalActor * thrown will be available to you, while still being able to use * become/unbecome and their message counterparts. */ - def apply(o: Any) { actorInstance.get().apply(o) } + def apply(o: Any) { underlyingActorInstance.apply(o) } /** * Retrieve reference to the underlying actor, where the static type matches the factory used inside the * constructor. Beware that this reference is discarded by the ActorRef upon restarting the actor (should this * reference be linked to a supervisor). The old Actor may of course still be used in post-mortem assertions. */ - def underlyingActor: T = actorInstance.get().asInstanceOf[T] + def underlyingActor: T = underlyingActorInstance.asInstanceOf[T] override def toString = "TestActor[" + address + ":" + uuid + "]"