From c0c60478df5ce867bd30af06136866e3895fa77a Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Thu, 21 Jul 2011 16:57:58 +0300 Subject: [PATCH 01/10] ticket 917 --- .../main/scala/akka/event/EventHandler.scala | 41 +-- .../remote/BootableRemoteActorService.scala | 6 + .../remote/netty/NettyRemoteSupport.scala | 299 +++++++++++------- .../serialization/SerializationProtocol.scala | 14 +- 4 files changed, 231 insertions(+), 129 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 0a0e00e2cc..4b1fc2e1aa 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -8,7 +8,7 @@ import akka.actor._ import akka.dispatch.Dispatchers import akka.config.Config._ import akka.config.ConfigurationException -import akka.util.{ ListenerManagement, ReflectiveAccess } +import akka.util.{ListenerManagement, ReflectiveAccess} import akka.serialization._ import akka.AkkaException @@ -26,7 +26,7 @@ import akka.AkkaException * case EventHandler.Info(instance, message) ⇒ ... * case EventHandler.Debug(instance, message) ⇒ ... * case genericEvent ⇒ ... - * } + * } * }) * * EventHandler.addListener(eventHandlerListener) @@ -95,10 +95,10 @@ object EventHandler extends ListenerManagement { @volatile var level: Int = config.getString("akka.event-handler-level", "INFO") match { - case "ERROR" ⇒ ErrorLevel + case "ERROR" ⇒ ErrorLevel case "WARNING" ⇒ WarningLevel - case "INFO" ⇒ InfoLevel - case "DEBUG" ⇒ DebugLevel + case "INFO" ⇒ InfoLevel + case "DEBUG" ⇒ DebugLevel case unknown ⇒ throw new ConfigurationException( "Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]") } @@ -106,21 +106,22 @@ object EventHandler extends ListenerManagement { def start() { try { val defaultListeners = config.getList("akka.event-handlers") match { - case Nil ⇒ "akka.event.EventHandler$DefaultListener" :: Nil + case Nil ⇒ "akka.event.EventHandler$DefaultListener" :: Nil case listeners ⇒ listeners } - defaultListeners foreach { listenerName ⇒ - try { - ReflectiveAccess.getClassFor[Actor](listenerName) match { - case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start()) - case Left(exception) ⇒ throw exception + defaultListeners foreach { + listenerName ⇒ + try { + ReflectiveAccess.getClassFor[Actor](listenerName) match { + case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start()) + case Left(exception) ⇒ throw exception + } + } catch { + case e: Exception ⇒ + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + listenerName + + "] due to [" + e.toString + "]", e) } - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + listenerName + - "] due to [" + e.toString + "]", e) - } } info(this, "Starting up EventHandler") } catch { @@ -145,7 +146,7 @@ object EventHandler extends ListenerManagement { notifyListeners(event) } - def notify[T <: Event: ClassManifest](event: ⇒ T) { + def notify[T <: Event : ClassManifest](event: ⇒ T) { if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event) } @@ -181,6 +182,7 @@ object EventHandler extends ListenerManagement { if (level >= InfoLevel) notifyListeners(Info(instance, message)) } + def debug(instance: AnyRef, message: ⇒ String) { if (level >= DebugLevel) notifyListeners(Debug(instance, message)) } @@ -194,7 +196,7 @@ object EventHandler extends ListenerManagement { def isDebugEnabled = level >= DebugLevel def stackTraceFor(e: Throwable) = { - import java.io.{ StringWriter, PrintWriter } + import java.io.{StringWriter, PrintWriter} val sw = new StringWriter val pw = new PrintWriter(sw) e.printStackTrace(pw) @@ -210,6 +212,7 @@ object EventHandler extends ListenerManagement { } class DefaultListener extends Actor { + import java.text.SimpleDateFormat import java.util.Date diff --git a/akka-cluster/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-cluster/src/main/scala/akka/remote/BootableRemoteActorService.scala index 95492a30f5..f214e12f52 100644 --- a/akka-cluster/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-cluster/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -6,6 +6,7 @@ package akka.remote import akka.actor.{ Actor, BootableActorLoaderService } import akka.util.{ ReflectiveAccess, Bootable } +import akka.event.EventHandler /** * This bundle/service is responsible for booting up and shutting down the remote actors facility. @@ -23,14 +24,19 @@ trait BootableRemoteActorService extends Bootable { abstract override def onLoad() { if (ReflectiveAccess.ClusterModule.isEnabled && RemoteServerSettings.isRemotingEnabled) { + EventHandler.info(this, "Initializing Remote Actors Service...") startRemoteService() + EventHandler.info(this, "Remote Actors Service initialized") } super.onLoad() } abstract override def onUnload() { + EventHandler.info(this, "Shutting down Remote Actors Service") + Actor.remote.shutdown() if (remoteServerThread.isAlive) remoteServerThread.join(1000) + EventHandler.info(this, "Remote Actors Service has been shut down") super.onUnload() } } diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index a339a1b8b6..98b560e0a9 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,24 +4,22 @@ package akka.remote.netty -import akka.dispatch.{ ActorPromise, DefaultPromise, Promise, Future } -import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings } +import akka.dispatch.{ActorPromise, DefaultPromise, Promise} +import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import akka.remote.protocol.RemoteProtocol._ import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization._ import akka.remoteinterface._ import akka.actor.{ - PoisonPill, - LocalActorRef, - Actor, - RemoteActorRef, - ActorRef, - IllegalActorStateException, - RemoteActorSystemMessage, - uuidFrom, - Uuid, - Death, - LifeCycleMessage +PoisonPill, +Actor, +RemoteActorRef, +ActorRef, +IllegalActorStateException, +RemoteActorSystemMessage, +uuidFrom, +Uuid, +LifeCycleMessage } import akka.actor.Actor._ import akka.config.Config @@ -30,23 +28,22 @@ import akka.util._ import akka.event.EventHandler import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture } +import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup, ChannelGroupFuture} import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap } -import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } -import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } -import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } -import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } -import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } +import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap} +import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} +import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder} +import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} +import org.jboss.netty.handler.timeout.{ReadTimeoutHandler, ReadTimeoutException} +import org.jboss.netty.handler.execution.{OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler} +import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer} import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ import java.net.InetSocketAddress -import java.lang.reflect.InvocationTargetException -import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } +import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import java.util.concurrent._ import akka.AkkaException @@ -66,7 +63,8 @@ object RemoteEncoder { } } -trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement ⇒ +trait NettyRemoteClientModule extends RemoteClientModule { + self: ListenerManagement ⇒ private val remoteClients = new HashMap[Address, RemoteClient] private val remoteActors = new Index[Address, Uuid] private val lock = new ReadWriteGuard @@ -82,7 +80,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef)) private[akka] def withClientFor[T]( - address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient ⇒ T): T = { + address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient ⇒ T): T = { // loader.foreach(MessageSerializer.setClassLoader(_)) val key = Address(address) lock.readLock.lock @@ -94,7 +92,8 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem lock.writeLock.lock //Lock upgrade, not supported natively try { try { - remoteClients.get(key) match { //Recheck for addition, race between upgrades + remoteClients.get(key) match { + //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _) @@ -102,24 +101,30 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem remoteClients += key -> client client } - } finally { lock.readLock.lock } //downgrade - } finally { lock.writeLock.unlock } + } finally { + lock.readLock.lock + } //downgrade + } finally { + lock.writeLock.unlock + } } fun(c) - } finally { lock.readLock.unlock } + } finally { + lock.readLock.unlock + } } def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { remoteClients.remove(Address(address)) match { case Some(client) ⇒ client.shutdown() - case None ⇒ false + case None ⇒ false } } def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard { remoteClients.get(Address(address)) match { case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true) - case None ⇒ false + case None ⇒ false } } @@ -133,7 +138,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } def shutdownRemoteClients() = lock withWriteGuard { - remoteClients.foreach({ case (addr, client) ⇒ client.shutdown() }) + remoteClients.foreach({ + case (addr, client) ⇒ client.shutdown() + }) remoteClients.clear() } } @@ -143,9 +150,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem * ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that * reuses an already established connection. */ -abstract class RemoteClient private[akka] ( - val module: NettyRemoteClientModule, - val remoteAddress: InetSocketAddress) { +abstract class RemoteClient private[akka]( + val module: NettyRemoteClientModule, + val remoteAddress: InetSocketAddress) { val useTransactionLog = config.getBool("akka.cluster.client.buffering.retry-message-send-on-failure", true) val transactionLogCapacity = config.getInt("akka.cluster.client.buffering.capacity", -1) @@ -189,13 +196,13 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send[T]( - message: Any, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]], - remoteAddress: InetSocketAddress, - timeout: Long, - isOneWay: Boolean, - actorRef: ActorRef): Option[Promise[T]] = + message: Any, + senderOption: Option[ActorRef], + senderFuture: Option[Promise[T]], + remoteAddress: InetSocketAddress, + timeout: Long, + isOneWay: Boolean, + actorRef: ActorRef): Option[Promise[T]] = send(createRemoteMessageProtocolBuilder( Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build, senderFuture) @@ -204,8 +211,8 @@ abstract class RemoteClient private[akka] ( * Sends the message across the wire */ def send[T]( - request: RemoteMessageProtocol, - senderFuture: Option[Promise[T]]): Option[Promise[T]] = { + request: RemoteMessageProtocol, + senderFuture: Option[Promise[T]]): Option[Promise[T]] = { if (isRunning) { EventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request)) @@ -266,20 +273,23 @@ abstract class RemoteClient private[akka] ( } } - private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log + private[remote] def sendPendingRequests() = pendingRequests synchronized { + // ensure only one thread at a time can flush the log val nrOfMessages = pendingRequests.size if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages) var pendingRequest = pendingRequests.peek while (pendingRequest ne null) { val (isOneWay, futureUuid, message) = pendingRequest - if (isOneWay) { // sendOneWay + if (isOneWay) { + // sendOneWay val future = currentChannel.write(RemoteEncoder.encode(message)) future.awaitUninterruptibly() if (!future.isCancelled && !future.isSuccess) { notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) throw future.getCause } - } else { // sendRequestReply + } else { + // sendRequestReply val future = currentChannel.write(RemoteEncoder.encode(message)) future.awaitUninterruptibly() if (future.isCancelled) futures.remove(futureUuid) // Clean up future @@ -300,9 +310,10 @@ abstract class RemoteClient private[akka] ( * * @author Jonas Bonér */ -class ActiveRemoteClient private[akka] ( - module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, - val loader: Option[ClassLoader] = None, notifyListenersFun: (⇒ Any) ⇒ Unit) extends RemoteClient(module, remoteAddress) { +class ActiveRemoteClient private[akka]( + module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, + val loader: Option[ClassLoader] = None, notifyListenersFun: (⇒ Any) ⇒ Unit) extends RemoteClient(module, remoteAddress) { + import RemoteClientSettings._ //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @@ -318,6 +329,7 @@ class ActiveRemoteClient private[akka] ( private var reconnectionTimeWindowStart = 0L def notifyListeners(msg: ⇒ Any): Unit = notifyListenersFun(msg) + def currentChannel = connection.getChannel def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = { @@ -330,15 +342,18 @@ class ActiveRemoteClient private[akka] ( bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) + EventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress)) + + // Wait until the connection attempt succeeds or fails. connection = bootstrap.connect(remoteAddress) openChannels.add(connection.awaitUninterruptibly.getChannel) if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) + EventHandler.error(connection.getCause, "Remote client connection to [%s] has failed".format(remoteAddress), this) false } else { - //Send cookie val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SECURE_COOKIE.nonEmpty) @@ -365,12 +380,16 @@ class ActiveRemoteClient private[akka] ( } match { case true ⇒ true case false if reconnectIfAlreadyConnected ⇒ + EventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress)) + openChannels.remove(connection.getChannel) connection.getChannel.close connection = bootstrap.connect(remoteAddress) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) + EventHandler.error(connection.getCause, "Reconnection to [%s] has failed".format(remoteAddress),this) + false } else { //Send cookie @@ -387,6 +406,8 @@ class ActiveRemoteClient private[akka] ( //Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients def shutdown() = runSwitch switchOff { + EventHandler.info(this, "Shutting down [%s]".format(name)) + notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop() timer = null @@ -396,6 +417,8 @@ class ActiveRemoteClient private[akka] ( bootstrap = null connection = null pendingRequests.clear() + + EventHandler.info(this, "[%s] has been shut down".format(name)) } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -403,7 +426,11 @@ class ActiveRemoteClient private[akka] ( reconnectionTimeWindowStart = System.currentTimeMillis true } else { - /*Time left > 0*/ (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 + val timeLeft = (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 + if (timeLeft) { + EventHandler.info(this, "Will try to reconnect to remote server for another [%s] milliseconds".format(timeLeft)) + } + timeLeft } } @@ -414,12 +441,12 @@ class ActiveRemoteClient private[akka] ( * @author Jonas Bonér */ class ActiveRemoteClientPipelineFactory( - name: String, - futures: ConcurrentMap[Uuid, Promise[_]], - bootstrap: ClientBootstrap, - remoteAddress: InetSocketAddress, - timer: HashedWheelTimer, - client: ActiveRemoteClient) extends ChannelPipelineFactory { + name: String, + futures: ConcurrentMap[Uuid, Promise[_]], + bootstrap: ClientBootstrap, + remoteAddress: InetSocketAddress, + timer: HashedWheelTimer, + client: ActiveRemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit) @@ -429,7 +456,7 @@ class ActiveRemoteClientPipelineFactory( val protobufEnc = new ProtobufEncoder val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match { case "zlib" ⇒ (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) - case _ ⇒ (Nil, Nil) + case _ ⇒ (Nil, Nil) } val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client) @@ -443,12 +470,12 @@ class ActiveRemoteClientPipelineFactory( */ @ChannelHandler.Sharable class ActiveRemoteClientHandler( - val name: String, - val futures: ConcurrentMap[Uuid, Promise[_]], - val bootstrap: ClientBootstrap, - val remoteAddress: InetSocketAddress, - val timer: HashedWheelTimer, - val client: ActiveRemoteClient) + val name: String, + val futures: ConcurrentMap[Uuid, Promise[_]], + val bootstrap: ClientBootstrap, + val remoteAddress: InetSocketAddress, + val timer: HashedWheelTimer, + val client: ActiveRemoteClient) extends SimpleChannelUpstreamHandler { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { @@ -457,11 +484,17 @@ class ActiveRemoteClientHandler( case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒ val rcp = arp.getInstruction rcp.getCommandType match { - case CommandType.SHUTDOWN ⇒ spawn { client.module.shutdownClientConnection(remoteAddress) } + case CommandType.SHUTDOWN ⇒ spawn { + client.module.shutdownClientConnection(remoteAddress) + } } case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ val reply = arp.getMessage val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) + + EventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]".format(reply)) + EventHandler.debug(this, "Trying to map back to future: %s".format(replyUuid)) + val future = futures.remove(replyUuid).asInstanceOf[Promise[Any]] if (reply.hasMessage) { @@ -492,13 +525,16 @@ class ActiveRemoteClientHandler( } } }, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) - } else spawn { client.module.shutdownClientConnection(remoteAddress) } + } else spawn { + client.module.shutdownClientConnection(remoteAddress) + } } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { try { if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) + EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress)) client.resetReconnectionTimeWindow } catch { case e: Throwable ⇒ @@ -510,12 +546,20 @@ class ActiveRemoteClientHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress)) + EventHandler.debug(this, "Remote client disconnected from [%s]".format(ctx.getChannel.getRemoteAddress)) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + if (event.getCause ne null) + EventHandler.error(event.getCause, "Unexpected exception from downstream in remote client", this) + else + EventHandler.error(this, "Unexpected exception from downstream in remote client: %s".format(event)) + event.getCause match { case e: ReadTimeoutException ⇒ - spawn { client.module.shutdownClientConnection(remoteAddress) } + spawn { + client.module.shutdownClientConnection(remoteAddress) + } case e ⇒ client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) event.getChannel.close //FIXME Is this the correct behavior? @@ -550,17 +594,18 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with def optimizeLocalScoped_?() = optimizeLocal.get protected[akka] def actorFor( - actorAddress: String, - timeout: Long, - host: String, - port: Int, - loader: Option[ClassLoader]): ActorRef = { + actorAddress: String, + timeout: Long, + host: String, + port: Int, + loader: Option[ClassLoader]): ActorRef = { val homeInetSocketAddress = this.address if (optimizeLocalScoped_?) { if ((host == homeInetSocketAddress.getAddress.getHostAddress || host == homeInetSocketAddress.getHostName) && - port == homeInetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals? + port == homeInetSocketAddress.getPort) { + //TODO: switch to InetSocketAddress.equals? val localRef = findActorByAddressOrUuid(actorAddress, actorAddress) if (localRef ne null) return localRef //Code significantly simpler with the return statement } @@ -575,7 +620,9 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with } class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { + import RemoteServerSettings._ + val name = "NettyRemoteServer@" + host + ":" + port val address = new InetSocketAddress(host, port) @@ -626,14 +673,14 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, } } -trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule ⇒ - import RemoteServerSettings._ +trait NettyRemoteServerModule extends RemoteServerModule { + self: RemoteModule ⇒ private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) def address = currentServer.get match { case Some(server) ⇒ server.address - case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress + case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress } def name = currentServer.get match { @@ -650,6 +697,8 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard { try { _isRunning switchOn { + EventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port)) + currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader))) } } catch { @@ -662,8 +711,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule def shutdownServerModule() = guard withGuard { _isRunning switchOff { - currentServer.getAndSet(None) foreach { instance ⇒ - instance.shutdown() + currentServer.getAndSet(None) foreach { + instance ⇒ + EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port)) + + instance.shutdown() } } } @@ -707,7 +759,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule * Unregister RemoteModule Actor that is registered using its 'id' field (not custom ID). */ def unregister(actorRef: ActorRef): Unit = guard withGuard { + if (_isRunning.isOn) { + EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid)) + actors.remove(actorRef.address, actorRef) actorsByUuid.remove(actorRef.uuid.toString, actorRef) } @@ -719,7 +774,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule * NOTE: You need to call this method if you have registered an actor by a custom ID. */ def unregister(id: String): Unit = guard withGuard { + if (_isRunning.isOn) { + EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id)) + if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length)) else { val actorRef = actors get id @@ -735,7 +793,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule * NOTE: You need to call this method if you have registered an actor by a custom ID. */ def unregisterPerSession(id: String): Unit = { + if (_isRunning.isOn) { + EventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id)) + actorsFactories.remove(id) } } @@ -745,11 +806,12 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule * @author Jonas Bonér */ class RemoteServerPipelineFactory( - val name: String, - val openChannels: ChannelGroup, - val executor: ExecutionHandler, - val loader: Option[ClassLoader], - val server: NettyRemoteServerModule) extends ChannelPipelineFactory { + val name: String, + val openChannels: ChannelGroup, + val executor: ExecutionHandler, + val loader: Option[ClassLoader], + val server: NettyRemoteServerModule) extends ChannelPipelineFactory { + import RemoteServerSettings._ def getPipeline: ChannelPipeline = { @@ -759,7 +821,7 @@ class RemoteServerPipelineFactory( val protobufEnc = new ProtobufEncoder val (enc, dec) = COMPRESSION_SCHEME match { case "zlib" ⇒ (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) - case _ ⇒ (Nil, Nil) + case _ ⇒ (Nil, Nil) } val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) @@ -799,10 +861,11 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si */ @ChannelHandler.Sharable class RemoteServerHandler( - val name: String, - val openChannels: ChannelGroup, - val applicationLoader: Option[ClassLoader], - val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { + val name: String, + val openChannels: ChannelGroup, + val applicationLoader: Option[ClassLoader], + val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { + import RemoteServerSettings._ // applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY @@ -819,7 +882,7 @@ class RemoteServerHandler( } else if (!future.isSuccess) { val socketAddress = future.getChannel.getRemoteAddress match { case i: InetSocketAddress ⇒ Some(i) - case _ ⇒ None + case _ ⇒ None } server.notifyListeners(RemoteServerWriteFailed(payload, future.getCause, server, socketAddress)) } @@ -835,6 +898,8 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) + EventHandler.debug(this,"Remote client [%s] connected to [%s]".format(clientAddress, server.name)) + sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) } @@ -842,12 +907,18 @@ class RemoteServerHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) + EventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name)) + // stop all session actors for ( map ← Option(sessionActors.remove(event.getChannel)); actor ← collectionAsScalaIterable(map.values) ) { - try { actor ! PoisonPill } catch { case e: Exception ⇒ } + try { + actor ! PoisonPill + } catch { + case e: Exception ⇒ EventHandler.error(e, "Couldn't stop %s".format(actor),this) + } } server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) @@ -855,6 +926,8 @@ class RemoteServerHandler( override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) + EventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name),this) + server.notifyListeners(RemoteServerClientClosed(server, clientAddress)) } @@ -870,6 +943,8 @@ class RemoteServerHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + EventHandler.error(event.getCause, "Unexpected exception from remote downstream", this) + event.getChannel.close server.notifyListeners(RemoteServerError(event.getCause, server)) } @@ -877,7 +952,7 @@ class RemoteServerHandler( private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] = ctx.getChannel.getRemoteAddress match { case inet: InetSocketAddress ⇒ Some(inet) - case _ ⇒ None + case _ ⇒ None } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try { @@ -891,8 +966,13 @@ class RemoteServerHandler( private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) { val actorInfo = request.getActorInfo + + EventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid)) + val actorRef = - try { createActor(actorInfo, channel) } catch { + try { + createActor(actorInfo, channel) + } catch { case e: SecurityException ⇒ EventHandler.error(e, this, e.getMessage) write(channel, createErrorReplyMessage(e, request)) @@ -905,7 +985,8 @@ class RemoteServerHandler( if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) else None - message match { // first match on system messages + message match { + // first match on system messages case RemoteActorSystemMessage.Stop ⇒ if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else actorRef.stop() @@ -920,22 +1001,22 @@ class RemoteServerHandler( request.getActorInfo.getTimeout, new ActorPromise(request.getActorInfo.getTimeout). onComplete(_.value.get match { - case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request)) - case r: Right[_,_] ⇒ - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( - Some(actorRef), - Right(request.getUuid), - actorInfo.getAddress, - actorInfo.getTimeout, - r.asInstanceOf[Either[Throwable,Any]], - isOneWay = true, - Some(actorRef)) + case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request)) + case r: Right[_, _] ⇒ + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + Some(actorRef), + Right(request.getUuid), + actorInfo.getAddress, + actorInfo.getTimeout, + r.asInstanceOf[Either[Throwable, Any]], + isOneWay = true, + Some(actorRef)) - // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method - if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method + if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) - write(channel, RemoteEncoder.encode(messageBuilder.build)) - })) + write(channel, RemoteEncoder.encode(messageBuilder.build)) + })) } } @@ -985,7 +1066,7 @@ class RemoteServerHandler( private def findSessionActor(id: String, channel: Channel): ActorRef = sessionActors.get(channel) match { case null ⇒ null - case map ⇒ map get id + case map ⇒ map get id } private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = { diff --git a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala index d8b1293bc6..810f813efb 100644 --- a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -21,6 +21,7 @@ import java.net.InetSocketAddress import com.google.protobuf.ByteString import com.eaio.uuid.UUID +import akka.event.EventHandler /** * Module for local actor serialization. @@ -142,6 +143,8 @@ object ActorSerialization { overriddenUuid: Option[UUID], loader: Option[ClassLoader]): ActorRef = { + EventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol)) + val lifeCycle = if (protocol.hasLifeCycle) { protocol.getLifeCycle.getLifeCycle match { @@ -243,11 +246,17 @@ object RemoteActorSerialization { * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - RemoteActorRef( + EventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) + + val ref = RemoteActorRef( JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress], protocol.getAddress, protocol.getTimeout, loader) + + EventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid)) + + ref } /** @@ -263,6 +272,9 @@ object RemoteActorSerialization { case _ ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress } + + EventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) + RemoteActorRefProtocol.newBuilder .setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress))) .setAddress(actor.address) From 1006fa61db50a9ad56408d5ee152d77747d4083c Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Fri, 22 Jul 2011 08:18:34 +0300 Subject: [PATCH 02/10] ticket 1043 --- akka-docs/scala/fault-tolerance.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-docs/scala/fault-tolerance.rst b/akka-docs/scala/fault-tolerance.rst index b610bff96f..507c3a3b88 100644 --- a/akka-docs/scala/fault-tolerance.rst +++ b/akka-docs/scala/fault-tolerance.rst @@ -228,11 +228,11 @@ A child actor can tell the supervising actor to unlink him by sending him the 'U .. code-block:: scala - if (supervisor.isDefined) supervisor.get ! Unlink(this) + if (supervisor.isDefined) supervisor.get ! Unlink(self) // Or shorter using 'foreach': - supervisor.foreach(_ ! Unlink(this)) + supervisor.foreach(_ ! Unlink(self)) The supervising actor's side of things ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ From cfa8856fcd663edea16b5a189f6a050b2802a867 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Jul 2011 14:06:39 +0200 Subject: [PATCH 03/10] Ticket 981: Adjusted how report files are stored and result logged --- .../common/BenchResultRepository.scala | 43 ++++++++++++++---- .../performance/trading/common/Report.scala | 44 +++++-------------- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala index 2f9ea89dd8..c14bb62f18 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala @@ -9,10 +9,10 @@ import java.io.ObjectInputStream import java.io.ObjectOutputStream import java.text.SimpleDateFormat import java.util.Date - import scala.collection.mutable.{ Map ⇒ MutableMap } - import akka.event.EventHandler +import java.io.PrintWriter +import java.io.FileWriter trait BenchResultRepository { def add(stats: Stats) @@ -23,6 +23,10 @@ trait BenchResultRepository { def getWithHistorical(name: String, load: Int): Seq[Stats] + def saveHtmlReport(content: String, name: String): Unit + + def htmlReportUrl(name: String): String + } object BenchResultRepository { @@ -34,8 +38,10 @@ class FileBenchResultRepository extends BenchResultRepository { private val statsByName = MutableMap[String, Seq[Stats]]() private val baselineStats = MutableMap[Key, Stats]() private val historicalStats = MutableMap[Key, Seq[Stats]]() - private val dir = System.getProperty("benchmark.resultDir", "target/benchmark") - private def dirExists: Boolean = new File(dir).exists + private val serDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/ser" + private def serDirExists: Boolean = new File(serDir).exists + private val htmlDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/html" + private def htmlDirExists: Boolean = new File(htmlDir).exists protected val maxHistorical = 7 case class Key(name: String, load: Int) @@ -64,10 +70,10 @@ class FileBenchResultRepository extends BenchResultRepository { } private def loadFiles() { - if (dirExists) { + if (serDirExists) { val files = for { - f ← new File(dir).listFiles + f ← new File(serDir).listFiles if f.isFile if f.getName.endsWith(".ser") } yield f @@ -86,11 +92,11 @@ class FileBenchResultRepository extends BenchResultRepository { } private def save(stats: Stats) { - new File(dir).mkdirs - if (!dirExists) return + new File(serDir).mkdirs + if (!serDirExists) return val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp)) val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser" - val f = new File(dir, name) + val f = new File(serDir, name) var out: ObjectOutputStream = null try { out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f))) @@ -127,5 +133,24 @@ class FileBenchResultRepository extends BenchResultRepository { loadFiles() + def saveHtmlReport(content: String, fileName: String) { + new File(htmlDir).mkdirs + if (!htmlDirExists) return + val f = new File(htmlDir, fileName) + var writer: PrintWriter = null + try { + writer = new PrintWriter(new FileWriter(f)) + writer.print(content) + writer.flush() + } catch { + case e: Exception ⇒ + EventHandler.error(this, "Failed to save report to [%s], due to [%s]". + format(f.getAbsolutePath, e.getMessage)) + } finally { + if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } + } + } + + def htmlReportUrl(fileName: String): String = new File(htmlDir, fileName).getAbsolutePath } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala index 9160fa631e..7ec32a5904 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala @@ -10,16 +10,13 @@ class Report( resultRepository: BenchResultRepository, compareResultWith: Option[String] = None) { - private val dir = System.getProperty("benchmark.resultDir", "target/benchmark") - - private def dirExists: Boolean = new File(dir).exists - private def log = System.getProperty("benchmark.logResult", "false").toBoolean + private def log = System.getProperty("benchmark.logResult", "true").toBoolean val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss") - def html(statistics: Seq[Stats]): Unit = if (dirExists) { + def html(statistics: Seq[Stats]): Unit = { val current = statistics.last val sb = new StringBuilder @@ -29,7 +26,8 @@ class Report( sb.append("

%s

\n".format(title)) sb.append("
\n")
-    sb.append(formatResultsTable(statistics))
+    val resultTable = formatResultsTable(statistics)
+    sb.append(resultTable)
     sb.append("\n
\n") sb.append(img(percentilesChart(current))) @@ -43,15 +41,17 @@ class Report( comparePercentilesChart(stats).foreach(url ⇒ sb.append(img(url))) } - if (dirExists) { - val timestamp = fileTimestampFormat.format(new Date(current.timestamp)) - val name = current.name + "--" + timestamp + ".html" - write(sb.toString, name) + val timestamp = fileTimestampFormat.format(new Date(current.timestamp)) + val reportName = current.name + "--" + timestamp + ".html" + resultRepository.saveHtmlReport(sb.toString, reportName) + + if (log) { + EventHandler.info(this, resultTable + "Charts in html report: " + resultRepository.htmlReportUrl(reportName)) } } - private def img(url: String): String = { + def img(url: String): String = { """""".format( url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n" } @@ -59,7 +59,6 @@ class Report( def percentilesChart(stats: Stats): String = { val chartTitle = stats.name + " Percentiles (microseconds)" val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") - if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) chartUrl } @@ -70,7 +69,6 @@ class Report( } yield { val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)" val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name) - if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) chartUrl } } @@ -81,7 +79,6 @@ class Report( val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)" val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle, stats ⇒ legendTimeFormat.format(new Date(stats.timestamp))) - if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) Some(chartUrl) } else { None @@ -91,7 +88,6 @@ class Report( def latencyAndThroughputChart(stats: Stats): String = { val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)" val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle) - if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) chartUrl } @@ -115,8 +111,6 @@ class Report( statsSeq.map(formatStats(_)).mkString("\n") + "\n" + line + "\n" - if (log) EventHandler.info(this, formattedStats) - formattedStats } @@ -146,22 +140,6 @@ class Report( } - def write(content: String, fileName: String) { - val f = new File(dir, fileName) - var writer: PrintWriter = null - try { - writer = new PrintWriter(new FileWriter(f)) - writer.print(content) - writer.flush() - } catch { - case e: Exception ⇒ - EventHandler.error(this, "Failed to save report to [%s], due to [%s]". - format(f.getAbsolutePath, e.getMessage)) - } finally { - if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } - } - } - def header(title: String) = """| | From fcfc0bd0777842463361ec1b1300db99cf42adc5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Jul 2011 14:58:14 +0200 Subject: [PATCH 04/10] Ticket 981: Added mean to latency and througput chart --- .../trading/common/GoogleChartBuilder.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala index d7f6c965a3..98d40a266e 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala @@ -144,11 +144,11 @@ object GoogleChartBuilder { sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767") sb.append("&") sb.append("chco=") - val seriesColors = List("25B33B", "3072F3", "FF0000", "FF9900") + val seriesColors = List("25B33B", "3072F3", "FF0000", "37F0ED", "FF9900") sb.append(seriesColors.mkString(",")) sb.append("&") // legend - sb.append("chdl=5th Percentile|Median|95th Percentile|Throughput") + sb.append("chdl=5th%20Percentile|Median|95th%20Percentile|Mean|Throughput") sb.append("&") sb.append("chdlp=b") @@ -160,6 +160,7 @@ object GoogleChartBuilder { sb.append("chls=1|1|1") sb.append("&") + // margins sb.append("chma=5,5,5,25") sb.append("&") @@ -181,6 +182,11 @@ object GoogleChartBuilder { } sb.append(percentileSeries.mkString("|")) + sb.append("|") + sb.append(loadStr).append("|") + val meanSeries = statistics.map(s ⇒ formatDouble(s.mean)).mkString(",") + sb.append(meanSeries) + sb.append("|") val maxTps: Double = statistics.map(_.tps).max sb.append(loadStr).append("|") @@ -192,7 +198,7 @@ object GoogleChartBuilder { // y range sb.append("&") - sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append("|1,0,").append(maxValue).append("|2,0,") + sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append(",4").append("|1,0,").append(maxValue).append("|2,0,") .append(formatDouble(maxTps)) sb.append("&") @@ -203,6 +209,9 @@ object GoogleChartBuilder { sb.append(",") } sb.append(minLoad).append(",").append(maxLoad) + sb.append(",0,").append(formatDouble(maxValue)) + sb.append(",") + sb.append(minLoad).append(",").append(maxLoad) sb.append(",0,").append(formatDouble(maxTps)) sb.append("&") From 9874f5e863895bbede31fd1ba1106991ecfc0175 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Jul 2011 15:23:41 +0200 Subject: [PATCH 05/10] Ticket 981: Added mean to percentiles and mean chart --- .../trading/common/GoogleChartBuilder.scala | 16 +++++++++---- .../performance/trading/common/Report.scala | 24 +++++++++---------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala index 98d40a266e..301227affa 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala @@ -15,9 +15,9 @@ object GoogleChartBuilder { val ChartHeight = 400 /** - * Builds a bar chart for all percentiles in the statistics. + * Builds a bar chart for all percentiles and the mean in the statistics. */ - def percentilChartUrl(statistics: Seq[Stats], title: String, legend: Stats ⇒ String): String = { + def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: akka.performance.trading.common.Stats ⇒ String): String = { if (statistics.isEmpty) return "" val current = statistics.last @@ -38,6 +38,7 @@ object GoogleChartBuilder { sb.append("&") // labels percentileLabels(current.percentiles, sb) + sb.append("|mean") sb.append("|2:|min|mean|median") sb.append("&") // label positions @@ -63,7 +64,7 @@ object GoogleChartBuilder { // data series val maxValue = statistics.map(_.percentiles.last._2).max sb.append("chd=t:") - dataSeries(statistics.map(_.percentiles), sb) + dataSeries(statistics.map(_.percentiles), statistics.map(_.mean), sb) // y range sb.append("&") @@ -98,13 +99,18 @@ object GoogleChartBuilder { sb.append(s) } - private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], sb: StringBuilder) { - val series = + private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], meanValues: Seq[Double], sb: StringBuilder) { + val percentileSeries = for { percentiles ← allPercentiles } yield { percentiles.values.mkString(",") } + + val series = + for ((s, m) ← percentileSeries.zip(meanValues)) + yield s + "," + formatDouble(m) + sb.append(series.mkString("|")) } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala index 7ec32a5904..5e8dc6c2a2 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala @@ -30,15 +30,15 @@ class Report( sb.append(resultTable) sb.append("\n\n") - sb.append(img(percentilesChart(current))) + sb.append(img(percentilesAndMeanChart(current))) sb.append(img(latencyAndThroughputChart(current))) for (stats ← statistics) { - compareWithHistoricalPercentiliesChart(stats).foreach(url ⇒ sb.append(img(url))) + compareWithHistoricalPercentiliesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) } for (stats ← statistics) { - comparePercentilesChart(stats).foreach(url ⇒ sb.append(img(url))) + comparePercentilesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) } val timestamp = fileTimestampFormat.format(new Date(current.timestamp)) @@ -56,28 +56,28 @@ class Report( url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n" } - def percentilesChart(stats: Stats): String = { - val chartTitle = stats.name + " Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") + def percentilesAndMeanChart(stats: Stats): String = { + val chartTitle = stats.name + " Percentiles and Mean (microseconds)" + val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") chartUrl } - def comparePercentilesChart(stats: Stats): Seq[String] = { + def comparePercentilesAndMeanChart(stats: Stats): Seq[String] = { for { compareName ← compareResultWith.toSeq compareStats ← resultRepository.get(compareName, stats.load) } yield { - val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name) + val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles and Mean (microseconds)" + val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(Seq(compareStats, stats), chartTitle, _.name) chartUrl } } - def compareWithHistoricalPercentiliesChart(stats: Stats): Option[String] = { + def compareWithHistoricalPercentiliesAndMeanChart(stats: Stats): Option[String] = { val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load) if (withHistorical.size > 1) { - val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle, + val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles and Mean (microseconds)" + val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(withHistorical, chartTitle, stats ⇒ legendTimeFormat.format(new Date(stats.timestamp))) Some(chartUrl) } else { From 4105cd949d002159b3b11e428d30b59400e05a49 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 22 Jul 2011 08:13:12 +0200 Subject: [PATCH 06/10] Ticket 981: Better initialization of MatchingEngineRouting --- .../trading/common/OrderReceiver.scala | 27 +++++++------------ .../trading/common/TradingSystem.scala | 12 ++++++--- .../trading/oneway/OneWayOrderReceiver.scala | 4 +-- .../trading/oneway/OneWayTradingSystem.scala | 2 +- 4 files changed, 21 insertions(+), 24 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala index e9162299d1..869c186524 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala @@ -7,11 +7,13 @@ import akka.event.EventHandler trait OrderReceiver { type ME - val matchingEngines: List[ME] - var matchingEnginePartitionsIsStale = true var matchingEngineForOrderbook: Map[String, ME] = Map() - def refreshMatchingEnginePartitions() { + def refreshMatchingEnginePartitions(routing: MatchingEngineRouting[ME]) { + + val matchingEngines: List[ME] = routing.mapping.keys.toList + def supportedOrderbooks(me: ME): List[String] = routing.mapping(me) + val m = Map() ++ (for { me ← matchingEngines @@ -19,14 +21,11 @@ trait OrderReceiver { } yield (orderbookSymbol, me)) matchingEngineForOrderbook = m - matchingEnginePartitionsIsStale = false } - def supportedOrderbooks(me: ME): List[String] - } -class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp: Option[MessageDispatcher]) +class AkkaOrderReceiver(disp: Option[MessageDispatcher]) extends Actor with OrderReceiver { type ME = ActorRef @@ -34,21 +33,13 @@ class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp self.dispatcher = d } - override val matchingEngines: List[ActorRef] = matchingEngineRouting.keys.toList - - override def preStart() { - refreshMatchingEnginePartitions() - } - def receive = { + case routing@MatchingEngineRouting(mapping) ⇒ + refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]]) case order: Order ⇒ placeOrder(order) case unknown ⇒ EventHandler.warning(this, "Received unknown message: " + unknown) } - override def supportedOrderbooks(me: ActorRef): List[String] = { - matchingEngineRouting(me) - } - def placeOrder(order: Order) = { val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol) matchingEngine match { @@ -60,3 +51,5 @@ class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp } } } + +case class MatchingEngineRouting[ME](mapping: Map[ME, List[String]]) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala index 44951879c5..ed822cf1be 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala @@ -75,7 +75,7 @@ class AkkaTradingSystem extends TradingSystem { (1 to 10).toList map (i ⇒ createOrderReceiver()) } - def matchingEngineRouting: Map[ActorRef, List[String]] = { + def matchingEngineRouting: MatchingEngineRouting[ActorRef] = { val rules = for { info ← matchingEngines @@ -84,11 +84,11 @@ class AkkaTradingSystem extends TradingSystem { (info.primary, orderbookSymbols) } - Map() ++ rules + MatchingEngineRouting(Map() ++ rules) } def createOrderReceiver() = - actorOf(new AkkaOrderReceiver(matchingEngineRouting, orDispatcher)) + actorOf(new AkkaOrderReceiver(orDispatcher)) override def start() { for (MatchingEngineInfo(p, s, o) ← matchingEngines) { @@ -97,7 +97,11 @@ class AkkaTradingSystem extends TradingSystem { s.foreach(_.start()) s.foreach(p ! _) } - orderReceivers.foreach(_.start()) + val routing = matchingEngineRouting + for (or ← orderReceivers) { + or.start() + or ! routing + } } override def shutdown() { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala index d64639d3fa..16e3a41048 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala @@ -6,8 +6,8 @@ import akka.event.EventHandler import akka.performance.trading.domain._ import akka.performance.trading.common.AkkaOrderReceiver -class OneWayOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp: Option[MessageDispatcher]) - extends AkkaOrderReceiver(matchingEngineRouting, disp) { +class OneWayOrderReceiver(disp: Option[MessageDispatcher]) + extends AkkaOrderReceiver(disp) { override def placeOrder(order: Order) = { val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala index d6fcafbf7c..57737e9c0e 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala @@ -11,6 +11,6 @@ class OneWayTradingSystem extends AkkaTradingSystem { actorOf(new OneWayMatchingEngine(meId, orderbooks, meDispatcher)) override def createOrderReceiver() = - actorOf(new OneWayOrderReceiver(matchingEngineRouting, orDispatcher)) + actorOf(new OneWayOrderReceiver(orDispatcher)) } \ No newline at end of file From 031253f16dedc7751e4c97bc76e8a322e4ffbfcb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 22 Jul 2011 08:57:59 +0200 Subject: [PATCH 07/10] Ticket 981: Include system info in report --- .../performance/trading/common/Report.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala index 5e8dc6c2a2..880711b8f2 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala @@ -5,6 +5,7 @@ import java.io.PrintWriter import java.io.FileWriter import akka.event.EventHandler import java.util.Date +import java.lang.management.ManagementFactory class Report( resultRepository: BenchResultRepository, @@ -41,6 +42,11 @@ class Report( comparePercentilesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) } + sb.append("
\n") + sb.append("
\n")
+    sb.append(systemInformation)
+    sb.append("\n
\n") + val timestamp = fileTimestampFormat.format(new Date(current.timestamp)) val reportName = current.name + "--" + timestamp + ".html" resultRepository.saveHtmlReport(sb.toString, reportName) @@ -140,6 +146,51 @@ class Report( } + def systemInformation: String = { + val runtime = ManagementFactory.getRuntimeMXBean + val os = ManagementFactory.getOperatingSystemMXBean + val threads = ManagementFactory.getThreadMXBean + val mem = ManagementFactory.getMemoryMXBean + val heap = mem.getHeapMemoryUsage + + val sb = new StringBuilder + + sb.append("Benchmark properties: ") + import scala.collection.JavaConversions._ + val propNames: Seq[String] = System.getProperties.propertyNames.toSeq.map(_.toString) + for (name ← propNames if name.startsWith("benchmark.")) { + sb.append(name).append("=").append(System.getProperty(name)).append(" ") + } + sb.append("\n") + + sb.append("Operating system: ").append(os.getName).append(", ").append(os.getArch).append(", ").append(os.getVersion) + sb.append("\n") + sb.append("JVM: ").append(runtime.getVmName).append(" ").append(runtime.getVmVendor). + append(" ").append(runtime.getVmVersion) + sb.append("\n") + val args = runtime.getInputArguments.filterNot(_.contains("classpath")).mkString(" ") + sb.append("Args: ").append(args) + sb.append("\n") + sb.append("Processors: ").append(os.getAvailableProcessors) + sb.append("\n") + sb.append("Load average: ").append(os.getSystemLoadAverage) + sb.append("\n") + sb.append("Thread count: ").append(threads.getThreadCount).append(" (").append(threads.getPeakThreadCount).append(")") + sb.append("\n") + sb.append("Heap: ").append(formatDouble(heap.getUsed.toDouble / 1024 / 1024)). + append(" (").append(formatDouble(heap.getInit.toDouble / 1024 / 1024)). + append(" - "). + append(formatDouble(heap.getMax.toDouble / 1024 / 1024)). + append(")").append(" MB") + sb.append("\n") + + sb.toString + } + + def formatDouble(value: Double): String = { + new java.math.BigDecimal(value).setScale(2, java.math.RoundingMode.HALF_EVEN).toString + } + def header(title: String) = """| | From 82710595d81f11fec7392eecd0ced42318c46b8a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 22 Jul 2011 09:11:45 +0200 Subject: [PATCH 08/10] Ticket 981: Moved general stuff to separate package --- .../trading/common/PerformanceTest.scala | 6 +++--- .../BenchResultRepository.scala | 10 ++++++---- .../GoogleChartBuilder.scala | 7 +++---- .../{trading/common => workbench}/Report.scala | 16 +++++++++------- .../{trading/common => workbench}/Stats.scala | 2 +- 5 files changed, 22 insertions(+), 19 deletions(-) rename akka-actor-tests/src/test/scala/akka/performance/{trading/common => workbench}/BenchResultRepository.scala (98%) rename akka-actor-tests/src/test/scala/akka/performance/{trading/common => workbench}/GoogleChartBuilder.scala (97%) rename akka-actor-tests/src/test/scala/akka/performance/{trading/common => workbench}/Report.scala (97%) rename akka-actor-tests/src/test/scala/akka/performance/{trading/common => workbench}/Stats.scala (88%) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala index 69a7b4bd08..3c160a09ab 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala @@ -1,7 +1,5 @@ package akka.performance.trading.common -import java.text.SimpleDateFormat -import java.util.Date import java.util.Random import scala.collection.immutable.TreeMap @@ -12,11 +10,13 @@ import org.junit.After import org.junit.Before import org.scalatest.junit.JUnitSuite -import akka.event.EventHandler import akka.performance.trading.domain.Ask import akka.performance.trading.domain.Bid import akka.performance.trading.domain.Order import akka.performance.trading.domain.TotalTradeCounter +import akka.performance.workbench.BenchResultRepository +import akka.performance.workbench.Report +import akka.performance.workbench.Stats trait PerformanceTest extends JUnitSuite { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala similarity index 98% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala rename to akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala index c14bb62f18..0c8e5f0cb2 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala @@ -1,18 +1,20 @@ -package akka.performance.trading.common +package akka.performance.workbench import java.io.BufferedInputStream import java.io.BufferedOutputStream import java.io.File import java.io.FileInputStream import java.io.FileOutputStream +import java.io.FileWriter import java.io.ObjectInputStream import java.io.ObjectOutputStream +import java.io.PrintWriter import java.text.SimpleDateFormat import java.util.Date -import scala.collection.mutable.{ Map ⇒ MutableMap } + +import scala.collection.mutable.{Map => MutableMap} + import akka.event.EventHandler -import java.io.PrintWriter -import java.io.FileWriter trait BenchResultRepository { def add(stats: Stats) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala similarity index 97% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala rename to akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala index 301227affa..a2d2a381a8 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala @@ -1,10 +1,9 @@ -package akka.performance.trading.common +package akka.performance.workbench import java.io.UnsupportedEncodingException import java.net.URLEncoder + import scala.collection.immutable.TreeMap -import java.util.Locale -import java.util.Formatter /** * Generates URLs to Google Chart API http://code.google.com/apis/chart/ @@ -17,7 +16,7 @@ object GoogleChartBuilder { /** * Builds a bar chart for all percentiles and the mean in the statistics. */ - def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: akka.performance.trading.common.Stats ⇒ String): String = { + def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: Stats ⇒ String): String = { if (statistics.isEmpty) return "" val current = statistics.last diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala similarity index 97% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala rename to akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala index 880711b8f2..2be216fc7a 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -1,11 +1,13 @@ -package akka.performance.trading.common -import java.io.File -import java.text.SimpleDateFormat -import java.io.PrintWriter -import java.io.FileWriter -import akka.event.EventHandler -import java.util.Date +package akka.performance.workbench + import java.lang.management.ManagementFactory +import java.text.SimpleDateFormat +import java.util.Date + +import scala.collection.JavaConversions.asScalaBuffer +import scala.collection.JavaConversions.enumerationAsScalaIterator + +import akka.event.EventHandler class Report( resultRepository: BenchResultRepository, diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Stats.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala similarity index 88% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/Stats.scala rename to akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala index 1b1b854cb0..4a15a2c10e 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Stats.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala @@ -1,4 +1,4 @@ -package akka.performance.trading.common +package akka.performance.workbench import scala.collection.immutable.TreeMap From eccee3d12f513eac019c130f3a79840a69e93f3d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 22 Jul 2011 10:00:57 +0200 Subject: [PATCH 09/10] Ticket 981: minor --- .../akka/performance/trading/common/BenchmarkScenarios.scala | 2 ++ .../src/test/scala/akka/performance/workbench/Stats.scala | 1 + 2 files changed, 3 insertions(+) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala index 6cbd6ee4ca..5442deacd5 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala @@ -29,12 +29,14 @@ trait BenchmarkScenarios extends PerformanceTest { def complexScenario80 = complexScenario(80) @Test def complexScenario100 = complexScenario(100) + /* @Test def complexScenario200 = complexScenario(200) @Test def complexScenario300 = complexScenario(300) @Test def complexScenario400 = complexScenario(400) + */ def complexScenario(numberOfClients: Int) { Assume.assumeTrue(numberOfClients >= minClients) diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala index 4a15a2c10e..c307d997e3 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala @@ -2,6 +2,7 @@ package akka.performance.workbench import scala.collection.immutable.TreeMap +@SerialVersionUID(1L) case class Stats( name: String, load: Int, From a5bbbb572a0f4ae01e276d804b7241b4fe14f1ae Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 22 Jul 2011 10:01:25 +0200 Subject: [PATCH 10/10] Ticket 981: Include akka properties in report --- .../akka/performance/workbench/Report.scala | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala index 2be216fc7a..dce6017203 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -8,6 +8,8 @@ import scala.collection.JavaConversions.asScalaBuffer import scala.collection.JavaConversions.enumerationAsScalaIterator import akka.event.EventHandler +import akka.config.Config +import akka.config.Config.config class Report( resultRepository: BenchResultRepository, @@ -157,11 +159,11 @@ class Report( val sb = new StringBuilder - sb.append("Benchmark properties: ") + sb.append("Benchmark properties:") import scala.collection.JavaConversions._ val propNames: Seq[String] = System.getProperties.propertyNames.toSeq.map(_.toString) - for (name ← propNames if name.startsWith("benchmark.")) { - sb.append(name).append("=").append(System.getProperty(name)).append(" ") + for (name ← propNames if name.startsWith("benchmark")) { + sb.append("\n ").append(name).append("=").append(System.getProperty(name)) } sb.append("\n") @@ -170,9 +172,6 @@ class Report( sb.append("JVM: ").append(runtime.getVmName).append(" ").append(runtime.getVmVendor). append(" ").append(runtime.getVmVersion) sb.append("\n") - val args = runtime.getInputArguments.filterNot(_.contains("classpath")).mkString(" ") - sb.append("Args: ").append(args) - sb.append("\n") sb.append("Processors: ").append(os.getAvailableProcessors) sb.append("\n") sb.append("Load average: ").append(os.getSystemLoadAverage) @@ -186,6 +185,17 @@ class Report( append(")").append(" MB") sb.append("\n") + val args = runtime.getInputArguments.filterNot(_.contains("classpath")).mkString("\n ") + sb.append("Args:\n ").append(args) + sb.append("\n") + + sb.append("Akka version: ").append(Config.CONFIG_VERSION) + sb.append("\n") + sb.append("Akka config:") + for (key ← config.keys) { + sb.append("\n ").append(key).append("=").append(config(key)) + } + sb.toString }