Removed logging

This commit is contained in:
Jonas Bonér 2011-02-28 22:54:32 +01:00
parent 359a63c2fb
commit 9354f0790c
51 changed files with 154 additions and 556 deletions

View file

@ -38,7 +38,7 @@ import scala.reflect.BeanProperty
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging =>
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement =>
private val remoteClients = new HashMap[Address, RemoteClient]
private val remoteActors = new Index[Address, Uuid]
private val lock = new ReadWriteGuard
@ -142,7 +142,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
*/
abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) extends Logging {
val remoteAddress: InetSocketAddress) {
val name = this.getClass.getSimpleName + "@" + remoteAddress.getHostName + "::" + remoteAddress.getPort
@ -194,7 +194,6 @@ abstract class RemoteClient private[akka] (
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
log.slf4j.debug("sending message: {} has future {}", request, senderFuture)
if (isRunning) {
if (request.getOneWay) {
currentChannel.write(request).addListener(new ChannelFutureListener {
@ -272,22 +271,17 @@ class ActiveRemoteClient private[akka] (
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
log.slf4j.info("Starting remote client connection to [{}]", remoteAddress)
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
log.slf4j.error("Remote client connection to [{}] has failed", remoteAddress)
log.slf4j.debug("Remote client connection failed", connection.getCause)
false
} else {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
if(isRunning) {
log.slf4j.debug("Reaping expired futures awaiting completion from [{}]", remoteAddress)
val i = futures.entrySet.iterator
while(i.hasNext) {
val e = i.next
@ -304,15 +298,12 @@ class ActiveRemoteClient private[akka] (
case true => true
case false if reconnectIfAlreadyConnected =>
isAuthenticated.set(false)
log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress)
openChannels.remove(connection.getChannel)
connection.getChannel.close
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
log.slf4j.error("Reconnection to [{}] has failed", remoteAddress)
log.slf4j.debug("Reconnection failed", connection.getCause)
false
} else true
case false => false
@ -320,7 +311,6 @@ class ActiveRemoteClient private[akka] (
}
def shutdown = runSwitch switchOff {
log.slf4j.info("Shutting down {}", name)
notifyListeners(RemoteClientShutdown(module, remoteAddress))
timer.stop
timer = null
@ -329,7 +319,6 @@ class ActiveRemoteClient private[akka] (
bootstrap.releaseExternalResources
bootstrap = null
connection = null
log.slf4j.info("{} has been shut down", name)
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
@ -339,7 +328,6 @@ class ActiveRemoteClient private[akka] (
} else {
val timeLeft = RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)
if (timeLeft > 0) {
log.slf4j.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft)
true
} else false
}
@ -399,12 +387,11 @@ class ActiveRemoteClientHandler(
val remoteAddress: SocketAddress,
val timer: HashedWheelTimer,
val client: ActiveRemoteClient)
extends SimpleChannelUpstreamHandler with Logging {
extends SimpleChannelUpstreamHandler {
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.slf4j.debug(event.toString)
}
super.handleUpstream(ctx, event)
}
@ -414,8 +401,6 @@ class ActiveRemoteClientHandler(
event.getMessage match {
case reply: RemoteMessageProtocol =>
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
log.slf4j.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
log.slf4j.debug("Trying to map back to future: {}",replyUuid)
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
if (reply.hasMessage) {
@ -444,7 +429,6 @@ class ActiveRemoteClientHandler(
} catch {
case e: Exception =>
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
log.slf4j.error("Unexpected exception in remote client handler", e)
throw e
}
}
@ -465,7 +449,6 @@ class ActiveRemoteClientHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
def connect = {
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress)
client.resetReconnectionTimeWindow
}
@ -482,16 +465,11 @@ class ActiveRemoteClientHandler(
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress))
log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress)
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.notifyListeners(RemoteClientError(event.getCause, client.module, client.remoteAddress))
if (event.getCause ne null)
log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause)
else
log.slf4j.error("Unexpected exception from downstream in remote client: {}", event)
if (event.getCause ne null) event.getCause.printStackTrace
event.getChannel.close
}
@ -506,8 +484,6 @@ class ActiveRemoteClientHandler(
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem =>
log.debug("Couldn't parse exception returned from RemoteServer",problem)
log.warn("Couldn't create instance of {} with message {}, returning UnparsableException",classname, exception.getMessage)
UnparsableException(classname, exception.getMessage)
}
}
@ -579,7 +555,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: java.nio.channels.ClosedChannelException => {}
case e => serverModule.log.slf4j.warn("Could not close remote server channel in a graceful way")
case e => {}
}
}
}
@ -607,12 +583,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
try {
_isRunning switchOn {
log.slf4j.debug("Starting up remote server on {}:{}",_hostname, _port)
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
case e =>
log.slf4j.error("Could not start up remote server", e)
notifyListeners(RemoteServerError(e, this))
}
this
@ -622,7 +596,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
_isRunning switchOff {
currentServer.getAndSet(None) foreach {
instance =>
log.slf4j.debug("Shutting down remote server on {}:{}",instance.host, instance.port)
instance.shutdown
}
}
@ -634,7 +607,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
* @param typedActor typed actor to register
*/
def registerTypedActor(id: String, typedActor: AnyRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side remote typed actor [{}] with id [{}]", typedActor.getClass.getName, id)
if (id.startsWith(UUID_PREFIX)) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid)
else registerTypedActor(id, typedActor, typedActors)
}
@ -645,7 +617,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
* @param typedActor typed actor to register
*/
def registerTypedPerSessionActor(id: String, factory: => AnyRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side typed remote session actor with id [{}]", id)
registerTypedPerSessionActor(id, () => factory, typedActorsFactories)
}
@ -655,13 +626,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(id: String, actorRef: ActorRef): Unit = guard withGuard {
log.slf4j.debug("Registering server side remote actor [{}] with id [{}]", actorRef.actorClass.getName, id)
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid)
else register(id, actorRef, actors)
}
def registerByUuid(actorRef: ActorRef): Unit = guard withGuard {
log.slf4j.debug("Registering remote actor {} to it's uuid {}", actorRef, actorRef.uuid)
register(actorRef.uuid.toString, actorRef, actorsByUuid)
}
@ -678,7 +647,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def registerPerSession(id: String, factory: => ActorRef): Unit = synchronized {
log.slf4j.debug("Registering server side remote session actor with id [{}]", id)
registerPerSession(id, () => factory, actorsFactories)
}
@ -702,7 +670,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
*/
def unregister(actorRef: ActorRef): Unit = guard withGuard {
if (_isRunning.isOn) {
log.slf4j.debug("Unregistering server side remote actor [{}] with id [{}:{}]", Array[AnyRef](actorRef.actorClass.getName, actorRef.id, actorRef.uuid))
actors.remove(actorRef.id, actorRef)
actorsByUuid.remove(actorRef.uuid, actorRef)
}
@ -715,7 +682,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
*/
def unregister(id: String): Unit = guard withGuard {
if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote actor with id [{}]", id)
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
else {
val actorRef = actors get id
@ -732,7 +698,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
*/
def unregisterPerSession(id: String): Unit = {
if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote session actor with id [{}]", id)
actorsFactories.remove(id)
}
}
@ -744,7 +709,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
*/
def unregisterTypedActor(id: String):Unit = guard withGuard {
if (_isRunning.isOn) {
log.slf4j.info("Unregistering server side remote typed actor with id [{}]", id)
if (id.startsWith(UUID_PREFIX)) typedActorsByUuid.remove(id.substring(UUID_PREFIX.length))
else typedActors.remove(id)
}
@ -818,7 +782,7 @@ class RemoteServerHandler(
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging {
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
import RemoteServerSettings._
val CHANNEL_INIT = "channel-init".intern
@ -855,7 +819,6 @@ class RemoteServerHandler(
val clientAddress = getClientAddress(ctx)
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]())
log.slf4j.debug("Remote client [{}] connected to [{}]", clientAddress, server.name)
if (SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
// Begin handshake.
@ -876,17 +839,16 @@ class RemoteServerHandler(
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
import scala.collection.JavaConversions.asScalaIterable
val clientAddress = getClientAddress(ctx)
log.slf4j.debug("Remote client [{}] disconnected from [{}]", clientAddress, server.name)
// stop all session actors
for (map <- Option(sessionActors.remove(event.getChannel));
actor <- asScalaIterable(map.values)) {
try { actor.stop } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) }
try { actor.stop } catch { case e: Exception => }
}
// stop all typed session actors
for (map <- Option(typedSessionActors.remove(event.getChannel));
actor <- asScalaIterable(map.values)) {
try { TypedActor.stop(actor) } catch { case e: Exception => log.slf4j.warn("Couldn't stop {}",actor,e) }
try { TypedActor.stop(actor) } catch { case e: Exception => }
}
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
@ -894,13 +856,11 @@ class RemoteServerHandler(
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
log.slf4j.debug("Remote client [{}] channel closed from [{}]", clientAddress, server.name)
server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
}
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.slf4j.debug(event.toString)
}
super.handleUpstream(ctx, event)
}
@ -914,7 +874,6 @@ class RemoteServerHandler(
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
log.slf4j.error("Unexpected exception from remote downstream", event.getCause)
event.getChannel.close
server.notifyListeners(RemoteServerError(event.getCause, server))
}
@ -926,7 +885,6 @@ class RemoteServerHandler(
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
log.slf4j.debug("Received RemoteMessageProtocol[\n{}]",request)
request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
@ -937,7 +895,6 @@ class RemoteServerHandler(
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
val actorInfo = request.getActorInfo
log.slf4j.debug("Dispatching to remote actor [{}:{}]", actorInfo.getTarget, actorInfo.getUuid)
val actorRef =
try { createActor(actorInfo, channel).start } catch {
@ -967,16 +924,13 @@ class RemoteServerHandler(
None,
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout).
onComplete(f => {
log.slf4j.debug("Future was completed, now flushing to remote!")
val result = f.result
val exception = f.exception
if (exception.isDefined) {
log.slf4j.debug("Returning exception from actor invocation [{}]",exception.get.getClass)
write(channel, createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
}
else if (result.isDefined) {
log.slf4j.debug("Returning result from actor invocation [{}]",result.get)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
@ -1004,7 +958,6 @@ class RemoteServerHandler(
private def dispatchToTypedActor(request: RemoteMessageProtocol, channel: Channel) = {
val actorInfo = request.getActorInfo
val typedActorInfo = actorInfo.getTypedActorInfo
log.slf4j.debug("Dispatching to remote typed actor [{} :: {}]", typedActorInfo.getMethod, typedActorInfo.getInterface)
val typedActor = createTypedActor(actorInfo, channel)
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
@ -1031,7 +984,6 @@ class RemoteServerHandler(
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
write(channel, messageBuilder.build)
log.slf4j.debug("Returning result from remote typed actor invocation [{}]", result)
} catch {
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
}
@ -1100,7 +1052,6 @@ class RemoteServerHandler(
if (UNTRUSTED_MODE) throw new SecurityException(
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
log.slf4j.info("Creating a new client-managed remote actor [{}:{}]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
@ -1111,7 +1062,6 @@ class RemoteServerHandler(
actorRef
} catch {
case e =>
log.slf4j.error("Could not create remote actor instance", e)
server.notifyListeners(RemoteServerError(e, server))
throw e
}
@ -1167,8 +1117,6 @@ class RemoteServerHandler(
if (UNTRUSTED_MODE) throw new SecurityException(
"Remote server is operating is untrusted mode, can not create remote actors on behalf of the remote client")
log.slf4j.info("Creating a new remote typed actor:\n\t[{} :: {}]", interfaceClassname, targetClassname)
val (interfaceClass, targetClass) =
if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname),
applicationLoader.get.loadClass(targetClassname))
@ -1180,7 +1128,6 @@ class RemoteServerHandler(
newInstance
} catch {
case e =>
log.slf4j.error("Could not create remote typed actor instance", e)
server.notifyListeners(RemoteServerError(e, server))
throw e
}
@ -1201,8 +1148,6 @@ class RemoteServerHandler(
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = {
val actorInfo = request.getActorInfo
log.slf4j.error("Could not invoke remote actor [{}]", actorInfo.getTarget)
log.slf4j.debug("Could not invoke remote actor", exception)
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None,
Right(request.getUuid),
@ -1230,7 +1175,6 @@ class RemoteServerHandler(
"The remote client [" + clientAddress + "] does not have a secure cookie.")
if (!(request.getCookie == SECURE_COOKIE.get)) throw new SecurityException(
"The remote client [" + clientAddress + "] secure cookie is not the same as remote server secure cookie")
log.slf4j.info("Remote client [{}] successfully authenticated using secure cookie", clientAddress)
}
}