Improved RemoteClient listener info
This commit is contained in:
parent
40a90af1d0
commit
06296b2a4f
3 changed files with 16 additions and 13 deletions
|
|
@ -119,7 +119,7 @@ trait ActorRef extends TransactionManagement {
|
|||
* <p/>
|
||||
* Identifier for actor, does not have to be a unique one. Default is the 'uuid'.
|
||||
* <p/>
|
||||
* This field is used for logging, AspectRegistry.actorsFor, identifier for remote
|
||||
* This field is used for logging, AspectRegistry.actorsFor(id), identifier for remote
|
||||
* actor in RemoteServer etc.But also as the identifier for persistence, which means
|
||||
* that you can use a custom name to be able to retrieve the "correct" persisted state
|
||||
* upon restart, remote restart etc.
|
||||
|
|
@ -208,8 +208,8 @@ trait ActorRef extends TransactionManagement {
|
|||
|
||||
protected[akka] var _sender: Option[ActorRef] = None
|
||||
protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None
|
||||
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s}
|
||||
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf}
|
||||
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s }
|
||||
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf }
|
||||
|
||||
/**
|
||||
* The reference sender Actor of the last received message.
|
||||
|
|
|
|||
|
|
@ -38,8 +38,11 @@ object RemoteRequestProtocolIdFactory {
|
|||
def nextId: Long = id.getAndIncrement + nodeId
|
||||
}
|
||||
|
||||
/**
|
||||
* Life-cycle events for RemoteClient.
|
||||
*/
|
||||
sealed trait RemoteClientLifeCycleEvent
|
||||
case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientError(cause: Throwable, host: String, port: Int) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent
|
||||
|
||||
|
|
@ -186,7 +189,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
|
|||
val channel = connection.awaitUninterruptibly.getChannel
|
||||
openChannels.add(channel)
|
||||
if (!connection.isSuccess) {
|
||||
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause))
|
||||
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(connection.getCause, hostname, port))
|
||||
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||
}
|
||||
isRunning = true
|
||||
|
|
@ -222,7 +225,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, loader: O
|
|||
}
|
||||
} else {
|
||||
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
|
||||
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
|
||||
listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, hostname, port))
|
||||
throw exception
|
||||
}
|
||||
|
||||
|
|
@ -311,12 +314,12 @@ class RemoteClientHandler(val name: String,
|
|||
futures.remove(reply.getId)
|
||||
} else {
|
||||
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
|
||||
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception))
|
||||
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(exception, client.hostname, client.port))
|
||||
throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e))
|
||||
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e, client.hostname, client.port))
|
||||
log.error("Unexpected exception in remote client handler: %s", e)
|
||||
throw e
|
||||
}
|
||||
|
|
@ -331,7 +334,7 @@ class RemoteClientHandler(val name: String,
|
|||
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
|
||||
if (!client.connection.isSuccess) {
|
||||
client.listeners.toArray.foreach(l =>
|
||||
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause))
|
||||
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port))
|
||||
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
|
||||
}
|
||||
}
|
||||
|
|
@ -351,7 +354,7 @@ class RemoteClientHandler(val name: String,
|
|||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause))
|
||||
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(event.getCause, client.hostname, client.port))
|
||||
log.error(event.getCause, "Unexpected exception from downstream in remote client")
|
||||
event.getChannel.close
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
|
||||
// Therefore, if repositories are defined, this must happen as def, not as val.
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here!
|
||||
val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString
|
||||
val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
|
||||
def guiceyFruitRepo = "GuiceyFruit Repo" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
|
||||
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", guiceyFruitRepo)
|
||||
|
|
@ -365,8 +365,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
|
|||
def removeDupEntries(paths: PathFinder) =
|
||||
Path.lazyPathFinder {
|
||||
val mapped = paths.get map { p => (p.relativePath, p) }
|
||||
(Map() ++ mapped).values.toList
|
||||
}
|
||||
(Map() ++ mapped).values.toList
|
||||
}
|
||||
|
||||
def allArtifacts = {
|
||||
Path.fromFile(buildScalaInstance.libraryJar) +++
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue