diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index a63af38fc9..ed021c1e16 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -66,7 +66,7 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { /** * @author Jonas Bonér */ -object Actor extends Logging { +object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) @@ -115,7 +115,7 @@ object Actor extends Logging { * * */ - def actor[A](body: => Unit) = { + def actor[A](body: => Unit) = { def handler[A](body: Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { start @@ -129,7 +129,7 @@ object Actor extends Logging { /** * Use to create an anonymous event-driven actor with a body but no message loop block. *
- * This actor can not respond to any messages but can be used as a simple way to + * This actor can not respond to any messages but can be used as a simple way to * spawn a lightweight thread to process some task. * * The actor is started when created. @@ -183,7 +183,7 @@ object Actor extends Logging { * } * */ - def actor[A](lifeCycleConfig: LifeCycle)(body: => Unit) = { + def actor[A](lifeCycleConfig: LifeCycle)(body: => Unit) = { def handler[A](body: Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { lifeCycle = Some(lifeCycleConfig) @@ -213,11 +213,11 @@ trait Actor extends TransactionManagement { ActorRegistry.register(this) implicit protected val self: Actor = this - + // FIXME http://www.assembla.com/spaces/akka/tickets/56-Change-UUID-generation-for-the-TransactionManagement-trait private[akka] var _uuid = Uuid.newUuid.toString def uuid = _uuid - + // ==================================== // private fields // ==================================== @@ -226,7 +226,7 @@ trait Actor extends TransactionManagement { @volatile private var _isShutDown: Boolean = false private var _hotswap: Option[PartialFunction[Any, Unit]] = None private var _config: Option[AnyRef] = None - private val _remoteFlagLock = new ReadWriteLock + private val _remoteFlagLock = new ReadWriteLock private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _linkedActors: Option[HashSet[Actor]] = None private[akka] var _mailbox: MessageQueue = _ @@ -244,7 +244,7 @@ trait Actor extends TransactionManagement { * * This sender reference can be used together with the '!' method for request/reply * message exchanges and which is in many ways better than using the '!!' method - * which will make the sender wait for a reply using a *blocking* future. + * which will make the sender wait for a reply using a *blocking* future. */ protected[this] var sender: Option[Actor] = None @@ -284,7 +284,7 @@ trait Actor extends TransactionManagement { * * You can override it so it fits the specific use-case that the actor is used for. * See the se.scalablesolutions.akka.dispatch.Dispatchers class for the different - * dispatchers available. + * dispatchers available. * * The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. @@ -301,7 +301,7 @@ trait Actor extends TransactionManagement { * Set trapExit to the list of exception classes that the actor should be able to trap * from the actor it is supervising. When the supervising actor throws these exceptions * then they will trigger a restart. - * + * *
* // trap all exceptions
* trapExit = List(classOf[Throwable])
@@ -448,7 +448,7 @@ trait Actor extends TransactionManagement {
*
* If invoked from within an actor then the actor reference is implicitly passed on as the implicit 'sender' argument.
*
- *
+ *
* This actor 'sender' reference is then available in the receiving actor in the 'sender' member variable.
*
* actor ! message
@@ -514,7 +514,7 @@ trait Actor extends TransactionManagement {
getResultOrThrowException(future)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
-
+
/**
* Sends a message asynchronously and waits on a future for a reply message.
*
@@ -530,7 +530,7 @@ trait Actor extends TransactionManagement {
def !: Option[T] = !
/**
- * This method is evil and has been removed. Use '!!' with a timeout instead.
+ * This method is evil and has been removed. Use '!!' with a timeout instead.
*/
def !?[T](message: AnyRef): T = throw new UnsupportedOperationException(
"'!?' is evil and has been removed. Use '!!' with a timeout instead")
@@ -576,7 +576,7 @@ trait Actor extends TransactionManagement {
} else throw new IllegalArgumentException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
-
+
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
@@ -744,7 +744,7 @@ trait Actor extends TransactionManagement {
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long):
CompletableFutureResult = _remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
if (_remoteAddress.isDefined) {
- val requestBuilder = RemoteRequest.newBuilder
+ val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
.setTarget(this.getClass.getName)
.setTimeout(timeout)
@@ -804,7 +804,7 @@ trait Actor extends TransactionManagement {
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
setTransaction(messageHandle.tx)
-
+
val message = messageHandle.message //serializeMessage(messageHandle.message)
senderFuture = messageHandle.future
sender = messageHandle.sender
@@ -820,7 +820,7 @@ trait Actor extends TransactionManagement {
decrementTransaction
}
}
-
+
try {
if (isTransactionRequiresNew && !isTransactionInScope) {
if (senderFuture.isEmpty) throw new StmException(
diff --git a/akka-actors/src/test/scala/MemoryTest.scala b/akka-actors/src/test/scala/MemoryTest.scala
new file mode 100644
index 0000000000..083b964bc7
--- /dev/null
+++ b/akka-actors/src/test/scala/MemoryTest.scala
@@ -0,0 +1,32 @@
+package se.scalablesolutions.akka.actor
+
+import junit.framework.TestCase
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+import scala.collection.mutable.HashSet
+
+class MemoryFootprintTest extends JUnitSuite {
+ class Mem extends Actor {
+ def receive = {
+ case _ => {}
+ }
+ }
+
+ @Test
+ def shouldCreateManyActors = {
+ /* println("============== MEMORY TEST ==============")
+ val actors = new HashSet[Actor]
+ println("Total memory: " + Runtime.getRuntime.totalMemory)
+ (1 until 1000000).foreach {i =>
+ val mem = new Mem
+ actors += mem
+ if ((i % 100000) == 0) {
+ println("Nr actors: " + i)
+ println("Total memory: " + (Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory))
+ }
+ }
+ */
+ assert(true)
+ }
+}
diff --git a/akka-amqp/src/main/scala/ExampleSession.scala b/akka-amqp/src/main/scala/ExampleSession.scala
index 158dbe46d0..998c520620 100644
--- a/akka-amqp/src/main/scala/ExampleSession.scala
+++ b/akka-amqp/src/main/scala/ExampleSession.scala
@@ -4,7 +4,7 @@
package se.scalablesolutions.akka.amqp
-import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.Actor.Sender.Self
import com.rabbitmq.client.ConnectionParameters
@@ -30,10 +30,8 @@ object ExampleSession {
def direct = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, Map[String, AnyRef]())
- consumer ! MessageConsumerListener("@george_bush", "direct", new Actor() {
- def receive = {
- case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
- }
+ consumer ! MessageConsumerListener("@george_bush", "direct", actor {
+ case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, IM, None, None, 100)
producer ! Message("@jonas_boner: You sucked!!".getBytes, "direct")
@@ -41,15 +39,11 @@ object ExampleSession {
def fanout = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, Map[String, AnyRef]())
- consumer ! MessageConsumerListener("@george_bush", "", new Actor() {
- def receive = {
- case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
- }
+ consumer ! MessageConsumerListener("@george_bush", "", actor {
+ case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
- consumer ! MessageConsumerListener("@barack_obama", "", new Actor() {
- def receive = {
- case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
- }
+ consumer ! MessageConsumerListener("@barack_obama", "", actor {
+ case Message(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
val producer = AMQP.newProducer(CONFIG, HOSTNAME, PORT, CHAT, None, None, 100)
producer ! Message("@jonas_boner: I'm going surfing".getBytes, "")