diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala similarity index 93% rename from akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala rename to akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index 0e5602a899..f917c22bad 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -1,18 +1,27 @@ /** * Copyright (C) 2009-2011 Typesafe Inc. */ -package akka.actor +package akka.event import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.util.duration._ import akka.testkit._ import org.scalatest.WordSpec -import akka.event.Logging import akka.util.Duration import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import scala.collection.JavaConverters._ import java.util.Properties +import akka.actor.Actor +import akka.actor.ActorSystem +import akka.actor.HotSwap +import akka.actor.UnhandledMessageException +import akka.actor.PoisonPill +import akka.actor.ActorSystemImpl +import akka.actor.Props +import akka.actor.OneForOneStrategy +import akka.actor.ActorKilledException +import akka.actor.Kill object LoggingReceiveSpec { class TestLogActor extends Actor { @@ -58,7 +67,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val r: Actor.Receive = { case null ⇒ } - val log = Actor.LoggingReceive("funky", r) + val log = LoggingReceive("funky")(r) log.isDefinedAt("hallo") expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo")) } @@ -70,7 +79,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd system.eventStream.subscribe(testActor, classOf[Logging.Debug]) system.eventStream.subscribe(testActor, classOf[Logging.Error]) val actor = TestActorRef(new Actor { - def receive = loggable(this) { + def receive = LoggingReceive(this) { case x ⇒ sender ! "x" } @@ -100,7 +109,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd new TestKit(appLogging) with ImplicitSender { system.eventStream.subscribe(testActor, classOf[Logging.Debug]) val actor = TestActorRef(new Actor { - def receive = loggable(this)(loggable(this) { + def receive = LoggingReceive(this)(LoggingReceive(this) { case _ ⇒ sender ! "x" }) }) diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index fbfd7d7e9c..9db5a8dcff 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -81,7 +81,7 @@ akka { # optional replication { # use replication or not? only makes sense for a stateful actor - # FIXME should we have this config option here? If so, implement it all through. + # serialize-mailbox not implemented, ticket #1412 serialize-mailbox = off # should the actor mailbox be part of the serialized snapshot? # default is 'off' diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b8c0bbb327..2980d48691 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -160,25 +160,6 @@ object Actor { type Receive = PartialFunction[Any, Unit] - /** - * This decorator adds invocation logging to a Receive function. - */ - class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive { - def isDefinedAt(o: Any) = { - val handled = r.isDefinedAt(o) - system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o)) - handled - } - def apply(o: Any): Unit = r(o) - } - - object LoggingReceive { - def apply(source: AnyRef, r: Receive)(implicit system: ActorSystem): Receive = r match { - case _: LoggingReceive ⇒ r - case _ ⇒ new LoggingReceive(source, r) - } - } - object emptyBehavior extends Receive { def isDefinedAt(x: Any) = false def apply(x: Any) = throw new UnsupportedOperationException("empty behavior apply()") @@ -235,22 +216,6 @@ trait Actor { */ implicit def defaultTimeout = system.settings.ActorTimeout - /** - * Wrap a Receive partial function in a logging enclosure, which sends a - * debug message to the EventHandler each time before a message is matched. - * This includes messages which are not handled. - * - *

-   * def receive = loggable {
-   *   case x => ...
-   * }
-   * 
- * - * This method does NOT modify the given Receive unless - * akka.actor.debug.receive is set within akka.conf. - */ - def loggable(self: AnyRef)(r: Receive): Receive = if (system.settings.AddLoggingReceive) LoggingReceive(self, r) else r //TODO FIXME Shouldn't this be in a Loggable-trait? - /** * The 'self' field holds the ActorRef for this actor. *

diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index afd462ff1e..0f9cd38370 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -190,6 +190,7 @@ private[akka] class ActorCell( checkReceiveTimeout if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "started (" + actor + ")")) } catch { + // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ try { system.eventStream.publish(Error(e, self.toString, "error while creating actor")) @@ -222,6 +223,7 @@ private[akka] class ActorCell( props.faultHandler.handleSupervisorRestarted(cause, self, children) } catch { + // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ try { system.eventStream.publish(Error(e, self.toString, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted @@ -283,7 +285,7 @@ private[akka] class ActorCell( } catch { case e ⇒ //Should we really catch everything here? system.eventStream.publish(Error(e, self.toString, "error while processing " + message)) - //TODO FIXME How should problems here be handled? + //TODO FIXME How should problems here be handled??? throw e } } @@ -294,7 +296,7 @@ private[akka] class ActorCell( currentMessage = messageHandle try { try { - cancelReceiveTimeout() // FIXME: leave this here? + cancelReceiveTimeout() // FIXME: leave this here??? messageHandle.message match { case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) case msg if stopping ⇒ // receiving Terminated in response to stopping children is too common to generate noise diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e62a04938a..aa54f79205 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -111,12 +111,12 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable /** * Suspends the actor. It will not process messages while suspended. */ - def suspend(): Unit //TODO FIXME REMOVE THIS + def suspend(): Unit //TODO FIXME REMOVE THIS, ticket #1415 /** * Resumes a suspended actor. */ - def resume(): Unit //TODO FIXME REMOVE THIS + def resume(): Unit //TODO FIXME REMOVE THIS, ticket #1415 /** * Shuts down the actor its dispatcher and message queue. @@ -135,7 +135,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable * * @return the same ActorRef that is provided to it, to allow for cleaner invocations */ - def startsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS + def startsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS, ticket #1416 /** * Deregisters this actor from being a death monitor of the provided ActorRef @@ -144,7 +144,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable * * @return the same ActorRef that is provided to it, to allow for cleaner invocations */ - def stopsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS + def stopsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS, ticket #1416 override def hashCode: Int = HashCode.hash(HashCode.SEED, address) @@ -201,13 +201,13 @@ class LocalActorRef private[akka] ( * message sends done from the same thread after calling this method will not * be processed until resumed. */ - //FIXME TODO REMOVE THIS, NO REPLACEMENT + //FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415 def suspend(): Unit = actorCell.suspend() /** * Resumes a suspended actor. */ - //FIXME TODO REMOVE THIS, NO REPLACEMENT + //FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415 def resume(): Unit = actorCell.resume() /** @@ -237,7 +237,7 @@ class LocalActorRef private[akka] ( protected[akka] def underlying: ActorCell = actorCell - // FIXME TODO: remove this method + // FIXME TODO: remove this method. It is used in testkit. // @deprecated("This method does a spin-lock to block for the actor, which might never be there, do not use this", "2.0") protected[akka] def underlyingActorInstance: Actor = { var instance = actorCell.actor @@ -308,7 +308,6 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) { import akka.serialization.Serialization.currentSystem def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path) - def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE @throws(classOf[java.io.ObjectStreamException]) def readResolve(): AnyRef = currentSystem.value match { @@ -330,9 +329,11 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { private[akka] val uuid: Uuid = newUuid() def name: String = uuid.toString + //FIXME REMOVE THIS, ticket #1416 def startsWatching(actorRef: ActorRef): ActorRef = actorRef def stopsWatching(actorRef: ActorRef): ActorRef = actorRef + //FIXME REMOVE THIS, ticket #1415 def suspend(): Unit = () def resume(): Unit = () diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 764838cdb8..5ebe475cd0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -36,10 +36,10 @@ trait ActorRefProvider { def deathWatch: DeathWatch - // FIXME: remove/replace? + // FIXME: remove/replace??? def nodename: String - // FIXME: remove/replace? + // FIXME: remove/replace??? def clustername: String /** @@ -64,7 +64,7 @@ trait ActorRefProvider { /** * Create an Actor with the given full path below the given supervisor. * - * FIXME: Remove! this is dangerous! + * FIXME: Remove! this is dangerous!? */ private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef @@ -296,7 +296,7 @@ class LocalActorRefProvider( private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = { val name = path.name - val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout? + val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher) actors.putIfAbsent(path.toString, newFuture) match { case null ⇒ diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4f02473b4f..f17a4fca8d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -209,8 +209,6 @@ abstract class ActorSystem extends ActorRefFactory { * effort basis and hence not strictly guaranteed. */ def deadLetters: ActorRef - // FIXME: do not publish this - def deadLetterMailbox: Mailbox /** * Light-weight scheduler for running asynchronous tasks after some deadline @@ -328,7 +326,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher - //FIXME Set this to a Failure when things bubble to the top + //FIXME Set this to a Failure when things bubble to the top. What does this mean? def terminationFuture: Future[Unit] = provider.terminationFuture def guardian: ActorRef = provider.guardian def systemGuardian: ActorRef = provider.systemGuardian diff --git a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala index 6e0f99b50d..6f8608378d 100644 --- a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala +++ b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala @@ -58,8 +58,7 @@ trait BootableActorLoaderService extends Bootable { abstract override def onUnload() = { super.onUnload() - // FIXME shutdown all actors - // system.registry.local.shutdownAll + system.stop() } } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index ec1d8dfc4c..709d5dbe48 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -18,7 +18,6 @@ import com.typesafe.config.Config trait ActorDeployer { private[akka] def init(deployments: Seq[Deploy]): Unit - private[akka] def shutdown(): Unit //TODO Why should we have "shutdown", should be crash only? private[akka] def deploy(deployment: Deploy): Unit private[akka] def lookupDeploymentFor(path: String): Option[Deploy] def lookupDeployment(path: String): Option[Deploy] = path match { @@ -49,8 +48,6 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, private[akka] def init(deployments: Seq[Deploy]) = instance.init(deployments) - def shutdown(): Unit = instance.shutdown() //TODO FIXME Why should we have "shutdown", should be crash only? - def deploy(deployment: Deploy): Unit = instance.deploy(deployment) def isLocal(deployment: Deploy): Boolean = deployment match { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 208eae51ca..fd45069e68 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -69,6 +69,7 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, try { function() } catch { + // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ eventStream.publish(Error(e, "TaskInvocation", e.getMessage)) } finally { cleanup() diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index e9a3035ea8..274f82fe6c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -163,6 +163,7 @@ object Future { try { Right(body) } catch { + // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ Left(e) } } @@ -411,7 +412,9 @@ object Future { try { next.apply() } catch { - case e ⇒ e.printStackTrace() //TODO FIXME strategy for handling exceptions in callbacks + case e ⇒ + // FIXME catching all and continue isn't good for OOME, ticket #1418 + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", "Failed to dispatch task, due to: " + e.getMessage)) } } } finally { _taskStack set None } @@ -984,7 +987,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi def run() { if (!isCompleted) { if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) - else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) + else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418 } } } @@ -994,6 +997,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { + // FIXME catching all and continue isn't good for OOME, ticket #1418 try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 49110c2974..3e55fc720e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -28,9 +28,6 @@ object Mailbox { // secondary status: Scheduled bit may be added to Open/Suspended final val Scheduled = 4 - // mailbox debugging helper using println (see below) - // FIXME TODO take this out before release - final val debug = false } /** @@ -167,7 +164,6 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes var processedMessages = 0 val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0 do { - if (debug) println(actor.self + " processing message " + nextMessage) actor invoke nextMessage processAllSystemMessages() //After we're done, process all system messages @@ -190,7 +186,6 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes var nextMessage = systemDrain() try { while (nextMessage ne null) { - if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! @@ -245,7 +240,6 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { assert(message.next eq null) - if (Mailbox.debug) println(actor.self + " having enqueued " + message) val head = systemQueueGet /* * this write is safely published by the compareAndSet contained within diff --git a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala new file mode 100644 index 0000000000..250af89812 --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.event + +import akka.actor.Actor.Receive +import akka.actor.ActorSystem +import akka.event.Logging.Debug + +object LoggingReceive { + + /** + * Wrap a Receive partial function in a logging enclosure, which sends a + * debug message to the event bus each time before a message is matched. + * This includes messages which are not handled. + * + *


+   * def receive = LoggingReceive(this) {
+   *   case x => ...
+   * }
+   * 
+ * + * This method does NOT modify the given Receive unless + * akka.actor.debug.receive is set within akka.conf. + */ + def apply(source: AnyRef)(r: Receive)(implicit system: ActorSystem): Receive = r match { + case _: LoggingReceive ⇒ r + case _ if !system.settings.AddLoggingReceive ⇒ r + case _ ⇒ new LoggingReceive(source, r) + } +} + +/** + * This decorator adds invocation logging to a Receive function. + */ +class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive { + def isDefinedAt(o: Any) = { + val handled = r.isDefinedAt(o) + system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o)) + handled + } + def apply(o: Any): Unit = r(o) +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 293c1abb4b..10eecbfc56 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -245,7 +245,7 @@ trait BasicRouter extends Router { next match { case Some(connection) ⇒ try { - connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it? + connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it?? } catch { case e: Exception ⇒ connectionManager.remove(connection) diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index 5b17ca5c7d..3c0f386b84 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -108,6 +108,7 @@ class BoundedBlockingQueue[E <: AnyRef]( throw ie } false + // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ notFull.signal() result = e @@ -234,7 +235,7 @@ class BoundedBlockingQueue[E <: AnyRef]( if (backing.removeAll(c)) { val sz = backing.size() if (sz < maxCapacity) notFull.signal() - if (sz > 0) notEmpty.signal() //FIXME needed? + if (sz > 0) notEmpty.signal() //FIXME needed?? true } else false } finally { @@ -247,7 +248,7 @@ class BoundedBlockingQueue[E <: AnyRef]( try { if (backing.retainAll(c)) { val sz = backing.size() - if (sz < maxCapacity) notFull.signal() //FIXME needed? + if (sz < maxCapacity) notFull.signal() //FIXME needed?? if (sz > 0) notEmpty.signal() true } else false diff --git a/akka-docs/scala/testing.rst b/akka-docs/scala/testing.rst index ca1469217c..7ff69fc9a9 100644 --- a/akka-docs/scala/testing.rst +++ b/akka-docs/scala/testing.rst @@ -717,11 +717,12 @@ options: ``akka.actor.debug.receive`` — which enables the :meth:`loggable` statement to be applied to an actor’s :meth:`receive` function:: - def receive = Actor.loggable(this) { // `Actor` unnecessary with import Actor._ + import akka.event.LoggingReceive + def receive = LoggingReceive(this) { case msg => ... } - The first argument to :meth:`loggable` defines the source to be used in the + The first argument to :meth:`LoggingReceive` defines the source to be used in the logging events, which should be the current actor. If the abovementioned setting is not given in ``akka.conf``, this method will diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 8a81b2f8e4..8fa7f81e25 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -62,7 +62,8 @@ class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with queue.remove true } catch { - case e ⇒ false //review why catch Throwable? And swallow potential Errors? + // FIXME catching all and continue isn't good for OOME, ticket #1418 + case e ⇒ false } } diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/java/akka/cluster/zookeeper/ZooKeeperQueue.java b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/java/akka/cluster/zookeeper/ZooKeeperQueue.java index af76896f4b..4e06b64e6b 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/java/akka/cluster/zookeeper/ZooKeeperQueue.java +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/java/akka/cluster/zookeeper/ZooKeeperQueue.java @@ -66,7 +66,7 @@ public class ZooKeeperQueue { return element.getData(); } else { throw new UnsupportedOperationException("Non-blocking ZooKeeperQueue is not yet supported"); - /* FIXME DOES NOT WORK + /* TODO DOES NOT WORK try { String headName = getSmallestElement(_zkClient.getChildren(_elementsPath)); String headPath = getElementPath(headName); diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 8350f743d5..c5efa62358 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -59,7 +59,7 @@ class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) queue.clear true } catch { - case e ⇒ false + case e: Exception ⇒ false } override def cleanUp() { diff --git a/akka-remote/src/main/resources/akka-remote-reference.conf b/akka-remote/src/main/resources/akka-remote-reference.conf index 39fcda8cd7..130f1f4526 100644 --- a/akka-remote/src/main/resources/akka-remote-reference.conf +++ b/akka-remote/src/main/resources/akka-remote-reference.conf @@ -8,8 +8,7 @@ akka { remote { - # FIXME rename to transport - layer = "akka.cluster.netty.NettyRemoteSupport" + transport = "akka.cluster.netty.NettyRemoteSupport" use-compression = off @@ -27,6 +26,15 @@ akka { # generates fewer mistakes but needs more time to detect actual crashes max-sample-size = 1000 } + + gossip { + initialDelay = 5s + frequency = 1s + } + + compute-grid-dispatcher { # The dispatcher used for remote system messages + name = ComputeGridDispatcher # defaults to same settings as default-dispatcher + } server { hostname = "" # The hostname or ip to bind the remoting to, InetAddress.getLocalHost.getHostAddress is used if empty diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 8050b24fe1..119a4a43a6 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -15,6 +15,7 @@ import akka.config.ConfigurationException import akka.serialization.SerializationExtension import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.TimeUnit.SECONDS import java.security.SecureRandom import System.{ currentTimeMillis ⇒ newTimestamp } @@ -122,19 +123,15 @@ class Gossiper(remote: Remote) { private val nodeFingerprint = address.## private val random = SecureRandom.getInstance("SHA1PRNG") - private val initalDelayForGossip = 5 seconds // FIXME make configurable - private val gossipFrequency = 1 seconds // FIXME make configurable - private val timeUnit = { - assert(gossipFrequency.unit == initalDelayForGossip.unit) - initalDelayForGossip.unit - } + private val initalDelayForGossip = remoteExtension.InitalDelayForGossip + private val gossipFrequency = remoteExtension.GossipFrequency private val state = new AtomicReference[State](State(currentGossip = newGossip())) { // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between - system.scheduler schedule (() ⇒ initateGossip(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit)) - system.scheduler schedule (() ⇒ scrutinize(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit)) + system.scheduler schedule (() ⇒ initateGossip(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) + system.scheduler schedule (() ⇒ scrutinize(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS)) } /** diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index 3376ad9416..cde377cda7 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -45,7 +45,7 @@ object NetworkEventStream { case event: RemoteClientLifeCycleEvent ⇒ listeners(event.remoteAddress) foreach (_ notify event) - case event: RemoteServerLifeCycleEvent ⇒ // FIXME handle RemoteServerLifeCycleEvent + case event: RemoteServerLifeCycleEvent ⇒ // FIXME handle RemoteServerLifeCycleEvent, ticket #1408 and #1190 case Register(listener, connectionAddress) ⇒ listeners(connectionAddress) += listener @@ -62,7 +62,7 @@ class NetworkEventStream(system: ActorSystemImpl) { import NetworkEventStream._ - // FIXME: check that this supervision is correct + // FIXME: check that this supervision is correct, ticket #1408 private[akka] val sender = system.provider.actorOf(system, Props[Channel].copy(dispatcher = system.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")), system.systemGuardian, "network-event-sender", systemService = true) diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 3db093efd9..82daeaf820 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -47,10 +47,9 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { val remoteDaemonServiceName = "akka-system-remote-daemon".intern - // FIXME configure computeGridDispatcher to what? - val computeGridDispatcher = dispatcherFactory.newDispatcher("akka:compute-grid").build + val computeGridDispatcher = dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher") - // FIXME it is probably better to create another supervisor for handling the children created by handle_* + // FIXME it is probably better to create another supervisor for handling the children created by handle_*, ticket #1408 private[remote] lazy val remoteDaemonSupervisor = system.actorOf(Props( OneForOneStrategy(List(classOf[Exception]), None, None)), "akka-system-remote-supervisor") // is infinite restart what we want? @@ -73,13 +72,11 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { lazy val server: RemoteSupport = { val remote = new akka.remote.netty.NettyRemoteSupport(system) - remote.start() //TODO FIXME Any application loader here? + remote.start() //TODO Any application loader here? system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) - // TODO actually register this provider in system in remote mode - //provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider) remote } @@ -157,9 +154,9 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { sender ! Success(remoteAddress) } catch { - case error: Throwable ⇒ //FIXME doesn't seem sensible - sender ! Failure(error) - throw error + case exc: Exception ⇒ + sender ! Failure(exc) + throw exc } } @@ -192,7 +189,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement()) def tempPath = remoteDaemon.path / tempName - // FIXME: handle real remote supervision + // FIXME: handle real remote supervision, ticket #1408 def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) { new LocalActorRef(systemImpl, Props( @@ -201,7 +198,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { }).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]]) } - // FIXME: handle real remote supervision + // FIXME: handle real remote supervision, ticket #1408 def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol) { new LocalActorRef(systemImpl, Props( @@ -210,7 +207,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { }).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) forward payloadFor(message, classOf[Function0[Any]]) } - // FIXME: handle real remote supervision + // FIXME: handle real remote supervision, ticket #1408 def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) { new LocalActorRef(systemImpl, Props( @@ -219,7 +216,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { }).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) } - // FIXME: handle real remote supervision + // FIXME: handle real remote supervision, ticket #1408 def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol) { new LocalActorRef(systemImpl, Props( diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 358568e13c..3850e43e83 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -86,7 +86,7 @@ class RemoteActorRefProvider( if (systemService) local.actorOf(system, props, supervisor, path, systemService) else { val name = path.name - val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout? + val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher) actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future case null ⇒ @@ -100,7 +100,7 @@ class RemoteActorRefProvider( if (isReplicaNode) { // we are on one of the replica node for this remote actor - local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create + local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?) } else { implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher @@ -177,7 +177,7 @@ class RemoteActorRefProvider( /** * Copied from LocalActorRefProvider... */ - // FIXME: implement supervision + // FIXME: implement supervision, ticket #1408 def actorOf(system: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = { if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router") new RoutedActorRef(system, props, supervisor, name) @@ -266,7 +266,7 @@ class RemoteActorRefProvider( } } - private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch + private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch, ticket ##1190 private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) @@ -306,7 +306,7 @@ private[akka] case class RemoteActorRef private[akka] ( def resume(): Unit = () - def stop() { //FIXME send the cause as well! + def stop() { //FIXME send the cause as well! (WDYM?) synchronized { if (running) { running = false @@ -318,9 +318,9 @@ private[akka] case class RemoteActorRef private[akka] ( @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = provider.serialize(this) - def startsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement + def startsWatching(actorRef: ActorRef): ActorRef = unsupported ////FIXME Implement Remote DeathWatch, ticket #1190 - def stopsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement + def stopsWatching(actorRef: ActorRef): ActorRef = unsupported ////FIXME Implement Remote DeathWatch, ticket #1190 protected[akka] def restart(cause: Throwable): Unit = () diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index 53efaba4b4..ab07f9c3da 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -13,7 +13,6 @@ import java.net.InetAddress import akka.config.ConfigurationException import com.eaio.uuid.UUID import akka.actor._ - import scala.collection.JavaConverters._ object RemoteExtension extends ExtensionId[RemoteExtensionSettings] with ExtensionIdProvider { @@ -29,19 +28,20 @@ class RemoteExtensionSettings(cfg: Config) extends Extension { import config._ - val RemoteTransport = getString("akka.remote.layer") + val RemoteTransport = getString("akka.remote.transport") val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") val ShouldCompressData = config.getBoolean("akka.remote.use-compression") val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) + val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) + val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) // TODO cluster config will go into akka-cluster-reference.conf when we enable that module val ClusterName = getString("akka.cluster.name") val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_)) - // FIXME remove nodename from config - should only be passed as command line arg or read from properties file etc. val NodeName: String = config.getString("akka.cluster.nodename") match { - case "" ⇒ new UUID().toString + case "" ⇒ throw new ConfigurationException("akka.cluster.nodename configuration property must be defined") case value ⇒ value } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d12f5ea7e4..99e5f11097 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -66,7 +66,7 @@ abstract class RemoteClient private[akka] ( * Sends the message across the wire */ def send(request: RemoteMessageProtocol) { - if (isRunning) { //TODO FIXME RACY + if (isRunning) { //FIXME RACY, ticket #1409 log.debug("Sending message: " + new RemoteMessage(request, remoteSupport)) try { @@ -125,7 +125,7 @@ class ActiveRemoteClient private[akka] ( import remoteSupport.clientSettings._ - //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) + //TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private var bootstrap: ClientBootstrap = _ @volatile @@ -161,7 +161,7 @@ class ActiveRemoteClient private[akka] ( def closeChannel(connection: ChannelFuture) = { val channel = connection.getChannel openChannels.remove(channel) - channel.close + channel.close() } def attemptReconnect(): Boolean = { @@ -345,7 +345,7 @@ class ActiveRemoteClientHandler( client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread } case e: Exception ⇒ - event.getChannel.close //FIXME Is this the correct behavior? + event.getChannel.close() //FIXME Is this the correct behavior??? } } else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.remoteSupport, client.remoteAddress)) @@ -648,7 +648,7 @@ class RemoteServerHandler( val inbound = RemoteAddress(origin.getHostname, origin.getPort) val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound) remoteSupport.bindClient(inbound, client) - case CommandType.SHUTDOWN ⇒ //TODO FIXME Dispose passive connection here + case CommandType.SHUTDOWN ⇒ //FIXME Dispose passive connection here, ticket #1410 case _ ⇒ //Unknown command } case _ ⇒ //ignore @@ -659,7 +659,7 @@ class RemoteServerHandler( override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { remoteSupport.notifyListeners(RemoteServerError(event.getCause, remoteSupport)) - event.getChannel.close + event.getChannel.close() } private def getClientAddress(c: Channel): Option[RemoteAddress] = @@ -679,7 +679,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na if (open.get) { super.add(channel) } else { - channel.close + channel.close() false } } finally { @@ -690,7 +690,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na override def close(): ChannelGroupFuture = { guard.writeLock().lock() try { - if (open.getAndSet(false)) super.close else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel") + if (open.getAndSet(false)) super.close() else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel") } finally { guard.writeLock().unlock() } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index f72904fc3f..63b631473d 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -3,16 +3,16 @@ package akka.remote import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RemoteConfigSpec extends AkkaSpec { +class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") { - "ClusterSpec: A Deployer" must { - "be able to parse 'akka.actor.cluster._' config elements" in { + "RemoteExtension" must { + "be able to parse remote and cluster config elements" in { val config = RemoteExtension(system).config import config._ //akka.remote - getString("akka.remote.layer") must equal("akka.cluster.netty.NettyRemoteSupport") + getString("akka.remote.transport") must equal("akka.cluster.netty.NettyRemoteSupport") getString("akka.remote.secure-cookie") must equal("") getBoolean("akka.remote.use-passive-connections") must equal(true) // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) @@ -35,7 +35,7 @@ class RemoteConfigSpec extends AkkaSpec { // TODO cluster config will go into akka-cluster-reference.conf when we enable that module //akka.cluster getString("akka.cluster.name") must equal("default-cluster") - getString("akka.cluster.nodename") must equal("") + getString("akka.cluster.nodename") must equal("node1") getStringList("akka.cluster.seed-nodes") must equal(new java.util.ArrayList[String]) // getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000) diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 382c25523d..edbc589099 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -214,7 +214,6 @@ class CallingThreadDispatcher( } if (handle ne null) { try { - if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle) mbox.actor.invoke(handle) true } catch {