Merge pull request #131 from jboner/wip-1378-fixme-patriknw

First walk through of FIXME. See #1378
This commit is contained in:
patriknw 2011-12-02 00:28:20 -08:00
commit 035f514843
28 changed files with 136 additions and 127 deletions

View file

@ -1,17 +1,26 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
package akka.event
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.util.duration._
import akka.testkit._
import org.scalatest.WordSpec
import akka.event.Logging
import akka.util.Duration
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
import java.util.Properties
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.HotSwap
import akka.actor.UnhandledMessageException
import akka.actor.PoisonPill
import akka.actor.ActorSystemImpl
import akka.actor.Props
import akka.actor.OneForOneStrategy
import akka.actor.ActorKilledException
import akka.actor.Kill
object LoggingReceiveSpec {
class TestLogActor extends Actor {
@ -57,7 +66,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val r: Actor.Receive = {
case null
}
val log = Actor.LoggingReceive("funky", r)
val log = LoggingReceive("funky")(r)
log.isDefinedAt("hallo")
expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo"))
}
@ -69,7 +78,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
system.eventStream.subscribe(testActor, classOf[Logging.Error])
val actor = TestActorRef(new Actor {
def receive = loggable(this) {
def receive = LoggingReceive(this) {
case x
sender ! "x"
}
@ -99,7 +108,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
new TestKit(appLogging) with ImplicitSender {
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
val actor = TestActorRef(new Actor {
def receive = loggable(this)(loggable(this) {
def receive = LoggingReceive(this)(LoggingReceive(this) {
case _ sender ! "x"
})
})

View file

@ -81,7 +81,7 @@ akka {
# optional
replication { # use replication or not? only makes sense for a stateful actor
# FIXME should we have this config option here? If so, implement it all through.
# serialize-mailbox not implemented, ticket #1412
serialize-mailbox = off # should the actor mailbox be part of the serialized snapshot?
# default is 'off'

View file

@ -160,25 +160,6 @@ object Actor {
type Receive = PartialFunction[Any, Unit]
/**
* This decorator adds invocation logging to a Receive function.
*/
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o))
handled
}
def apply(o: Any): Unit = r(o)
}
object LoggingReceive {
def apply(source: AnyRef, r: Receive)(implicit system: ActorSystem): Receive = r match {
case _: LoggingReceive r
case _ new LoggingReceive(source, r)
}
}
object emptyBehavior extends Receive {
def isDefinedAt(x: Any) = false
def apply(x: Any) = throw new UnsupportedOperationException("empty behavior apply()")
@ -235,22 +216,6 @@ trait Actor {
*/
implicit def defaultTimeout = system.settings.ActorTimeout
/**
* Wrap a Receive partial function in a logging enclosure, which sends a
* debug message to the EventHandler each time before a message is matched.
* This includes messages which are not handled.
*
* <pre><code>
* def receive = loggable {
* case x => ...
* }
* </code></pre>
*
* This method does NOT modify the given Receive unless
* akka.actor.debug.receive is set within akka.conf.
*/
def loggable(self: AnyRef)(r: Receive): Receive = if (system.settings.AddLoggingReceive) LoggingReceive(self, r) else r //TODO FIXME Shouldn't this be in a Loggable-trait?
/**
* The 'self' field holds the ActorRef for this actor.
* <p/>

View file

@ -194,6 +194,7 @@ private[akka] class ActorCell(
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "started (" + actor + ")"))
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e
try {
system.eventStream.publish(Error(e, self.toString, "error while creating actor"))
@ -226,6 +227,7 @@ private[akka] class ActorCell(
props.faultHandler.handleSupervisorRestarted(cause, self, children)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e try {
system.eventStream.publish(Error(e, self.toString, "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
@ -287,7 +289,7 @@ private[akka] class ActorCell(
} catch {
case e //Should we really catch everything here?
system.eventStream.publish(Error(e, self.toString, "error while processing " + message))
//TODO FIXME How should problems here be handled?
//TODO FIXME How should problems here be handled???
throw e
}
}
@ -298,7 +300,7 @@ private[akka] class ActorCell(
currentMessage = messageHandle
try {
try {
cancelReceiveTimeout() // FIXME: leave this here?
cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg if stopping // receiving Terminated in response to stopping children is too common to generate noise

View file

@ -173,13 +173,13 @@ class LocalActorRef private[akka] (
* message sends done from the same thread after calling this method will not
* be processed until resumed.
*/
//FIXME TODO REMOVE THIS, NO REPLACEMENT
//FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415
def suspend(): Unit = actorCell.suspend()
/**
* Resumes a suspended actor.
*/
//FIXME TODO REMOVE THIS, NO REPLACEMENT
//FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415
def resume(): Unit = actorCell.resume()
/**
@ -191,7 +191,7 @@ class LocalActorRef private[akka] (
protected[akka] def underlying: ActorCell = actorCell
// FIXME TODO: remove this method
// FIXME TODO: remove this method. It is used in testkit.
// @deprecated("This method does a spin-lock to block for the actor, which might never be there, do not use this", "2.0")
protected[akka] def underlyingActorInstance: Actor = {
var instance = actorCell.actor
@ -266,7 +266,6 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) {
import akka.serialization.Serialization.currentSystem
def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path)
def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE
@throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = currentSystem.value match {
@ -288,6 +287,8 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef with RefInternals {
private[akka] val uuid: Uuid = newUuid()
def name: String = uuid.toString
//FIXME REMOVE THIS, ticket #1416
//FIXME REMOVE THIS, ticket #1415
def suspend(): Unit = ()
def resume(): Unit = ()

View file

@ -36,10 +36,10 @@ trait ActorRefProvider {
def deathWatch: DeathWatch
// FIXME: remove/replace?
// FIXME: remove/replace???
def nodename: String
// FIXME: remove/replace?
// FIXME: remove/replace???
def clustername: String
/**
@ -64,7 +64,7 @@ trait ActorRefProvider {
/**
* Create an Actor with the given full path below the given supervisor.
*
* FIXME: Remove! this is dangerous!
* FIXME: Remove! this is dangerous!?
*/
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef
@ -296,7 +296,7 @@ class LocalActorRefProvider(
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, path: ActorPath, systemService: Boolean): ActorRef = {
val name = path.name
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher)
actors.putIfAbsent(path.toString, newFuture) match {
case null

View file

@ -229,8 +229,6 @@ abstract class ActorSystem extends ActorRefFactory {
* effort basis and hence not strictly guaranteed.
*/
def deadLetters: ActorRef
// FIXME: do not publish this
def deadLetterMailbox: Mailbox
/**
* Light-weight scheduler for running asynchronous tasks after some deadline
@ -349,7 +347,6 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
// TODO why implicit val dispatcher?
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
//FIXME Set this to a Failure when things bubble to the top
def terminationFuture: Future[Unit] = provider.terminationFuture
def guardian: ActorRef = provider.guardian
def systemGuardian: ActorRef = provider.systemGuardian

View file

@ -57,9 +57,6 @@ trait BootableActorLoaderService extends Bootable {
abstract override def onUnload() = {
super.onUnload()
// FIXME shutdown all actors
// system.registry.local.shutdownAll
}
}

View file

@ -18,7 +18,6 @@ import com.typesafe.config.Config
trait ActorDeployer {
private[akka] def init(deployments: Seq[Deploy]): Unit
private[akka] def shutdown(): Unit //TODO Why should we have "shutdown", should be crash only?
private[akka] def deploy(deployment: Deploy): Unit
private[akka] def lookupDeploymentFor(path: String): Option[Deploy]
def lookupDeployment(path: String): Option[Deploy] = path match {
@ -49,8 +48,6 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
private[akka] def init(deployments: Seq[Deploy]) = instance.init(deployments)
def shutdown(): Unit = instance.shutdown() //TODO FIXME Why should we have "shutdown", should be crash only?
def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
def isLocal(deployment: Deploy): Boolean = deployment match {

View file

@ -69,6 +69,7 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit,
try {
function()
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e eventStream.publish(Error(e, "TaskInvocation", e.getMessage))
} finally {
cleanup()

View file

@ -27,10 +27,7 @@ class FutureTimeoutException(message: String, cause: Throwable = null) extends A
def this(message: String) = this(message, null)
}
class FutureFactory(dispatcher: MessageDispatcher, timeout: Timeout) {
// TODO: remove me ASAP !!!
implicit val _dispatcher = dispatcher
class FutureFactory()(implicit dispatcher: MessageDispatcher, timeout: Timeout) {
/**
* Java API, equivalent to Future.apply
@ -163,6 +160,7 @@ object Future {
try {
Right(body)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e Left(e)
}
}
@ -411,7 +409,9 @@ object Future {
try {
next.apply()
} catch {
case e e.printStackTrace() //TODO FIXME strategy for handling exceptions in callbacks
case e
// FIXME catching all and continue isn't good for OOME, ticket #1418
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", "Failed to dispatch task, due to: " + e.getMessage))
}
}
} finally { _taskStack set None }
@ -984,7 +984,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
def run() {
if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
else promise complete (try { Right(fallback) } catch { case e Left(e) })
else promise complete (try { Right(fallback) } catch { case e Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418
}
}
}
@ -994,6 +994,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
} else this
private def notifyCompleted(func: Future[T] Unit) {
// FIXME catching all and continue isn't good for OOME, ticket #1418
try { func(this) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
}

View file

@ -28,9 +28,6 @@ object Mailbox {
// secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 4
// mailbox debugging helper using println (see below)
// FIXME TODO take this out before release
final val debug = false
}
/**
@ -167,7 +164,6 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
var processedMessages = 0
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0
do {
if (debug) println(actor.self + " processing message " + nextMessage)
actor invoke nextMessage
processAllSystemMessages() //After we're done, process all system messages
@ -190,7 +186,6 @@ abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMes
var nextMessage = systemDrain()
try {
while (nextMessage ne null) {
if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs)
actor systemInvoke nextMessage
nextMessage = nextMessage.next
// dont ever execute normal message when system message present!
@ -245,7 +240,6 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null)
if (Mailbox.debug) println(actor.self + " having enqueued " + message)
val head = systemQueueGet
/*
* this write is safely published by the compareAndSet contained within

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event
import akka.actor.Actor.Receive
import akka.actor.ActorSystem
import akka.event.Logging.Debug
object LoggingReceive {
/**
* Wrap a Receive partial function in a logging enclosure, which sends a
* debug message to the event bus each time before a message is matched.
* This includes messages which are not handled.
*
* <pre><code>
* def receive = LoggingReceive(this) {
* case x => ...
* }
* </code></pre>
*
* This method does NOT modify the given Receive unless
* akka.actor.debug.receive is set within akka.conf.
*/
def apply(source: AnyRef)(r: Receive)(implicit system: ActorSystem): Receive = r match {
case _: LoggingReceive r
case _ if !system.settings.AddLoggingReceive r
case _ new LoggingReceive(source, r)
}
}
/**
* This decorator adds invocation logging to a Receive function.
*/
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o))
handled
}
def apply(o: Any): Unit = r(o)
}

View file

@ -251,7 +251,7 @@ trait BasicRouter extends Router {
next match {
case Some(connection)
try {
connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it?
connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it??
} catch {
case e: Exception
connectionManager.remove(connection)

View file

@ -108,6 +108,7 @@ class BoundedBlockingQueue[E <: AnyRef](
throw ie
}
false
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e
notFull.signal()
result = e
@ -234,7 +235,7 @@ class BoundedBlockingQueue[E <: AnyRef](
if (backing.removeAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal()
if (sz > 0) notEmpty.signal() //FIXME needed?
if (sz > 0) notEmpty.signal() //FIXME needed??
true
} else false
} finally {
@ -247,7 +248,7 @@ class BoundedBlockingQueue[E <: AnyRef](
try {
if (backing.retainAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal() //FIXME needed?
if (sz < maxCapacity) notFull.signal() //FIXME needed??
if (sz > 0) notEmpty.signal()
true
} else false

View file

@ -717,11 +717,12 @@ options:
``akka.actor.debug.receive`` — which enables the :meth:`loggable`
statement to be applied to an actors :meth:`receive` function::
def receive = Actor.loggable(this) { // `Actor` unnecessary with import Actor._
import akka.event.LoggingReceive
def receive = LoggingReceive(this) {
case msg => ...
}
The first argument to :meth:`loggable` defines the source to be used in the
The first argument to :meth:`LoggingReceive` defines the source to be used in the
logging events, which should be the current actor.
If the abovementioned setting is not given in ``akka.conf``, this method will

View file

@ -62,7 +62,8 @@ class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with
queue.remove
true
} catch {
case e false //review why catch Throwable? And swallow potential Errors?
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e false
}
}

View file

@ -66,7 +66,7 @@ public class ZooKeeperQueue<T extends Object> {
return element.getData();
} else {
throw new UnsupportedOperationException("Non-blocking ZooKeeperQueue is not yet supported");
/* FIXME DOES NOT WORK
/* TODO DOES NOT WORK
try {
String headName = getSmallestElement(_zkClient.getChildren(_elementsPath));
String headPath = getElementPath(headName);

View file

@ -59,7 +59,7 @@ class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
queue.clear
true
} catch {
case e false
case e: Exception false
}
override def cleanUp() {

View file

@ -8,8 +8,7 @@
akka {
remote {
# FIXME rename to transport
layer = "akka.cluster.netty.NettyRemoteSupport"
transport = "akka.cluster.netty.NettyRemoteSupport"
use-compression = off
@ -27,6 +26,15 @@ akka {
# generates fewer mistakes but needs more time to detect actual crashes
max-sample-size = 1000
}
gossip {
initialDelay = 5s
frequency = 1s
}
compute-grid-dispatcher { # The dispatcher used for remote system messages
name = ComputeGridDispatcher # defaults to same settings as default-dispatcher
}
server {
hostname = "" # The hostname or ip to bind the remoting to, InetAddress.getLocalHost.getHostAddress is used if empty

View file

@ -15,6 +15,7 @@ import akka.config.ConfigurationException
import akka.serialization.SerializationExtension
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit.SECONDS
import java.security.SecureRandom
import System.{ currentTimeMillis newTimestamp }
@ -122,19 +123,15 @@ class Gossiper(remote: Remote) {
private val nodeFingerprint = address.##
private val random = SecureRandom.getInstance("SHA1PRNG")
private val initalDelayForGossip = 5 seconds // FIXME make configurable
private val gossipFrequency = 1 seconds // FIXME make configurable
private val timeUnit = {
assert(gossipFrequency.unit == initalDelayForGossip.unit)
initalDelayForGossip.unit
}
private val initalDelayForGossip = remoteExtension.InitalDelayForGossip
private val gossipFrequency = remoteExtension.GossipFrequency
private val state = new AtomicReference[State](State(currentGossip = newGossip()))
{
// start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between
system.scheduler schedule (() initateGossip(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit))
system.scheduler schedule (() scrutinize(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit))
system.scheduler schedule (() initateGossip(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))
system.scheduler schedule (() scrutinize(), Duration(initalDelayForGossip.toSeconds, SECONDS), Duration(gossipFrequency.toSeconds, SECONDS))
}
/**

View file

@ -45,7 +45,7 @@ object NetworkEventStream {
case event: RemoteClientLifeCycleEvent
listeners(event.remoteAddress) foreach (_ notify event)
case event: RemoteServerLifeCycleEvent // FIXME handle RemoteServerLifeCycleEvent
case event: RemoteServerLifeCycleEvent // FIXME handle RemoteServerLifeCycleEvent, ticket #1408 and #1190
case Register(listener, connectionAddress)
listeners(connectionAddress) += listener
@ -62,7 +62,7 @@ class NetworkEventStream(system: ActorSystemImpl) {
import NetworkEventStream._
// FIXME: check that this supervision is correct
// FIXME: check that this supervision is correct, ticket #1408
private[akka] val sender = system.provider.actorOf(system,
Props[Channel].copy(dispatcher = system.dispatcherFactory.newPinnedDispatcher("NetworkEventStream")),
system.systemGuardian, "network-event-sender", systemService = true)

View file

@ -47,10 +47,9 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
val remoteDaemonServiceName = "akka-system-remote-daemon".intern
// FIXME configure computeGridDispatcher to what?
val computeGridDispatcher = dispatcherFactory.newDispatcher("akka:compute-grid").build
val computeGridDispatcher = dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher")
// FIXME it is probably better to create another supervisor for handling the children created by handle_*
// FIXME it is probably better to create another supervisor for handling the children created by handle_*, ticket #1408
private[remote] lazy val remoteDaemonSupervisor = system.actorOf(Props(
OneForOneStrategy(List(classOf[Exception]), None, None)), "akka-system-remote-supervisor") // is infinite restart what we want?
@ -73,13 +72,11 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
lazy val server: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport(system)
remote.start() //TODO FIXME Any application loader here?
remote.start() //TODO Any application loader here?
system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
// TODO actually register this provider in system in remote mode
//provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider)
remote
}
@ -157,9 +154,9 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
sender ! Success(remoteAddress)
} catch {
case error: Throwable //FIXME doesn't seem sensible
sender ! Failure(error)
throw error
case exc: Exception
sender ! Failure(exc)
throw exc
}
}
@ -192,7 +189,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
def tempName = "$_" + Helpers.base64(tempNumber.getAndIncrement())
def tempPath = remoteDaemon.path / tempName
// FIXME: handle real remote supervision
// FIXME: handle real remote supervision, ticket #1408
def handle_fun0_unit(message: RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(systemImpl,
Props(
@ -201,7 +198,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}
// FIXME: handle real remote supervision
// FIXME: handle real remote supervision, ticket #1408
def handle_fun0_any(message: RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(systemImpl,
Props(
@ -210,7 +207,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}
// FIXME: handle real remote supervision
// FIXME: handle real remote supervision, ticket #1408
def handle_fun1_arg_unit(message: RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(systemImpl,
Props(
@ -219,7 +216,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}).copy(dispatcher = computeGridDispatcher), remoteDaemon, tempPath, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}
// FIXME: handle real remote supervision
// FIXME: handle real remote supervision, ticket #1408
def handle_fun1_arg_any(message: RemoteSystemDaemonMessageProtocol) {
new LocalActorRef(systemImpl,
Props(

View file

@ -86,7 +86,7 @@ class RemoteActorRefProvider(
if (systemService) local.actorOf(system, props, supervisor, path, systemService)
else {
val name = path.name
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
val newFuture = Promise[ActorRef](system.settings.ActorTimeout)(dispatcher)
actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
case null
@ -100,7 +100,7 @@ class RemoteActorRefProvider(
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?)
} else {
implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
@ -177,7 +177,7 @@ class RemoteActorRefProvider(
/**
* Copied from LocalActorRefProvider...
*/
// FIXME: implement supervision
// FIXME: implement supervision, ticket #1408
def actorOf(system: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): ActorRef = {
if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router")
new RoutedActorRef(system, props, supervisor, name)
@ -266,7 +266,7 @@ class RemoteActorRefProvider(
}
}
private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch
private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch, ticket ##1190
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
@ -306,7 +306,7 @@ private[akka] case class RemoteActorRef private[akka] (
def resume(): Unit = ()
def stop() { //FIXME send the cause as well!
def stop() {
synchronized {
if (running) {
running = false

View file

@ -10,7 +10,6 @@ import java.net.InetAddress
import akka.config.ConfigurationException
import com.eaio.uuid.UUID
import akka.actor._
import scala.collection.JavaConverters._
object RemoteExtension extends ExtensionId[RemoteExtensionSettings] with ExtensionIdProvider {
@ -22,19 +21,19 @@ class RemoteExtensionSettings(val config: Config) extends Extension {
import config._
val RemoteTransport = getString("akka.remote.layer")
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
val ShouldCompressData = config.getBoolean("akka.remote.use-compression")
val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
val InitalDelayForGossip = Duration(config.getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS)
val GossipFrequency = Duration(config.getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS)
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
val ClusterName = getString("akka.cluster.name")
val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_))
// FIXME remove nodename from config - should only be passed as command line arg or read from properties file etc.
val NodeName: String = config.getString("akka.cluster.nodename") match {
case "" new UUID().toString
case "" throw new ConfigurationException("akka.cluster.nodename configuration property must be defined")
case value value
}

View file

@ -124,7 +124,7 @@ class ActiveRemoteClient private[akka] (
import remoteSupport.clientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
//TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile
private var bootstrap: ClientBootstrap = _
@volatile
@ -159,7 +159,7 @@ class ActiveRemoteClient private[akka] (
def closeChannel(connection: ChannelFuture) = {
val channel = connection.getChannel
openChannels.remove(channel)
channel.close
channel.close()
}
def attemptReconnect(): Boolean = {
@ -339,7 +339,7 @@ class ActiveRemoteClientHandler(
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
}
case e: Exception
event.getChannel.close //FIXME Is this the correct behavior?
event.getChannel.close() //FIXME Is this the correct behavior???
}
} else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.remoteSupport, client.remoteAddress))
@ -650,7 +650,7 @@ class RemoteServerHandler(
val inbound = RemoteAddress(origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)
remoteSupport.bindClient(inbound, client)
case CommandType.SHUTDOWN //No need to do anything here
case CommandType.SHUTDOWN //FIXME Dispose passive connection here, ticket #1410
case _ //Unknown command
}
case _ //ignore
@ -661,7 +661,7 @@ class RemoteServerHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
remoteSupport.notifyListeners(RemoteServerError(event.getCause, remoteSupport))
event.getChannel.close
event.getChannel.close()
}
private def getClientAddress(c: Channel): Option[RemoteAddress] =
@ -681,7 +681,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
if (open.get) {
super.add(channel)
} else {
channel.close
channel.close()
false
}
} finally {
@ -692,7 +692,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
override def close(): ChannelGroupFuture = {
guard.writeLock().lock()
try {
if (open.getAndSet(false)) super.close else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
if (open.getAndSet(false)) super.close() else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
} finally {
guard.writeLock().unlock()
}

View file

@ -3,16 +3,15 @@ package akka.remote
import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteConfigSpec extends AkkaSpec {
class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") {
"ClusterSpec: A Deployer" must {
"be able to parse 'akka.actor.cluster._' config elements" in {
"RemoteExtension" must {
"be able to parse remote and cluster config elements" in {
val config = RemoteExtension(system).config
import config._
//akka.remote
getString("akka.remote.layer") must equal("akka.cluster.netty.NettyRemoteSupport")
getString("akka.remote.secure-cookie") must equal("")
getBoolean("akka.remote.use-passive-connections") must equal(true)
// getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000)
@ -35,7 +34,7 @@ class RemoteConfigSpec extends AkkaSpec {
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
//akka.cluster
getString("akka.cluster.name") must equal("default-cluster")
getString("akka.cluster.nodename") must equal("")
getString("akka.cluster.nodename") must equal("node1")
getStringList("akka.cluster.seed-nodes") must equal(new java.util.ArrayList[String])
// getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000)

View file

@ -214,7 +214,6 @@ class CallingThreadDispatcher(
}
if (handle ne null) {
try {
if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle)
mbox.actor.invoke(handle)
true
} catch {