Added error handling to the actor creation in the different providers

This commit is contained in:
Jonas Bonér 2011-09-27 16:52:33 +02:00
parent 07012b3d62
commit 07b29c0627
2 changed files with 21 additions and 61 deletions

View file

@ -144,74 +144,28 @@ class LocalActorRefProvider extends ActorRefProvider {
val oldFuture = actors.putIfAbsent(address, newFuture)
if (oldFuture eq null) { // we won the race -- create the actor and resolve the future
def newActor() = Some(new LocalActorRef(props, address, systemService))
val actor =
val actor = try {
Deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor
case Some(Deploy(_, _, router, _, LocalScope)) newActor() // create a local actor
case None newActor() // create a local actor
case _ None // non-local actor
}
} catch {
case e: Exception
newFuture completeWithException e // so the other threads gets notified of error
throw e
}
actor foreach { a Actor.registry.register(a) }
newFuture.completeWithResult(actor)
actor foreach Actor.registry.register // only for ActorRegistry backward compat, will be removed later
newFuture completeWithResult actor
actor
} else { // we lost the race -- wait for future to complete
oldFuture.await.result.getOrElse(None)
oldFuture.await.resultOrException.getOrElse(None)
}
}
}
// class ClusterActorRefProvider extends ActorRefProvider {
// def actorOf(props: Props, address: String): Option[ActorRef] = {
// deploy match {
// case Deploy(configAddress, recipe, router, failureDetector, Cluster(preferredHomeNodes, replicas, replication))
// ClusterModule.ensureEnabled()
// if (configAddress != address) throw new IllegalStateException("Deployment config for [" + address + "] is wrong [" + deploy + "]")
// if (!remote.isRunning) throw new IllegalStateException("Remote server is not running")
// val isHomeNode = DeploymentConfig.isHomeNode(preferredHomeNodes)
// val serializer = recipe match {
// case Some(r) Serialization.serializerFor(r.implementationClass)
// case None Serialization.serializerFor(classOf[Actor]) //FIXME revisit this decision of default
// }
// def storeActorAndGetClusterRef(replicationScheme: ReplicationScheme, serializer: Serializer): ActorRef = {
// // add actor to cluster registry (if not already added)
// if (!cluster.isClustered(address)) //WARNING!!!! Racy
// cluster.store(address, factory, replicas.factor, replicationScheme, false, serializer)
// // remote node (not home node), check out as ClusterActorRef
// cluster.ref(address, DeploymentConfig.routerTypeFor(router), DeploymentConfig.failureDetectorTypeFor(failureDetector))
// }
// replication match {
// case _: Transient | Transient
// storeActorAndGetClusterRef(Transient, serializer)
// case replication: Replication
// if (DeploymentConfig.routerTypeFor(router) != akka.routing.RouterType.Direct) throw new ConfigurationException(
// "Can't replicate an actor [" + address + "] configured with another router than \"direct\" - found [" + router + "]")
// if (isHomeNode) { // stateful actor's home node
// cluster.use(address, serializer)
// .getOrElse(throw new ConfigurationException(
// "Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
// } else {
// storeActorAndGetClusterRef(replication, serializer)
// }
// }
// case invalid throw new IllegalActorStateException(
// "Could not create actor with address [" + address + "], not bound to a valid deployment scheme [" + invalid + "]")
// }
// }
// def findActorRef(address: String): Option[ActorRef]
// }

View file

@ -43,7 +43,7 @@ class RemoteActorRefProvider extends ActorRefProvider {
val oldFuture = actors.putIfAbsent(address, newFuture)
if (oldFuture eq null) { // we won the race -- create the actor and resolve the future
val actor =
val actor = try {
Deployer.lookupDeploymentFor(address) match {
case Some(Deploy(_, _, router, _, RemoteScope(host, port)))
// FIXME create RoutedActorRef if 'router' is specified
@ -60,13 +60,19 @@ class RemoteActorRefProvider extends ActorRefProvider {
}
case deploy None // non-remote actor
}
} catch {
case e: Exception
newFuture completeWithException e // so the other threads gets notified of error
throw e
}
newFuture.completeWithResult(actor)
actor foreach Actor.registry.register // only for ActorRegistry backward compat, will be removed later
newFuture completeWithResult actor
actor
} else { // we lost the race -- wait for future to complete
oldFuture.await.result.getOrElse(None)
oldFuture.await.resultOrException.getOrElse(None)
}
}