From c882b2c5e1bf31158d5da012fdd27bc6d1718cee Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 8 Apr 2011 13:09:19 +0200 Subject: [PATCH] Adjusted chat sample to run with latest, and without Redis --- akka-samples/akka-sample-chat/Buildfile | 48 -------- akka-samples/akka-sample-chat/README | 32 ++--- .../src/main/scala/ChatServer.scala | 116 +++++++++++------- project/build/AkkaProject.scala | 4 + 4 files changed, 88 insertions(+), 112 deletions(-) delete mode 100644 akka-samples/akka-sample-chat/Buildfile diff --git a/akka-samples/akka-sample-chat/Buildfile b/akka-samples/akka-sample-chat/Buildfile deleted file mode 100644 index 814e6e4149..0000000000 --- a/akka-samples/akka-sample-chat/Buildfile +++ /dev/null @@ -1,48 +0,0 @@ -require 'buildr/scala' - -VERSION_NUMBER = "0.6" -GROUP = "se.scalablesolutions.akka" - -repositories.remote << "http://www.ibiblio.org/maven2/" -repositories.remote << "http://www.lag.net/repo" -repositories.remote << "http://multiverse.googlecode.com/svn/maven-repository/releases" - -AKKA = group('akka-remote', 'akka-comet', 'akka-util','akka-kernel', 'akka-rest', 'akka-util-java', - 'akka-security','akka-persistence-common', 'akka-persistence-redis', - 'akka-amqp', - :under=> 'se.scalablesolutions.akka', - :version => '0.6') -ASPECTJ = "org.codehaus.aspectwerkz:aspectwerkz-nodeps-jdk5:jar:2.1" -SBINARY = "sbinary:sbinary:jar:0.3" -COMMONS_IO = "commons-io:commons-io:jar:1.4" -CONFIGGY = "net.lag:configgy:jar:1.4.7" -JACKSON = group('jackson-core-asl', 'jackson-mapper-asl', - :under=> 'org.codehaus.jackson', - :version => '1.2.1') -MULTIVERSE = "org.multiverse:multiverse-alpha:jar:jar-with-dependencies:0.3" -NETTY = "org.jboss.netty:netty:jar:3.2.0.ALPHA2" -PROTOBUF = "com.google.protobuf:protobuf-java:jar:2.2.0" -REDIS = "com.redis:redisclient:jar:1.0.1" -SJSON = "sjson.json:sjson:jar:0.3" - -Project.local_task "run" - -desc "Akka Chat Sample Module" -define "akka-sample-chat" do - project.version = VERSION_NUMBER - project.group = GROUP - - compile.with AKKA, CONFIGGY - - p artifact(MULTIVERSE).to_s - - package(:jar) - - task "run" do - Java.java "scala.tools.nsc.MainGenericRunner", - :classpath => [ compile.dependencies, compile.target, - ASPECTJ, COMMONS_IO, JACKSON, NETTY, MULTIVERSE, PROTOBUF, REDIS, - SBINARY, SJSON], - :java_args => ["-server"] - end -end \ No newline at end of file diff --git a/akka-samples/akka-sample-chat/README b/akka-samples/akka-sample-chat/README index dff045d6f8..2c812593fa 100644 --- a/akka-samples/akka-sample-chat/README +++ b/akka-samples/akka-sample-chat/README @@ -1,32 +1,26 @@ Akka Chat Client/Server Sample Application -First we need to download, build and start up Redis: +How to run the sample: -1. Download Redis from http://code.google.com/p/redis/downloads/list. -2. Step into the distribution. -3. Build: ‘make install’. -4. Run: ‘./redis-server’. -For details on how to set up Redis server have a look at http://code.google.com/p/redis/wiki/QuickStart. - -Then to run the sample: - -1. Install the Redis network storage. Download it from [http://code.google.com/p/redis/]. -2. Open up a shell and start up an instance of Redis. -3. Fire up two shells. For each of them: +1. Fire up two shells. For each of them: - Step down into to the root of the Akka distribution. - Set 'export AKKA_HOME=. - Run 'sbt console' to start up a REPL (interpreter). -4. In the first REPL you get execute: +2. In the first REPL you get execute: - scala> import sample.chat._ - scala> import akka.actor.Actor._ - scala> val chatService = actorOf[ChatService].start -5. In the second REPL you get execute: +3. In the second REPL you get execute: - scala> import sample.chat._ - - scala> Runner.run -6. See the chat simulation run. -7. Run it again to see full speed after first initialization. - -Now you could test client reconnect by killing the console running the ChatService and start it up again. See the client reconnect take place in the REPL shell. + - scala> ClientRunner.run +4. See the chat simulation run. +5. Run it again to see full speed after first initialization. +6. In the client REPL, or in a new REPL, you can also create your own client + - scala> import sample.chat._ + - scala> val myClient = new ChatClient("") + - scala> myClient.login + - scala> myClient.post("Can I join?") + - scala> println("CHAT LOG:\n\t" + myClient.chatLog.log.mkString("\n\t")) That’s it. Have fun. diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala index aa34824bab..695e463815 100644 --- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala +++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala @@ -6,27 +6,16 @@ package sample.chat import scala.collection.mutable.HashMap -import akka.actor.{SupervisorFactory, Actor, ActorRef, RemoteActor} -import akka.remote.{RemoteNode, RemoteClient} -import akka.persistence.common.PersistentVector -import akka.persistence.redis.RedisStorage +import akka.actor.{SupervisorFactory, Actor, ActorRef} import akka.stm._ import akka.config.Supervision.{OneForOneStrategy,Permanent} -import akka.util.Logging import Actor._ +import akka.event.EventHandler /****************************************************************************** Akka Chat Client/Server Sample Application -First we need to download, build and start up Redis: - -1. Download Redis from http://code.google.com/p/redis/downloads/list. -2. Step into the distribution. -3. Build: ‘make install’. -4. Run: ‘./redis-server’. -For details on how to set up Redis server have a look at http://code.google.com/p/redis/wiki/QuickStart. - -Then to run the sample: +How to run the sample: 1. Fire up two shells. For each of them: - Step down into to the root of the Akka distribution. @@ -38,9 +27,16 @@ Then to run the sample: - scala> val chatService = actorOf[ChatService].start 3. In the second REPL you get execute: - scala> import sample.chat._ - - scala> Runner.run + - scala> ClientRunner.run 4. See the chat simulation run. 5. Run it again to see full speed after first initialization. +6. In the client REPL, or in a new REPL, you can also create your own client + - scala> import sample.chat._ + - scala> val myClient = new ChatClient("") + - scala> myClient.login + - scala> myClient.post("Can I join?") + - scala> println("CHAT LOG:\n\t" + myClient.chatLog.log.mkString("\n\t")) + That’s it. Have fun. @@ -60,7 +56,7 @@ case class ChatMessage(from: String, message: String) extends Event * Chat client. */ class ChatClient(val name: String) { - val chat = RemoteClient.actorFor("chat:service", "localhost", 2552) + val chat = Actor.remote.actorFor("chat:service", "localhost", 2552) def login = chat ! Login(name) def logout = chat ! Logout(name) @@ -75,7 +71,7 @@ class Session(user: String, storage: ActorRef) extends Actor { private val loginTime = System.currentTimeMillis private var userLog: List[String] = Nil - log.info("New session for user [%s] has been created at [%s]", user, loginTime) + EventHandler.info(this, "New session for user [%s] has been created at [%s]".format(user, loginTime)) def receive = { case msg @ ChatMessage(from, message) => @@ -93,19 +89,18 @@ class Session(user: String, storage: ActorRef) extends Actor { trait ChatStorage extends Actor /** - * Redis-backed chat storage implementation. + * Memory-backed chat storage implementation. */ -class RedisChatStorage extends ChatStorage { +class MemoryChatStorage extends ChatStorage { self.lifeCycle = Permanent - val CHAT_LOG = "akka.chat.log" - private var chatLog = RedisStorage.getVector(CHAT_LOG) + private var chatLog = TransactionalVector[Array[Byte]]() - log.info("Redis-based chat storage is starting up...") + EventHandler.info(this, "Memory-based chat storage is starting up...") def receive = { case msg @ ChatMessage(from, message) => - log.debug("New chat message [%s]", message) + EventHandler.debug(this, "New chat message [%s]".format(message)) atomic { chatLog + message.getBytes("UTF-8") } case GetChatLog(_) => @@ -113,7 +108,7 @@ class RedisChatStorage extends ChatStorage { self.reply(ChatLog(messageList)) } - override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG) + override def postRestart(reason: Throwable) = chatLog = TransactionalVector() } /** @@ -128,13 +123,13 @@ trait SessionManagement { this: Actor => protected def sessionManagement: Receive = { case Login(username) => - log.info("User [%s] has logged in", username) + EventHandler.info(this, "User [%s] has logged in".format(username)) val session = actorOf(new Session(username, storage)) session.start sessions += (username -> session) case Logout(username) => - log.info("User [%s] has logged out", username) + EventHandler.info(this, "User [%s] has logged out".format(username)) val session = sessions(username) session.stop sessions -= username @@ -153,16 +148,25 @@ trait ChatManagement { this: Actor => val sessions: HashMap[String, ActorRef] // needs someone to provide the Session map protected def chatManagement: Receive = { - case msg @ ChatMessage(from, _) => sessions(from) ! msg - case msg @ GetChatLog(from) => sessions(from) forward msg + case msg @ ChatMessage(from, _) => getSession(from).foreach(_ ! msg) + case msg @ GetChatLog(from) => getSession(from).foreach(_ forward msg) + } + + private def getSession(from: String) : Option[ActorRef] = { + if (sessions.contains(from)) + Some(sessions(from)) + else { + EventHandler.info(this, "Session expired for %s".format(from)) + None + } } } /** - * Creates and links a RedisChatStorage. + * Creates and links a MemoryChatStorage. */ -trait RedisChatStorageFactory { this: Actor => - val storage = this.self.spawnLink[RedisChatStorage] // starts and links ChatStorage +trait MemoryChatStorageFactory { this: Actor => + val storage = this.self.spawnLink[MemoryChatStorage] // starts and links ChatStorage } /** @@ -172,10 +176,10 @@ trait ChatServer extends Actor { self.faultHandler = OneForOneStrategy(List(classOf[Exception]),5, 5000) val storage: ActorRef - log.info("Chat server is starting up...") + EventHandler.info(this, "Chat server is starting up...") // actor message handler - def receive = sessionManagement orElse chatManagement + def receive: Receive = sessionManagement orElse chatManagement // abstract methods to be defined somewhere else protected def chatManagement: Receive @@ -183,7 +187,7 @@ trait ChatServer extends Actor { protected def shutdownSessions(): Unit override def postStop = { - log.info("Chat server is shutting down...") + EventHandler.info(this, "Chat server is shutting down...") shutdownSessions self.unlink(storage) storage.stop @@ -201,28 +205,50 @@ class ChatService extends ChatServer with SessionManagement with ChatManagement with - RedisChatStorageFactory { + MemoryChatStorageFactory { override def preStart = { - RemoteNode.start("localhost", 2552) - RemoteNode.register("chat:service", self) + remote.start("localhost", 2552); + remote.register("chat:service", self) //Register the actor with the specified service id + } +} + +/** + * Test runner starting ChatService. + */ +object ServerRunner { + + def main(args: Array[String]): Unit = ServerRunner.run + + def run = { + actorOf[ChatService].start } } /** * Test runner emulating a chat session. */ -object Runner { +object ClientRunner { + + def main(args: Array[String]): Unit = ClientRunner.run + def run = { - val client = new ChatClient("jonas") - client.login + val client1 = new ChatClient("jonas") + client1.login + val client2 = new ChatClient("patrik") + client2.login - client.post("Hi there") - println("CHAT LOG:\n\t" + client.chatLog.log.mkString("\n\t")) + client1.post("Hi there") + println("CHAT LOG:\n\t" + client1.chatLog.log.mkString("\n\t")) - client.post("Hi again") - println("CHAT LOG:\n\t" + client.chatLog.log.mkString("\n\t")) + client2.post("Hello") + println("CHAT LOG:\n\t" + client2.chatLog.log.mkString("\n\t")) - client.logout + client1.post("Hi again") + println("CHAT LOG:\n\t" + client1.chatLog.log.mkString("\n\t")) + + client1.logout + client2.logout } } + diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 061084bac0..fa2dd9268d 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -370,6 +370,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaSampleRemoteProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) + class AkkaSampleChatProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) + class AkkaSampleFSMProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) class AkkaSamplesParentProject(info: ProjectInfo) extends ParentProject(info) { @@ -381,6 +383,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaSampleFSMProject(_), akka_actor) lazy val akka_sample_remote = project("akka-sample-remote", "akka-sample-remote", new AkkaSampleRemoteProject(_), akka_remote) + lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat", + new AkkaSampleChatProject(_), akka_remote) lazy val publishRelease = { val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false)