diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index c8fc3af74d..ecb0d2f497 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -611,18 +611,15 @@ trait Actor extends TransactionManagement { /** * Get the dispatcher for this actor. */ - def dispatcher: MessageDispatcher = - if (_isRunning) messageDispatcher - else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") - + def dispatcher: MessageDispatcher = messageDispatcher /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized { + def dispatcher_=(md: MessageDispatcher): Unit = synchronized { if (!_isRunning) { messageDispatcher.unregister(this) - messageDispatcher = dispatcher + messageDispatcher = md messageDispatcher.register(this) _isEventBased = messageDispatcher.isInstanceOf[ExecutorBasedEventDrivenDispatcher] } else throw new IllegalArgumentException( diff --git a/akka-samples-chat/src/main/scala/ChatServer.scala b/akka-samples-chat/src/main/scala/ChatServer.scala index f5d7888a16..508005a724 100644 --- a/akka-samples-chat/src/main/scala/ChatServer.scala +++ b/akka-samples-chat/src/main/scala/ChatServer.scala @@ -10,7 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.{OneForOneStrategy} /** - * ChatServer's internal messages. + * ChatServer's internal events. */ sealed trait Event case class Login(user: String) extends Event @@ -20,87 +20,98 @@ case class ChatLog(log: List[String]) extends Event case class ChatMessage(from: String, message: String) extends Event /** - *
- * curl http://localhost:9998/chat
- * 
- * Or browse to the URL from a web browser. + * Chat client. */ -//@Path("/chat") -object ChatServer extends Actor { - faultHandler = Some(OneForOneStrategy(5, 5000)) - trapExit = List(classOf[Exception]) +class ChatClient(val name: String) { + import Actor.Sender.Self + def login = ChatServer ! Login(name) + def logout = ChatServer ! Logout(name) + def chatLog: ChatLog = (ChatServer !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer")) + def post(message: String) = ChatServer ! ChatMessage(name, name + ": " + message) +} - log.info("Chat server is starting up...") - - private var sessions = Map[String, Actor]() - private var chatLog: List[String] = Nil - - class Session(user: String) extends Actor { - lifeCycle = Some(LifeCycle(Permanent)) - private val loginTime = System.currentTimeMillis - private var userLog: List[String] = Nil - - log.info("New session for user [%s] has been created", user) - - def receive = { - case ChatMessage(from, message) => userLog ::= message - case chatLog @ ChatLog(_) => reply(chatLog) - } - } - - def receive = chatting orElse sessionManagement +/** + * Internal chat client session. + */ +class Session(user: String, storage: Actor) extends Actor { + lifeCycle = Some(LifeCycle(Permanent)) + private val loginTime = System.currentTimeMillis + private var userLog: List[String] = Nil - private def sessionManagement: PartialFunction[Any, Unit] = { - case Login(user) => - log.info("User [%s] has logged in", user) - val session = new Session(user) - startLink(session) - sessions = sessions + (user -> session) - - case Logout(user) => - log.info("User [%s] has logged out", user) - val session = sessions(user) - unlink(session) - session.stop - sessions = sessions - user - } - - private def chatting: PartialFunction[Any, Unit] = { + log.info("New session for user [%s] has been created", user) + + def receive = { case msg @ ChatMessage(from, message) => - log.debug("New chat message [%s]", message) - chatLog ::= message - sessions(from).forward(msg) - - case GetChatLog(from) => //reply(ChatLog(chatLog.reverse)) - sessions(from).forward(ChatLog(chatLog.reverse)) - } - - override def shutdown = sessions.foreach { case (user, session) => - log.info("Chat server is shutting down...") - unlink(session) - session.stop + userLog ::= message + storage ! msg + + case msg @ GetChatLog(_) => + storage forward msg } } -class User extends Actor { me: User => - private var name: String = "unknown" +/** + * Chat storage holding the chat log. + */ +class Storage extends Actor { + lifeCycle = Some(LifeCycle(Permanent)) + private var chatLog: List[String] = Nil - def login(n: String) = { - name = n - ChatServer ! Login(name) + log.info("Chat storage is starting up...") + + def receive = { + case msg @ ChatMessage(from, message) => + log.debug("New chat message [%s]", message) + chatLog ::= message + + case GetChatLog(_) => + reply(ChatLog(chatLog.reverse)) } +} - def logout = ChatServer ! Logout(name) +/** + * Chat server. Manages sessions and redirects all other messages to the Session for the client. + */ +object ChatServer extends Actor { + id = "ChatServer" + faultHandler = Some(OneForOneStrategy(5, 5000)) + trapExit = List(classOf[Exception]) + + private var storage: Storage = _ + private var sessions = Map[String, Actor]() - def chatLog: List[String] = { - val chatLog: ChatLog = (ChatServer !! GetChatLog(name)).getOrElse(throw new Exception("Couldn't get the chat log from ChatServer")) - chatLog.log + log.info("Chat service is starting up...") + + override def init = storage = spawnLink(classOf[Storage]) + + def receive = sessionManagement orElse chatting + + private def sessionManagement: PartialFunction[Any, Unit] = { + case Login(username) => + log.info("User [%s] has logged in", username) + val session = new Session(username, storage) + startLink(session) + sessions = sessions + (username -> session) + + case Logout(username) => + log.info("User [%s] has logged out", username) + val session = sessions(username) + unlink(session) + session.stop + sessions = sessions - username + } + + private def chatting: PartialFunction[Any, Unit] = { + case msg @ ChatMessage(from, _) => sessions(from) ! msg + case msg @ GetChatLog(from) => sessions(from) forward msg } - def post(message: String) = ChatServer ! ChatMessage(name, name + ": " + message) - - def receive = { - case _ => log.error("User does not respond to messages") + override def shutdown = sessions.foreach { case (_, session) => + log.info("Chat server is shutting down...") + unlink(session) + session.stop + unlink(storage) + storage.stop } } @@ -110,30 +121,26 @@ class Boot { RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), Supervise( ChatServer, - LifeCycle(Permanent)) + LifeCycle(Permanent)) :: Nil)) factory.newInstance.start } -object ChatBot { -// val server = new RemoteServer -// server.start("localhost", 1999) +object Runner { + ChatServer.makeRemote("localhost", 9999) + ChatServer.start def run = { - ChatServer.makeRemote("localhost", 9999) - ChatServer.start - - val user = new User + val client = new ChatClient("jonas") - user.start - user.login("jonas") + client.login - user.post("Hi there") - println("CHAT LOG: " + user.chatLog) + client.post("Hi there") + println("CHAT LOG: " + client.chatLog.log) - user.post("Hi again") - println("CHAT LOG: " + user.chatLog) + client.post("Hi again") + println("CHAT LOG: " + client.chatLog.log) - user.logout + client.logout } } \ No newline at end of file