diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index ecb0d2f497..79ae48a9a7 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -199,6 +199,25 @@ object Actor extends Logging { } handler(body) } + + /** + * Use to create an anonymous event-driven remote actor. + * The actor is started when created. + * Example: + *
+   * import Actor._
+   *
+   * val a = actor("localhost", 9999)  {
+   *   case msg => ... // handle message
+   * }
+   * 
+ */ + def actor(hostname: String, port: Int)(body: PartialFunction[Any, Unit]): Actor = new Actor() { + makeRemote(hostname, port) + start + def receive = body + } + } /** @@ -672,14 +691,11 @@ trait Actor extends TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def link(actor: Actor) = { - if (_isRunning) { - getLinkedActors.add(actor) - if (actor._supervisor.isDefined) throw new IllegalStateException( - "Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") - actor._supervisor = Some(this) - Actor.log.debug("Linking actor [%s] to actor [%s]", actor, this) - } else throw new IllegalStateException( - "Actor has not been started, you need to invoke 'actor.start' before using it") + getLinkedActors.add(actor) + if (actor._supervisor.isDefined) throw new IllegalStateException( + "Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails") + actor._supervisor = Some(this) + Actor.log.debug("Linking actor [%s] to actor [%s]", actor, this) } /** @@ -688,14 +704,11 @@ trait Actor extends TransactionManagement { * To be invoked from within the actor itself. */ protected[this] def unlink(actor: Actor) = { - if (_isRunning) { - if (!getLinkedActors.contains(actor)) throw new IllegalStateException( - "Actor [" + actor + "] is not a linked actor, can't unlink") - getLinkedActors.remove(actor) - actor._supervisor = None - Actor.log.debug("Unlinking actor [%s] from actor [%s]", actor, this) - } else throw new IllegalStateException( - "Actor has not been started, you need to invoke 'actor.start' before using it") + if (!getLinkedActors.contains(actor)) throw new IllegalStateException( + "Actor [" + actor + "] is not a linked actor, can't unlink") + getLinkedActors.remove(actor) + actor._supervisor = None + Actor.log.debug("Unlinking actor [%s] from actor [%s]", actor, this) } /** diff --git a/akka-persistence-redis/src/test/scala/RedisPersistentActorTest.scala b/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala similarity index 96% rename from akka-persistence-redis/src/test/scala/RedisPersistentActorTest.scala rename to akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala index 308b7773ba..b577f0ba72 100644 --- a/akka-persistence-redis/src/test/scala/RedisPersistentActorTest.scala +++ b/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala @@ -5,7 +5,7 @@ import junit.framework.TestCase import org.junit.{Test, Before} import org.junit.Assert._ -import se.scalablesolutions.akka.actor.Transactor +import se.scalablesolutions.akka.actor.{Actor, Transactor} /** * A persistent actor based on Redis storage. @@ -85,15 +85,14 @@ class AccountActor extends Transactor { } } -@serializable class PersistentFailerActor extends Actor { - makeTransactionRequired +@serializable class PersistentFailerActor extends Transactor { def receive = { case "Failure" => throw new RuntimeException("expected") } } -class RedisPersistentActorTest extends TestCase { +class RedisPersistentActorSpec extends TestCase { @Test def testSuccessfulDebit = { val bactor = new AccountActor diff --git a/akka-persistence-redis/src/test/scala/RedisStorageBackendTest.scala b/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala similarity index 99% rename from akka-persistence-redis/src/test/scala/RedisStorageBackendTest.scala rename to akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index 262bda47eb..cd91d4a591 100644 --- a/akka-persistence-redis/src/test/scala/RedisStorageBackendTest.scala +++ b/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -11,7 +11,7 @@ import se.scalablesolutions.akka.serialization.Serializable import RedisStorageBackend._ @RunWith(classOf[JUnitRunner]) -class RedisStorageBackendTest extends +class RedisStorageBackendSpec extends Spec with ShouldMatchers with BeforeAndAfterAll { diff --git a/akka-samples-chat/README b/akka-samples-chat/README index 5003ffc128..57084eeef6 100644 --- a/akka-samples-chat/README +++ b/akka-samples-chat/README @@ -8,4 +8,4 @@ To run the sample: - scala> import se.scalablesolutions.akka.sample.chat._ - scala> Runner.run 5. See the chat simulation run -6. Run it again if you like +6. Run it again to see full speed after first initialization diff --git a/akka-samples-chat/src/main/scala/ChatServer.scala b/akka-samples-chat/src/main/scala/ChatServer.scala index 5a34b614d7..76dc175f82 100644 --- a/akka-samples-chat/src/main/scala/ChatServer.scala +++ b/akka-samples-chat/src/main/scala/ChatServer.scala @@ -6,9 +6,12 @@ package se.scalablesolutions.akka.sample.chat import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, RemoteActor} import se.scalablesolutions.akka.remote.RemoteServer +import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.{OneForOneStrategy} +import scala.collection.mutable.HashMap + /****************************************************************************** To run the sample: 1. Run 'mvn install' (builds and deploys jar to AKKA_HOME/deploy) @@ -18,7 +21,7 @@ import se.scalablesolutions.akka.config.{OneForOneStrategy} - scala> import se.scalablesolutions.akka.sample.chat._ - scala> Runner.run 5. See the chat simulation run - 6. Run it again if you like + 6. Run it again to see full speed after first initialization ******************************************************************************/ /** @@ -36,21 +39,20 @@ case class ChatMessage(from: String, message: String) extends Event */ class ChatClient(val name: String) { import Actor.Sender.Self - def login = ChatServer ! Login(name) - def logout = ChatServer ! Logout(name) - def post(message: String) = ChatServer ! ChatMessage(name, name + ": " + message) - def chatLog: ChatLog = (ChatServer !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer")) + def login = ChatService ! Login(name) + def logout = ChatService ! Logout(name) + def post(message: String) = ChatService ! ChatMessage(name, name + ": " + message) + def chatLog: ChatLog = (ChatService !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer")) } /** * Internal chat client session. */ class Session(user: String, storage: Actor) extends Actor { - lifeCycle = Some(LifeCycle(Permanent)) private val loginTime = System.currentTimeMillis private var userLog: List[String] = Nil - log.info("New session for user [%s] has been created", user) + log.info("New session for user [%s] has been created at [%s]", user, loginTime) def receive = { case msg @ ChatMessage(from, message) => @@ -82,67 +84,105 @@ class Storage extends Actor { } /** - * Chat server. Manages sessions and redirects all other messages to the Session for the client. + * Implements user session management. + *

+ * Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor. */ -object ChatServer extends Actor { - id = "ChatServer" - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Exception]) +trait SessionManagement { this: Actor => - private var storage: Storage = _ - private var sessions = Map[String, Actor]() - - log.info("Chat service is starting up...") - - override def init = storage = spawnLink(classOf[Storage]) + val storage: Storage // needs someone to provide the Storage + val sessions = new HashMap[String, Actor] - def receive = sessionManagement orElse chatting - - private def sessionManagement: PartialFunction[Any, Unit] = { + protected def sessionManagement: PartialFunction[Any, Unit] = { case Login(username) => log.info("User [%s] has logged in", username) val session = new Session(username, storage) - startLink(session) - sessions = sessions + (username -> session) + session.start + sessions += (username -> session) case Logout(username) => log.info("User [%s] has logged out", username) val session = sessions(username) - unlink(session) session.stop - sessions = sessions - username + sessions -= username } - private def chatting: PartialFunction[Any, Unit] = { + protected def shutdownSessions = + sessions.foreach { case (_, session) => session.stop } +} + +/** + * Implements chat management, e.g. chat message dispatch. + *

+ * Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor. + */ +trait ChatManagement { this: Actor => + val sessions: HashMap[String, Actor] // needs someone to provide the Session map + + protected def chatManagement: PartialFunction[Any, Unit] = { case msg @ ChatMessage(from, _) => sessions(from) ! msg case msg @ GetChatLog(from) => sessions(from) forward msg } +} + +/** + * Chat server. Manages sessions and redirects all other messages to the Session for the client. + */ +trait ChatServer extends Actor { + id = "ChatServer" // setting ID to make sure there is only one single ChatServer on the remote node + faultHandler = Some(OneForOneStrategy(5, 5000)) + trapExit = List(classOf[Exception]) + + val storage: Storage = spawnLink(classOf[Storage]) // starts and links Storage + + log.info("Chat service is starting up...") + + // actor message handler + def receive = sessionManagement orElse chatManagement + + // abstract methods to be defined somewhere else + protected def chatManagement: PartialFunction[Any, Unit] + protected def sessionManagement: PartialFunction[Any, Unit] + protected def shutdownSessions: Unit + override def shutdown = { - sessions.foreach { case (_, session) => - log.info("Chat server is shutting down...") - unlink(session) - session.stop - } + log.info("Chat server is shutting down...") + shutdownSessions unlink(storage) storage.stop } } +/** + * Object encapsulating the full Chat Service. + */ +object ChatService extends ChatServer with SessionManagement with ChatManagement + +/** + * Boot class for running the ChatService in the Akka microkernel. + *

+ * Configures supervision of the ChatService for fault-tolerance. + */ class Boot { val factory = SupervisorFactory( SupervisorConfig( RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), Supervise( - ChatServer, + ChatService, LifeCycle(Permanent)) :: Nil)) factory.newInstance.start } +/** + * Test runner emulating a chat session. + */ object Runner { - ChatServer.makeRemote("localhost", 9999) - ChatServer.start + + // create a handle to the remote ChatService + ChatService.makeRemote("localhost", 9999) + ChatService.start def run = { val client = new ChatClient("jonas") diff --git a/pom.xml b/pom.xml index 446bf56b85..74aac9e4a7 100755 --- a/pom.xml +++ b/pom.xml @@ -269,6 +269,10 @@ maven-surefire-plugin 2.4.2 + + **/*Test.java + + akka.home