First walk throught of FIXME. See #1378

* Fixed obvious
* Created tickets for several, #1408, #1409, #1410, #1412, #1415, 1416, #1418
* Moved LoggingReceive from akka.actor to akka.event
* Touched several of the FIXME to make them visible in code review
This commit is contained in:
Patrik Nordwall 2011-11-30 10:20:57 +01:00
parent af3a7101da
commit 80ac1737cd
28 changed files with 146 additions and 129 deletions

View file

@ -1,18 +1,27 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
package akka.event
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.duration._
import akka.testkit._
import org.scalatest.WordSpec
import akka.event.Logging
import akka.util.Duration
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import scala.collection.JavaConverters._
import java.util.Properties
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.HotSwap
import akka.actor.UnhandledMessageException
import akka.actor.PoisonPill
import akka.actor.ActorSystemImpl
import akka.actor.Props
import akka.actor.OneForOneStrategy
import akka.actor.ActorKilledException
import akka.actor.Kill
object LoggingReceiveSpec {
class TestLogActor extends Actor {
@ -58,7 +67,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val r: Actor.Receive = {
case null
}
val log = Actor.LoggingReceive("funky", r)
val log = LoggingReceive("funky")(r)
log.isDefinedAt("hallo")
expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo"))
}
@ -70,7 +79,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
system.eventStream.subscribe(testActor, classOf[Logging.Error])
val actor = TestActorRef(new Actor {
def receive = loggable(this) {
def receive = LoggingReceive(this) {
case x
sender ! "x"
}
@ -100,7 +109,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
new TestKit(appLogging) with ImplicitSender {
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
val actor = TestActorRef(new Actor {
def receive = loggable(this)(loggable(this) {
def receive = LoggingReceive(this)(LoggingReceive(this) {
case _ sender ! "x"
})
})

View file

@ -81,7 +81,7 @@ akka {
# optional
replication { # use replication or not? only makes sense for a stateful actor
# FIXME should we have this config option here? If so, implement it all through.
# serialize-mailbox not implemented, ticket #1412
serialize-mailbox = off # should the actor mailbox be part of the serialized snapshot?
# default is 'off'

View file

@ -160,25 +160,6 @@ object Actor {
type Receive = PartialFunction[Any, Unit]
/**
* This decorator adds invocation logging to a Receive function.
*/
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o))
handled
}
def apply(o: Any): Unit = r(o)
}
object LoggingReceive {
def apply(source: AnyRef, r: Receive)(implicit system: ActorSystem): Receive = r match {
case _: LoggingReceive r
case _ new LoggingReceive(source, r)
}
}
object emptyBehavior extends Receive {
def isDefinedAt(x: Any) = false
def apply(x: Any) = throw new UnsupportedOperationException("empty behavior apply()")
@ -235,22 +216,6 @@ trait Actor {
*/
implicit def defaultTimeout = system.settings.ActorTimeout
/**
* Wrap a Receive partial function in a logging enclosure, which sends a
* debug message to the EventHandler each time before a message is matched.
* This includes messages which are not handled.
*
* <pre><code>
* def receive = loggable {
* case x => ...
* }
* </code></pre>
*
* This method does NOT modify the given Receive unless
* akka.actor.debug.receive is set within akka.conf.
*/
def loggable(self: AnyRef)(r: Receive): Receive = if (system.settings.AddLoggingReceive) LoggingReceive(self, r) else r //TODO FIXME Shouldn't this be in a Loggable-trait?
/**
* The 'self' field holds the ActorRef for this actor.
* <p/>

View file

@ -190,6 +190,7 @@ private[akka] class ActorCell(
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "started (" + actor + ")"))
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e
try {
system.eventStream.publish(Error(e, self.toString, "error while creating actor"))
@ -222,6 +223,7 @@ private[akka] class ActorCell(
props.faultHandler.handleSupervisorRestarted(cause, self, children)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e try {
system.eventStream.publish(Error(e, self.toString, "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
@ -283,7 +285,7 @@ private[akka] class ActorCell(
} catch {
case e //Should we really catch everything here?
system.eventStream.publish(Error(e, self.toString, "error while processing " + message))
//TODO FIXME How should problems here be handled?
//TODO FIXME How should problems here be handled???
throw e
}
}
@ -294,7 +296,7 @@ private[akka] class ActorCell(
currentMessage = messageHandle
try {
try {
cancelReceiveTimeout() // FIXME: leave this here?
cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg if stopping // receiving Terminated in response to stopping children is too common to generate noise

View file

@ -111,12 +111,12 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
/**
* Suspends the actor. It will not process messages while suspended.
*/
def suspend(): Unit //TODO FIXME REMOVE THIS
def suspend(): Unit //TODO FIXME REMOVE THIS, ticket #1415
/**
* Resumes a suspended actor.
*/
def resume(): Unit //TODO FIXME REMOVE THIS
def resume(): Unit //TODO FIXME REMOVE THIS, ticket #1415
/**
* Shuts down the actor its dispatcher and message queue.
@ -135,7 +135,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def startsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS
def startsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS, ticket #1416
/**
* Deregisters this actor from being a death monitor of the provided ActorRef
@ -144,7 +144,7 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
*/
def stopsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS
def stopsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS, ticket #1416
override def hashCode: Int = HashCode.hash(HashCode.SEED, address)
@ -201,13 +201,13 @@ class LocalActorRef private[akka] (
* message sends done from the same thread after calling this method will not
* be processed until resumed.
*/
//FIXME TODO REMOVE THIS, NO REPLACEMENT
//FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415
def suspend(): Unit = actorCell.suspend()
/**
* Resumes a suspended actor.
*/
//FIXME TODO REMOVE THIS, NO REPLACEMENT
//FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415
def resume(): Unit = actorCell.resume()
/**
@ -237,7 +237,7 @@ class LocalActorRef private[akka] (
protected[akka] def underlying: ActorCell = actorCell
// FIXME TODO: remove this method
// FIXME TODO: remove this method. It is used in testkit.
// @deprecated("This method does a spin-lock to block for the actor, which might never be there, do not use this", "2.0")
protected[akka] def underlyingActorInstance: Actor = {
var instance = actorCell.actor
@ -308,7 +308,6 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) {
import akka.serialization.Serialization.currentSystem
def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path)
def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE
@throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = currentSystem.value match {
@ -330,9 +329,11 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef {
private[akka] val uuid: Uuid = newUuid()
def name: String = uuid.toString
//FIXME REMOVE THIS, ticket #1416
def startsWatching(actorRef: ActorRef): ActorRef = actorRef
def stopsWatching(actorRef: ActorRef): ActorRef = actorRef
//FIXME REMOVE THIS, ticket #1415
def suspend(): Unit = ()
def resume(): Unit = ()

View file

@ -36,10 +36,10 @@ trait ActorRefProvider {
def deathWatch: DeathWatch
// FIXME: remove/replace?
// FIXME: remove/replace???
def nodename: String
// FIXME: remove/replace?
// FIXME: remove/replace???
def clustername: String
/**
@ -64,7 +64,7 @@ trait ActorRefProvider {
/**
* Create an Actor with the given full path below the given supervisor.
*
* FIXME: Remove! this is dangerous!
* FIXME: Remove! this is dangerous!?
*/
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef
@ -296,7 +296,7 @@ class LocalActorRefProvider(
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = {
val name = path.name
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher)
actors.putIfAbsent(path.toString, newFuture) match {
case null

View file

@ -209,8 +209,6 @@ abstract class ActorSystem extends ActorRefFactory {
* effort basis and hence not strictly guaranteed.
*/
def deadLetters: ActorRef
// FIXME: do not publish this
def deadLetterMailbox: Mailbox
/**
* Light-weight scheduler for running asynchronous tasks after some deadline
@ -328,7 +326,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
//FIXME Set this to a Failure when things bubble to the top
//FIXME Set this to a Failure when things bubble to the top. What does this mean?
def terminationFuture: Future[Unit] = provider.terminationFuture
def guardian: ActorRef = provider.guardian
def systemGuardian: ActorRef = provider.systemGuardian

View file

@ -58,8 +58,7 @@ trait BootableActorLoaderService extends Bootable {
abstract override def onUnload() = {
super.onUnload()
// FIXME shutdown all actors
// system.registry.local.shutdownAll
system.stop()
}
}

View file

@ -18,7 +18,6 @@ import com.typesafe.config.Config
trait ActorDeployer {
private[akka] def init(deployments: Seq[Deploy]): Unit
private[akka] def shutdown(): Unit //TODO Why should we have "shutdown", should be crash only?
private[akka] def deploy(deployment: Deploy): Unit
private[akka] def lookupDeploymentFor(path: String): Option[Deploy]
def lookupDeployment(path: String): Option[Deploy] = path match {
@ -49,8 +48,6 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
private[akka] def init(deployments: Seq[Deploy]) = instance.init(deployments)
def shutdown(): Unit = instance.shutdown() //TODO FIXME Why should we have "shutdown", should be crash only?
def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
def isLocal(deployment: Deploy): Boolean = deployment match {

View file

@ -69,6 +69,7 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit,
try {
function()
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e eventStream.publish(Error(e, "TaskInvocation", e.getMessage))
} finally {
cleanup()

View file

@ -163,6 +163,7 @@ object Future {
try {
Right(body)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e Left(e)
}
}
@ -411,7 +412,9 @@ object Future {
try {
next.apply()
} catch {
case e e.printStackTrace() //TODO FIXME strategy for handling exceptions in callbacks
case e
// FIXME catching all and continue isn't good for OOME, ticket #1418
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", "Failed to dispatch task, due to: " + e.getMessage))
}
}
} finally { _taskStack set None }
@ -984,7 +987,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
def run() {
if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
else promise complete (try { Right(fallback) } catch { case e Left(e) })
else promise complete (try { Right(fallback) } catch { case e Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418
}
}
}
@ -994,6 +997,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
} else this
private def notifyCompleted(func: Future[T] Unit) {
// FIXME catching all and continue isn't good for OOME, ticket #1418
try { func(this) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
}

View file

@ -28,9 +28,6 @@ object Mailbox {
// secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 4
// mailbox debugging helper using println (see below)
// FIXME TODO take this out before release
final val debug = false
}
/**
@ -167,7 +164,6 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
var processedMessages = 0
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0
do {
if (debug) println(actor.self + " processing message " + nextMessage)
actor invoke nextMessage
processAllSystemMessages() //After we're done, process all system messages
@ -190,7 +186,6 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
var nextMessage = systemDrain()
try {
while (nextMessage ne null) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs)
actor systemInvoke nextMessage
nextMessage = nextMessage.next
// dont ever execute normal message when system message present!
@ -245,7 +240,6 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null)
if (Mailbox.debug) println(actor.self + " having enqueued " + message)
val head = systemQueueGet
/*
* this write is safely published by the compareAndSet contained within

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event
import akka.actor.Actor.Receive
import akka.actor.ActorSystem
import akka.event.Logging.Debug
object LoggingReceive {
/**
* Wrap a Receive partial function in a logging enclosure, which sends a
* debug message to the event bus each time before a message is matched.
* This includes messages which are not handled.
*
* <pre><code>
* def receive = LoggingReceive(this) {
* case x => ...
* }
* </code></pre>
*
* This method does NOT modify the given Receive unless
* akka.actor.debug.receive is set within akka.conf.
*/
def apply(source: AnyRef)(r: Receive)(implicit system: ActorSystem): Receive = r match {
case _: LoggingReceive r
case _ if !system.settings.AddLoggingReceive r
case _ new LoggingReceive(source, r)
}
}
/**
* This decorator adds invocation logging to a Receive function.
*/
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o))
handled
}
def apply(o: Any): Unit = r(o)
}

View file

@ -245,7 +245,7 @@ trait BasicRouter extends Router {
next match {
case Some(connection)
try {
connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it?
connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it??
} catch {
case e: Exception
connectionManager.remove(connection)

View file

@ -108,6 +108,7 @@ class BoundedBlockingQueue[E <: AnyRef](
throw ie
}
false
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e
notFull.signal()
result = e
@ -234,7 +235,7 @@ class BoundedBlockingQueue[E <: AnyRef](
if (backing.removeAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal()
if (sz > 0) notEmpty.signal() //FIXME needed?
if (sz > 0) notEmpty.signal() //FIXME needed??
true
} else false
} finally {
@ -247,7 +248,7 @@ class BoundedBlockingQueue[E <: AnyRef](
try {
if (backing.retainAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal() //FIXME needed?
if (sz < maxCapacity) notFull.signal() //FIXME needed??
if (sz > 0) notEmpty.signal()
true
} else false

View file

@ -717,11 +717,12 @@ options:
``akka.actor.debug.receive`` — which enables the :meth:`loggable`
statement to be applied to an actors :meth:`receive` function::
def receive = Actor.loggable(this) { // `Actor` unnecessary with import Actor._
import akka.event.LoggingReceive
def receive = LoggingReceive(this) {
case msg => ...
}
The first argument to :meth:`loggable` defines the source to be used in the
The first argument to :meth:`LoggingReceive` defines the source to be used in the
logging events, which should be the current actor.
If the abovementioned setting is not given in ``akka.conf``, this method will

View file

@ -62,7 +62,8 @@ class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with
queue.remove
true
} catch {
case e false //review why catch Throwable? And swallow potential Errors?
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e false
}
}

View file

@ -66,7 +66,7 @@ public class ZooKeeperQueue<T extends Object> {
return element.getData();
} else {
throw new UnsupportedOperationException("Non-blocking ZooKeeperQueue is not yet supported");
/* FIXME DOES NOT WORK
/* TODO DOES NOT WORK
try {
String headName = getSmallestElement(_zkClient.getChildren(_elementsPath));
String headPath = getElementPath(headName);

View file

@ -59,7 +59,7 @@ class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
queue.clear
true
} catch {
case e false
case e: Exception false
}
override def cleanUp() {

View file

@ -8,8 +8,7 @@
akka {
remote {
# FIXME rename to transport
layer = "akka.cluster.netty.NettyRemoteSupport"
transport = "akka.cluster.netty.NettyRemoteSupport"
use-compression = off
@ -28,6 +27,15 @@ akka {
max-sample-size = 1000
}
gossip {
initialDelay = 5s
frequency = 1s
}
compute-grid-dispatcher { # The dispatcher used for remote system messages
name = ComputeGridDispatcher # defaults to same settings as default-dispatcher
}
server {
hostname = "" # The hostname or ip to bind the remoting to, InetAddress.getLocalHost.getHostAddress is used if empty
port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA)

View file

@ -15,6 +15,7 @@ import akka.config.ConfigurationException
import akka.serialization.SerializationExtension
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit.SECONDS
import java.security.SecureRandom
import System.{ currentTimeMillis newTimestamp }
@ -122,19 +123,15 @@ class Gossiper(remote: Remote) {
private val nodeFingerprint = address.##
private val random = SecureRandom.getInstance("SHA1PRNG")
private val initalDelayForGossip = 5 seconds // FIXME make configurable
private val gossipFrequency = 1 seconds // FIXME make configurable
private val timeUnit = {
assert(gossipFrequency.unit == initalDelayForGossip.unit)
initalDelayForGossip.unit
}
private val initalDelayForGossip = remoteExtension.InitalDelayForGossip
private val gossipFrequency = remoteExtension.GossipFrequency
private val state = new AtomicReference[State](State(currentGossip = newGossip()))
{
// start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between
system.scheduler schedule (() initateGossip(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit))
system.scheduler schedule (() scrutinize(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit))
system.scheduler schedule (() initateGossip(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))
system.scheduler schedule (() scrutinize(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))
}
/**

View file

@ -45,7 +45,7 @@ object NetworkEventStream {
case event: RemoteClientLifeCycleEvent
listeners(event.remoteAddress) foreach (_ notify event)
case event: RemoteServerLifeCycleEvent // FIXME handle RemoteServerLifeCycleEvent
case event: RemoteServerLifeCycleEvent // FIXME handle RemoteServerLifeCycleEvent, ticket #1408 and #1190
case Register(listener, connectionAddress)
listeners(connectionAddress) += listener
@ -62,7 +62,7 @@ class NetworkEventStream(system: ActorSystemImpl) {
import NetworkEventStream._
// FIXME: check that this supervision is correct
// FIXME: check that this supervision is correct, ticket #1408
private[akka] val sender = system.provider.actorOf(system,
Props[Channel].copy(dispatcher = system.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")),
system.systemGuardian, "network-event-sender", systemService = true)

View file

@ -47,10 +47,9 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
val remoteDaemonServiceName = "akka-system-remote-daemon".intern
// FIXME configure computeGridDispatcher to what?
val computeGridDispatcher = dispatcherFactory.newDispatcher("akka:compute-grid").build
val computeGridDispatcher = dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher")
// FIXME it is probably better to create another supervisor for handling the children created by handle_*
// FIXME it is probably better to create another supervisor for handling the children created by handle_*, ticket #1408
private[remote] lazy val remoteDaemonSupervisor = system.actorOf(Props(
OneForOneStrategy(List(classOf[Exception]), None, None)), "akka-system-remote-supervisor") // is infinite restart what we want?
@ -73,13 +72,11 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
lazy val server: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport(system)
remote.start() //TODO FIXME Any application loader here?
remote.start() //TODO Any application loader here?
system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
// TODO actually register this provider in system in remote mode
//provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider)
remote
}
@ -157,9 +154,9 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
sender ! Success(remoteAddress)
} catch {
case error: Throwable //FIXME doesn't seem sensible
sender ! Failure(error)
throw error
case exc: Exception
sender ! Failure(exc)
throw exc
}
}
@ -192,7 +189,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
def tempPath = remoteDaemon.path / tempName
// FIXME: handle real remote supervision
// FIXME: handle real remote supervision, ticket #1408
def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(systemImpl,
Props(
@ -201,7 +198,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}
// FIXME: handle real remote supervision
// FIXME: handle real remote supervision, ticket #1408
def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(systemImpl,
Props(
@ -210,7 +207,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}
// FIXME: handle real remote supervision
// FIXME: handle real remote supervision, ticket #1408
def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(systemImpl,
Props(
@ -219,7 +216,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}
// FIXME: handle real remote supervision
// FIXME: handle real remote supervision, ticket #1408
def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(systemImpl,
Props(

View file

@ -86,7 +86,7 @@ class RemoteActorRefProvider(
if (systemService) local.actorOf(system, props, supervisor, path, systemService)
else {
val name = path.name
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher)
actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
case null
@ -100,7 +100,7 @@ class RemoteActorRefProvider(
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?)
} else {
implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
@ -177,7 +177,7 @@ class RemoteActorRefProvider(
/**
* Copied from LocalActorRefProvider...
*/
// FIXME: implement supervision
// FIXME: implement supervision, ticket #1408
def actorOf(system: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router")
new RoutedActorRef(system, props, supervisor, name)
@ -266,7 +266,7 @@ class RemoteActorRefProvider(
}
}
private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch
private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch, ticket ##1190
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
@ -306,7 +306,7 @@ private[akka] case class RemoteActorRef private[akka] (
def resume(): Unit = ()
def stop() { //FIXME send the cause as well!
def stop() { //FIXME send the cause as well! (WDYM?)
synchronized {
if (running) {
running = false
@ -318,9 +318,9 @@ private[akka] case class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = provider.serialize(this)
def startsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
def startsWatching(actorRef: ActorRef): ActorRef = unsupported ////FIXME Implement Remote DeathWatch, ticket #1190
def stopsWatching(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
def stopsWatching(actorRef: ActorRef): ActorRef = unsupported ////FIXME Implement Remote DeathWatch, ticket #1190
protected[akka] def restart(cause: Throwable): Unit = ()

View file

@ -13,7 +13,6 @@ import java.net.InetAddress
import akka.config.ConfigurationException
import com.eaio.uuid.UUID
import akka.actor._
import scala.collection.JavaConverters._
object RemoteExtension extends ExtensionId[RemoteExtensionSettings] with ExtensionIdProvider {
@ -29,19 +28,20 @@ class RemoteExtensionSettings(cfg: Config) extends Extension {
import config._
val RemoteTransport = getString("akka.remote.layer")
val RemoteTransport = getString("akka.remote.transport")
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
val ShouldCompressData = config.getBoolean("akka.remote.use-compression")
val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS)
val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS)
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
val ClusterName = getString("akka.cluster.name")
val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_))
// FIXME remove nodename from config - should only be passed as command line arg or read from properties file etc.
val NodeName: String = config.getString("akka.cluster.nodename") match {
case "" new UUID().toString
case "" throw new ConfigurationException("akka.cluster.nodename configuration property must be defined")
case value value
}

View file

@ -66,7 +66,7 @@ abstract class RemoteClient private[akka] (
* Sends the message across the wire
*/
def send(request: RemoteMessageProtocol) {
if (isRunning) { //TODO FIXME RACY
if (isRunning) { //FIXME RACY, ticket #1409
log.debug("Sending message: " + new RemoteMessage(request, remoteSupport))
try {
@ -125,7 +125,7 @@ class ActiveRemoteClient private[akka] (
import remoteSupport.clientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
//TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile
private var bootstrap: ClientBootstrap = _
@volatile
@ -161,7 +161,7 @@ class ActiveRemoteClient private[akka] (
def closeChannel(connection: ChannelFuture) = {
val channel = connection.getChannel
openChannels.remove(channel)
channel.close
channel.close()
}
def attemptReconnect(): Boolean = {
@ -345,7 +345,7 @@ class ActiveRemoteClientHandler(
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
}
case e: Exception
event.getChannel.close //FIXME Is this the correct behavior?
event.getChannel.close() //FIXME Is this the correct behavior???
}
} else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.remoteSupport, client.remoteAddress))
@ -648,7 +648,7 @@ class RemoteServerHandler(
val inbound = RemoteAddress(origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)
remoteSupport.bindClient(inbound, client)
case CommandType.SHUTDOWN //TODO FIXME Dispose passive connection here
case CommandType.SHUTDOWN //FIXME Dispose passive connection here, ticket #1410
case _ //Unknown command
}
case _ //ignore
@ -659,7 +659,7 @@ class RemoteServerHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
remoteSupport.notifyListeners(RemoteServerError(event.getCause, remoteSupport))
event.getChannel.close
event.getChannel.close()
}
private def getClientAddress(c: Channel): Option[RemoteAddress] =
@ -679,7 +679,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
if (open.get) {
super.add(channel)
} else {
channel.close
channel.close()
false
}
} finally {
@ -690,7 +690,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
override def close(): ChannelGroupFuture = {
guard.writeLock().lock()
try {
if (open.getAndSet(false)) super.close else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
if (open.getAndSet(false)) super.close() else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
} finally {
guard.writeLock().unlock()
}

View file

@ -3,16 +3,16 @@ package akka.remote
import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteConfigSpec extends AkkaSpec {
class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") {
"ClusterSpec: A Deployer" must {
"be able to parse 'akka.actor.cluster._' config elements" in {
"RemoteExtension" must {
"be able to parse remote and cluster config elements" in {
val config = RemoteExtension(system).config
import config._
//akka.remote
getString("akka.remote.layer") must equal("akka.cluster.netty.NettyRemoteSupport")
getString("akka.remote.transport") must equal("akka.cluster.netty.NettyRemoteSupport")
getString("akka.remote.secure-cookie") must equal("")
getBoolean("akka.remote.use-passive-connections") must equal(true)
// getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000)
@ -35,7 +35,7 @@ class RemoteConfigSpec extends AkkaSpec {
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
//akka.cluster
getString("akka.cluster.name") must equal("default-cluster")
getString("akka.cluster.nodename") must equal("")
getString("akka.cluster.nodename") must equal("node1")
getStringList("akka.cluster.seed-nodes") must equal(new java.util.ArrayList[String])
// getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000)

View file

@ -214,7 +214,6 @@ class CallingThreadDispatcher(
}
if (handle ne null) {
try {
if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle)
mbox.actor.invoke(handle)
true
} catch {