From e5f232badcc02a2b32633dd29d2eece828fefbbc Mon Sep 17 00:00:00 2001 From: jboner Date: Tue, 1 Dec 2009 10:09:29 +0100 Subject: [PATCH] added memory footprint test --- akka-actors/src/main/scala/actor/Actor.scala | 34 +++++++++---------- akka-actors/src/test/scala/MemoryTest.scala | 32 +++++++++++++++++ akka-amqp/src/main/scala/ExampleSession.scala | 20 ++++------- 3 files changed, 56 insertions(+), 30 deletions(-) create mode 100644 akka-actors/src/test/scala/MemoryTest.scala diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index a63af38fc9..ed021c1e16 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -66,7 +66,7 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { /** * @author Jonas Bonér */ -object Actor extends Logging { +object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) @@ -115,7 +115,7 @@ object Actor extends Logging { * * */ - def actor[A](body: => Unit) = { + def actor[A](body: => Unit) = { def handler[A](body: Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { start @@ -129,7 +129,7 @@ object Actor extends Logging { /** * Use to create an anonymous event-driven actor with a body but no message loop block. *

- * This actor can not respond to any messages but can be used as a simple way to + * This actor can not respond to any messages but can be used as a simple way to * spawn a lightweight thread to process some task. *

* The actor is started when created. @@ -183,7 +183,7 @@ object Actor extends Logging { * } * */ - def actor[A](lifeCycleConfig: LifeCycle)(body: => Unit) = { + def actor[A](lifeCycleConfig: LifeCycle)(body: => Unit) = { def handler[A](body: Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { lifeCycle = Some(lifeCycleConfig) @@ -213,11 +213,11 @@ trait Actor extends TransactionManagement { ActorRegistry.register(this) implicit protected val self: Actor = this - + // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait private[akka] var _uuid = Uuid.newUuid.toString def uuid = _uuid - + // ==================================== // private fields // ==================================== @@ -226,7 +226,7 @@ trait Actor extends TransactionManagement { @volatile private var _isShutDown: Boolean = false private var _hotswap: Option[PartialFunction[Any, Unit]] = None private var _config: Option[AnyRef] = None - private val _remoteFlagLock = new ReadWriteLock + private val _remoteFlagLock = new ReadWriteLock private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _linkedActors: Option[HashSet[Actor]] = None private[akka] var _mailbox: MessageQueue = _ @@ -244,7 +244,7 @@ trait Actor extends TransactionManagement { *

* This sender reference can be used together with the '!' method for request/reply * message exchanges and which is in many ways better than using the '!!' method - * which will make the sender wait for a reply using a *blocking* future. + * which will make the sender wait for a reply using a *blocking* future. */ protected[this] var sender: Option[Actor] = None @@ -284,7 +284,7 @@ trait Actor extends TransactionManagement { *

* You can override it so it fits the specific use-case that the actor is used for. * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different - * dispatchers available. + * dispatchers available. *

* The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. @@ -301,7 +301,7 @@ trait Actor extends TransactionManagement { * Set trapExit to the list of exception classes that the actor should be able to trap * from the actor it is supervising. When the supervising actor throws these exceptions * then they will trigger a restart. - *

+ *

*

    * // trap all exceptions
    * trapExit = List(classOf[Throwable])
@@ -448,7 +448,7 @@ trait Actor extends TransactionManagement {
    *
    * If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
    * 

- * + * * This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable. *

    *   actor ! message
@@ -514,7 +514,7 @@ trait Actor extends TransactionManagement {
     getResultOrThrowException(future)
   } else throw new IllegalStateException(
     "Actor has not been started, you need to invoke 'actor.start' before using it")
-  
+
   /**
    * Sends a message asynchronously and waits on a future for a reply message.
    * 

@@ -530,7 +530,7 @@ trait Actor extends TransactionManagement { def !![T](message: AnyRef): Option[T] = !![T](message, timeout) /** - * This method is evil and has been removed. Use '!!' with a timeout instead. + * This method is evil and has been removed. Use '!!' with a timeout instead. */ def !?[T](message: AnyRef): T = throw new UnsupportedOperationException( "'!?' is evil and has been removed. Use '!!' with a timeout instead") @@ -576,7 +576,7 @@ trait Actor extends TransactionManagement { } else throw new IllegalArgumentException( "Can not swap dispatcher for " + toString + " after it has been started") } - + /** * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host. */ @@ -744,7 +744,7 @@ trait Actor extends TransactionManagement { private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime if (_remoteAddress.isDefined) { - val requestBuilder = RemoteRequest.newBuilder + val requestBuilder = RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(timeout) @@ -804,7 +804,7 @@ trait Actor extends TransactionManagement { private def transactionalDispatch[T](messageHandle: MessageInvocation) = { setTransaction(messageHandle.tx) - + val message = messageHandle.message //serializeMessage(messageHandle.message) senderFuture = messageHandle.future sender = messageHandle.sender @@ -820,7 +820,7 @@ trait Actor extends TransactionManagement { decrementTransaction } } - + try { if (isTransactionRequiresNew && !isTransactionInScope) { if (senderFuture.isEmpty) throw new StmException( diff --git a/akka-actors/src/test/scala/MemoryTest.scala b/akka-actors/src/test/scala/MemoryTest.scala new file mode 100644 index 0000000000..083b964bc7 --- /dev/null +++ b/akka-actors/src/test/scala/MemoryTest.scala @@ -0,0 +1,32 @@ +package se.scalablesolutions.akka.actor + +import junit.framework.TestCase + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import scala.collection.mutable.HashSet + +class MemoryFootprintTest extends JUnitSuite { + class Mem extends Actor { + def receive = { + case _ => {} + } + } + + @Test + def shouldCreateManyActors = { + /* println("============== MEMORY TEST ==============") + val actors = new HashSet[Actor] + println("Total memory: " + Runtime.getRuntime.totalMemory) + (1 until 1000000).foreach {i => + val mem = new Mem + actors += mem + if ((i % 100000) == 0) { + println("Nr actors: " + i) + println("Total memory: " + (Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory)) + } + } + */ + assert(true) + } +} diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala index 158dbe46d0..998c520620 100644 --- a/akka-amqp/src/main/scala/ExampleSession.scala +++ b/akka-amqp/src/main/scala/ExampleSession.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.amqp -import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.actor.Actor.Sender.Self import com.rabbitmq.client.ConnectionParameters @@ -30,10 +30,8 @@ object ExampleSession { def direct = { val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, Map[String, AnyRef]()) - consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() { - def receive = { - case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) - } + consumer ! MessageConsumerListener("@george_bush", "direct", actor { + case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) }) val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100) producer ! Message("@jonas_boner: You sucked!!".getBytes, "direct") @@ -41,15 +39,11 @@ object ExampleSession { def fanout = { val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, Map[String, AnyRef]()) - consumer ! MessageConsumerListener("@george_bush", "", new Actor() { - def receive = { - case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) - } + consumer ! MessageConsumerListener("@george_bush", "", actor { + case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) }) - consumer ! MessageConsumerListener("@barack_obama", "", new Actor() { - def receive = { - case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) - } + consumer ! MessageConsumerListener("@barack_obama", "", actor { + case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]])) }) val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100) producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")