Changed the RemoteClientLifeCycleEvent to carry a reference to the RemoteClient + dito for RemoteClientException
This commit is contained in:
parent
eeff076a3d
commit
1a4601d0f6
1 changed files with 21 additions and 22 deletions
|
|
@ -46,20 +46,19 @@ object RemoteRequestProtocolIdFactory {
|
|||
*/
|
||||
sealed trait RemoteClientLifeCycleEvent
|
||||
case class RemoteClientError(
|
||||
@BeanProperty val cause: Throwable,
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
|
||||
@BeanProperty val cause: Throwable,
|
||||
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientDisconnected(
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
|
||||
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientConnected(
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
|
||||
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
|
||||
case class RemoteClientStopped(
|
||||
@BeanProperty val host: String,
|
||||
@BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
|
||||
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
|
||||
|
||||
class RemoteClientException private[akka](message: String) extends AkkaException(message)
|
||||
/**
|
||||
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
|
||||
*/
|
||||
class RemoteClientException private[akka](message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
|
||||
|
|
@ -211,7 +210,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade
|
|||
val channel = connection.awaitUninterruptibly.getChannel
|
||||
openChannels.add(channel)
|
||||
if (!connection.isSuccess) {
|
||||
foreachListener(l => l ! RemoteClientError(connection.getCause, hostname, port))
|
||||
foreachListener(l => l ! RemoteClientError(connection.getCause, this))
|
||||
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||
}
|
||||
isRunning = true
|
||||
|
|
@ -226,7 +225,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade
|
|||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources
|
||||
log.info("%s has been shut down", name)
|
||||
foreachListener(l => l ! RemoteClientStopped(hostname, port))
|
||||
foreachListener(l => l ! RemoteClientStopped(this))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -254,8 +253,8 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade
|
|||
}
|
||||
}
|
||||
} else {
|
||||
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
|
||||
foreachListener(l => l ! RemoteClientError(exception, hostname, port))
|
||||
val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
|
||||
foreachListener(l => l ! RemoteClientError(exception, this))
|
||||
throw exception
|
||||
}
|
||||
|
||||
|
|
@ -370,13 +369,13 @@ class RemoteClientHandler(
|
|||
}
|
||||
futures.remove(reply.getId)
|
||||
} else {
|
||||
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result)
|
||||
client.foreachListener(l => l ! RemoteClientError(exception, client.hostname, client.port))
|
||||
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
|
||||
client.foreachListener(l => l ! RemoteClientError(exception, client))
|
||||
throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
client.foreachListener(l => l ! RemoteClientError(e, client.hostname, client.port))
|
||||
client.foreachListener(l => l ! RemoteClientError(e, client))
|
||||
log.error("Unexpected exception in remote client handler: %s", e)
|
||||
throw e
|
||||
}
|
||||
|
|
@ -393,7 +392,7 @@ class RemoteClientHandler(
|
|||
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
|
||||
if (!client.connection.isSuccess) {
|
||||
client.foreachListener(l =>
|
||||
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client.hostname, client.port))
|
||||
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client))
|
||||
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
|
||||
}
|
||||
}
|
||||
|
|
@ -403,7 +402,7 @@ class RemoteClientHandler(
|
|||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
def connect = {
|
||||
client.foreachListener(l => l ! RemoteClientConnected(client.hostname, client.port))
|
||||
client.foreachListener(l => l ! RemoteClientConnected(client))
|
||||
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
|
||||
client.resetReconnectionTimeWindow
|
||||
}
|
||||
|
|
@ -413,19 +412,19 @@ class RemoteClientHandler(
|
|||
sslHandler.handshake.addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isSuccess) connect
|
||||
else throw new RemoteClientException("Could not establish SSL handshake")
|
||||
else throw new RemoteClientException("Could not establish SSL handshake", client)
|
||||
}
|
||||
})
|
||||
} else connect
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.foreachListener(l => l ! RemoteClientDisconnected(client.hostname, client.port))
|
||||
client.foreachListener(l => l ! RemoteClientDisconnected(client))
|
||||
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
client.foreachListener(l => l ! RemoteClientError(event.getCause, client.hostname, client.port))
|
||||
client.foreachListener(l => l ! RemoteClientError(event.getCause, client))
|
||||
log.error(event.getCause, "Unexpected exception from downstream in remote client")
|
||||
event.getChannel.close
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue