FIXMEs, young grasshopper
This commit is contained in:
parent
13a95ce15b
commit
1756b6aa54
13 changed files with 20 additions and 60 deletions
|
|
@ -290,6 +290,8 @@ private[akka] class ActorCell(
|
|||
parent.sendSystemMessage(akka.dispatch.Supervise(self))
|
||||
|
||||
dispatcher.attach(this)
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
dispatcher.systemDispatch(this, Create())
|
||||
}
|
||||
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
|
|
@ -360,7 +362,7 @@ private[akka] class ActorCell(
|
|||
checkReceiveTimeout
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
|
||||
} catch {
|
||||
// FIXME catching all and continue isn't good for OOME, ticket #1418
|
||||
// TODO catching all and continue isn't good for OOME, ticket #1418
|
||||
case e ⇒
|
||||
try {
|
||||
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
|
||||
|
|
@ -394,7 +396,7 @@ private[akka] class ActorCell(
|
|||
|
||||
props.faultHandler.handleSupervisorRestarted(cause, self, children)
|
||||
} catch {
|
||||
// FIXME catching all and continue isn't good for OOME, ticket #1418
|
||||
// TODO catching all and continue isn't good for OOME, ticket #1418
|
||||
case e ⇒ try {
|
||||
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
|
||||
// prevent any further messages to be processed until the actor has been restarted
|
||||
|
|
|
|||
|
|
@ -41,10 +41,8 @@ trait ActorRefProvider {
|
|||
*/
|
||||
def deathWatch: DeathWatch
|
||||
|
||||
// FIXME: remove/replace???
|
||||
def nodename: String
|
||||
|
||||
// FIXME: remove/replace???
|
||||
def clustername: String
|
||||
|
||||
/**
|
||||
|
|
@ -291,7 +289,6 @@ class LocalActorRefProvider(
|
|||
new RootActorPath(LocalAddress(_systemName)),
|
||||
new Deployer(settings))
|
||||
|
||||
// FIXME remove both
|
||||
val nodename: String = "local"
|
||||
val clustername: String = "local"
|
||||
|
||||
|
|
|
|||
|
|
@ -544,23 +544,6 @@ class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory wit
|
|||
|
||||
// Private API
|
||||
|
||||
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = {
|
||||
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
|
||||
val actorVar = new AtomVar[ActorRef](null)
|
||||
|
||||
//FIXME
|
||||
val timeout = settings.ActorTimeout
|
||||
/*val timeout = props.timeout match {
|
||||
case Props.`defaultTimeout` ⇒ settings.ActorTimeout
|
||||
case x ⇒ x
|
||||
}*/
|
||||
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(this, actorVar, timeout)).asInstanceOf[T]
|
||||
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
|
||||
val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props)
|
||||
actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
|
||||
proxyVar.get
|
||||
}
|
||||
|
||||
private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ⇒ ActorRef): R = {
|
||||
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
|
||||
val actorVar = new AtomVar[ActorRef](null)
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl
|
|||
try {
|
||||
runnable.run()
|
||||
} catch {
|
||||
// FIXME catching all and continue isn't good for OOME, ticket #1418
|
||||
// TODO catching all and continue isn't good for OOME, ticket #1418
|
||||
case e ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
|
||||
} finally {
|
||||
cleanup()
|
||||
|
|
@ -208,8 +208,6 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
|||
*/
|
||||
protected[akka] def register(actor: ActorCell) {
|
||||
inhabitantsUpdater.incrementAndGet(this)
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps?
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -148,7 +148,7 @@ object Future {
|
|||
try {
|
||||
Right(body)
|
||||
} catch {
|
||||
// FIXME catching all and continue isn't good for OOME, ticket #1418
|
||||
// TODO catching all and continue isn't good for OOME, ticket #1418
|
||||
case e ⇒ Left(e)
|
||||
}
|
||||
}
|
||||
|
|
@ -322,7 +322,7 @@ object Future {
|
|||
next.apply()
|
||||
} catch {
|
||||
case e ⇒
|
||||
// FIXME catching all and continue isn't good for OOME, ticket #1418
|
||||
// TODO catching all and continue isn't good for OOME, ticket #1418
|
||||
executor match {
|
||||
case m: MessageDispatcher ⇒
|
||||
m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", this.getClass, e.getMessage))
|
||||
|
|
@ -423,8 +423,8 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
|
|||
* Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this.
|
||||
* This is semantically the same as: Future.firstCompletedOf(Seq(this, that))
|
||||
*/
|
||||
//FIXME implement as The result of any of the Futures, or if oth failed, the first failure
|
||||
def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize
|
||||
// TODO ticket #1650
|
||||
def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that))
|
||||
|
||||
/**
|
||||
* Creates a new Future that will handle any matching Throwable that this
|
||||
|
|
|
|||
|
|
@ -116,25 +116,6 @@ class Serialization(val system: ActorSystemImpl) extends Extension {
|
|||
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
|
||||
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs)
|
||||
|
||||
/**
|
||||
* FIXME implement support for this
|
||||
*/
|
||||
private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
|
||||
if (bindings.isEmpty)
|
||||
Left(NoSerializerFoundException("No mapping serializer found for " + cl))
|
||||
else {
|
||||
bindings find {
|
||||
case (clazzName, _) ⇒
|
||||
ReflectiveAccess.getClassFor(clazzName) match {
|
||||
case Right(clazz) ⇒ clazz.isAssignableFrom(cl)
|
||||
case _ ⇒ false
|
||||
}
|
||||
} map {
|
||||
case (_, ser) ⇒ serializerOf(ser)
|
||||
} getOrElse Left(NoSerializerFoundException("No mapping serializer found for " + cl))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
|
||||
* By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
throw ie
|
||||
}
|
||||
false
|
||||
// FIXME catching all and continue isn't good for OOME, ticket #1418
|
||||
// TODO catching all and continue isn't good for OOME, ticket #1418
|
||||
case e ⇒
|
||||
notFull.signal()
|
||||
result = e
|
||||
|
|
@ -235,7 +235,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
if (backing.removeAll(c)) {
|
||||
val sz = backing.size()
|
||||
if (sz < maxCapacity) notFull.signal()
|
||||
if (sz > 0) notEmpty.signal() //FIXME needed??
|
||||
if (sz > 0) notEmpty.signal()
|
||||
true
|
||||
} else false
|
||||
} finally {
|
||||
|
|
@ -248,7 +248,7 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
try {
|
||||
if (backing.retainAll(c)) {
|
||||
val sz = backing.size()
|
||||
if (sz < maxCapacity) notFull.signal() //FIXME needed??
|
||||
if (sz < maxCapacity) notFull.signal()
|
||||
if (sz > 0) notEmpty.signal()
|
||||
true
|
||||
} else false
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) wi
|
|||
queue.remove
|
||||
true
|
||||
} catch {
|
||||
// FIXME catching all and continue isn't good for OOME, ticket #1418
|
||||
// TODO catching all and continue isn't good for OOME, ticket #1418
|
||||
case e ⇒ false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ case class RemoteNettyAddress(host: String, ip: Option[InetAddress], port: Int)
|
|||
|
||||
object RemoteNettyAddress {
|
||||
def apply(host: String, port: Int): RemoteNettyAddress = {
|
||||
// FIXME this may BLOCK for extended periods of time!
|
||||
// TODO ticket #1639
|
||||
val ip = try Some(InetAddress.getByName(host)) catch { case _: UnknownHostException ⇒ None }
|
||||
new RemoteNettyAddress(host, ip, port)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -350,8 +350,7 @@ class ActiveRemoteClientHandler(
|
|||
runOnceNow {
|
||||
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
|
||||
}
|
||||
case e: Exception ⇒
|
||||
event.getChannel.close() //FIXME Is this the correct behavior???
|
||||
case e: Exception ⇒ event.getChannel.close()
|
||||
}
|
||||
|
||||
} else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.remoteSupport, client.remoteAddress))
|
||||
|
|
@ -670,7 +669,7 @@ class RemoteServerHandler(
|
|||
val inbound = RemoteNettyAddress(origin.getHostname, origin.getPort)
|
||||
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)
|
||||
remoteSupport.bindClient(inbound, client)
|
||||
case CommandType.SHUTDOWN ⇒ //FIXME Dispose passive connection here, ticket #1410
|
||||
case CommandType.SHUTDOWN ⇒ //Will be unbound in channelClosed
|
||||
case _ ⇒ //Unknown command
|
||||
}
|
||||
case _ ⇒ //ignore
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ trait RemoteRouterConfig extends RouterConfig {
|
|||
case x ⇒ throw new ConfigurationException("unparseable remote node " + x)
|
||||
}
|
||||
val node = Stream.continually(nodes).flatten.iterator
|
||||
val impl = context.system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
|
||||
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
|
||||
IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield {
|
||||
val name = "c" + i
|
||||
val deploy = Deploy("", ConfigFactory.empty(), None, props.routerConfig, RemoteScope(node.next))
|
||||
|
|
|
|||
|
|
@ -88,12 +88,12 @@ class TestFSMRef[S, D, T <: Actor](
|
|||
object TestFSMRef {
|
||||
|
||||
def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
|
||||
val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
|
||||
val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
|
||||
new TestFSMRef(impl, system.dispatchers.prerequisites, Props(creator = () ⇒ factory), impl.guardian.asInstanceOf[InternalActorRef], TestActorRef.randomName)
|
||||
}
|
||||
|
||||
def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
|
||||
val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
|
||||
val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
|
||||
new TestFSMRef(impl, system.dispatchers.prerequisites, Props(creator = () ⇒ factory), impl.guardian.asInstanceOf[InternalActorRef], name)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ class TestKit(_system: ActorSystem) {
|
|||
* registration as message target.
|
||||
*/
|
||||
lazy val testActor: ActorRef = {
|
||||
val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
|
||||
val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
|
||||
impl.systemActorOf(Props(new TestActor(queue))
|
||||
.withDispatcher(CallingThreadDispatcher.Id),
|
||||
"testActor" + TestKit.testActorId.incrementAndGet)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue