Adjusted chat sample to run with latest, and without Redis
This commit is contained in:
parent
f9ced6820b
commit
c882b2c5e1
4 changed files with 88 additions and 112 deletions
|
|
@ -1,48 +0,0 @@
|
|||
require 'buildr/scala'
|
||||
|
||||
VERSION_NUMBER = "0.6"
|
||||
GROUP = "se.scalablesolutions.akka"
|
||||
|
||||
repositories.remote << "http://www.ibiblio.org/maven2/"
|
||||
repositories.remote << "http://www.lag.net/repo"
|
||||
repositories.remote << "http://multiverse.googlecode.com/svn/maven-repository/releases"
|
||||
|
||||
AKKA = group('akka-remote', 'akka-comet', 'akka-util','akka-kernel', 'akka-rest', 'akka-util-java',
|
||||
'akka-security','akka-persistence-common', 'akka-persistence-redis',
|
||||
'akka-amqp',
|
||||
:under=> 'se.scalablesolutions.akka',
|
||||
:version => '0.6')
|
||||
ASPECTJ = "org.codehaus.aspectwerkz:aspectwerkz-nodeps-jdk5:jar:2.1"
|
||||
SBINARY = "sbinary:sbinary:jar:0.3"
|
||||
COMMONS_IO = "commons-io:commons-io:jar:1.4"
|
||||
CONFIGGY = "net.lag:configgy:jar:1.4.7"
|
||||
JACKSON = group('jackson-core-asl', 'jackson-mapper-asl',
|
||||
:under=> 'org.codehaus.jackson',
|
||||
:version => '1.2.1')
|
||||
MULTIVERSE = "org.multiverse:multiverse-alpha:jar:jar-with-dependencies:0.3"
|
||||
NETTY = "org.jboss.netty:netty:jar:3.2.0.ALPHA2"
|
||||
PROTOBUF = "com.google.protobuf:protobuf-java:jar:2.2.0"
|
||||
REDIS = "com.redis:redisclient:jar:1.0.1"
|
||||
SJSON = "sjson.json:sjson:jar:0.3"
|
||||
|
||||
Project.local_task "run"
|
||||
|
||||
desc "Akka Chat Sample Module"
|
||||
define "akka-sample-chat" do
|
||||
project.version = VERSION_NUMBER
|
||||
project.group = GROUP
|
||||
|
||||
compile.with AKKA, CONFIGGY
|
||||
|
||||
p artifact(MULTIVERSE).to_s
|
||||
|
||||
package(:jar)
|
||||
|
||||
task "run" do
|
||||
Java.java "scala.tools.nsc.MainGenericRunner",
|
||||
:classpath => [ compile.dependencies, compile.target,
|
||||
ASPECTJ, COMMONS_IO, JACKSON, NETTY, MULTIVERSE, PROTOBUF, REDIS,
|
||||
SBINARY, SJSON],
|
||||
:java_args => ["-server"]
|
||||
end
|
||||
end
|
||||
|
|
@ -1,32 +1,26 @@
|
|||
Akka Chat Client/Server Sample Application
|
||||
|
||||
First we need to download, build and start up Redis:
|
||||
How to run the sample:
|
||||
|
||||
1. Download Redis from http://code.google.com/p/redis/downloads/list.
|
||||
2. Step into the distribution.
|
||||
3. Build: ‘make install’.
|
||||
4. Run: ‘./redis-server’.
|
||||
For details on how to set up Redis server have a look at http://code.google.com/p/redis/wiki/QuickStart.
|
||||
|
||||
Then to run the sample:
|
||||
|
||||
1. Install the Redis network storage. Download it from [http://code.google.com/p/redis/].
|
||||
2. Open up a shell and start up an instance of Redis.
|
||||
3. Fire up two shells. For each of them:
|
||||
1. Fire up two shells. For each of them:
|
||||
- Step down into to the root of the Akka distribution.
|
||||
- Set 'export AKKA_HOME=<root of distribution>.
|
||||
- Run 'sbt console' to start up a REPL (interpreter).
|
||||
4. In the first REPL you get execute:
|
||||
2. In the first REPL you get execute:
|
||||
- scala> import sample.chat._
|
||||
- scala> import akka.actor.Actor._
|
||||
- scala> val chatService = actorOf[ChatService].start
|
||||
5. In the second REPL you get execute:
|
||||
3. In the second REPL you get execute:
|
||||
- scala> import sample.chat._
|
||||
- scala> Runner.run
|
||||
6. See the chat simulation run.
|
||||
7. Run it again to see full speed after first initialization.
|
||||
|
||||
Now you could test client reconnect by killing the console running the ChatService and start it up again. See the client reconnect take place in the REPL shell.
|
||||
- scala> ClientRunner.run
|
||||
4. See the chat simulation run.
|
||||
5. Run it again to see full speed after first initialization.
|
||||
6. In the client REPL, or in a new REPL, you can also create your own client
|
||||
- scala> import sample.chat._
|
||||
- scala> val myClient = new ChatClient("<your name>")
|
||||
- scala> myClient.login
|
||||
- scala> myClient.post("Can I join?")
|
||||
- scala> println("CHAT LOG:\n\t" + myClient.chatLog.log.mkString("\n\t"))
|
||||
|
||||
That’s it. Have fun.
|
||||
|
||||
|
|
|
|||
|
|
@ -6,27 +6,16 @@ package sample.chat
|
|||
|
||||
import scala.collection.mutable.HashMap
|
||||
|
||||
import akka.actor.{SupervisorFactory, Actor, ActorRef, RemoteActor}
|
||||
import akka.remote.{RemoteNode, RemoteClient}
|
||||
import akka.persistence.common.PersistentVector
|
||||
import akka.persistence.redis.RedisStorage
|
||||
import akka.actor.{SupervisorFactory, Actor, ActorRef}
|
||||
import akka.stm._
|
||||
import akka.config.Supervision.{OneForOneStrategy,Permanent}
|
||||
import akka.util.Logging
|
||||
import Actor._
|
||||
import akka.event.EventHandler
|
||||
|
||||
/******************************************************************************
|
||||
Akka Chat Client/Server Sample Application
|
||||
|
||||
First we need to download, build and start up Redis:
|
||||
|
||||
1. Download Redis from http://code.google.com/p/redis/downloads/list.
|
||||
2. Step into the distribution.
|
||||
3. Build: ‘make install’.
|
||||
4. Run: ‘./redis-server’.
|
||||
For details on how to set up Redis server have a look at http://code.google.com/p/redis/wiki/QuickStart.
|
||||
|
||||
Then to run the sample:
|
||||
How to run the sample:
|
||||
|
||||
1. Fire up two shells. For each of them:
|
||||
- Step down into to the root of the Akka distribution.
|
||||
|
|
@ -38,9 +27,16 @@ Then to run the sample:
|
|||
- scala> val chatService = actorOf[ChatService].start
|
||||
3. In the second REPL you get execute:
|
||||
- scala> import sample.chat._
|
||||
- scala> Runner.run
|
||||
- scala> ClientRunner.run
|
||||
4. See the chat simulation run.
|
||||
5. Run it again to see full speed after first initialization.
|
||||
6. In the client REPL, or in a new REPL, you can also create your own client
|
||||
- scala> import sample.chat._
|
||||
- scala> val myClient = new ChatClient("<your name>")
|
||||
- scala> myClient.login
|
||||
- scala> myClient.post("Can I join?")
|
||||
- scala> println("CHAT LOG:\n\t" + myClient.chatLog.log.mkString("\n\t"))
|
||||
|
||||
|
||||
That’s it. Have fun.
|
||||
|
||||
|
|
@ -60,7 +56,7 @@ case class ChatMessage(from: String, message: String) extends Event
|
|||
* Chat client.
|
||||
*/
|
||||
class ChatClient(val name: String) {
|
||||
val chat = RemoteClient.actorFor("chat:service", "localhost", 2552)
|
||||
val chat = Actor.remote.actorFor("chat:service", "localhost", 2552)
|
||||
|
||||
def login = chat ! Login(name)
|
||||
def logout = chat ! Logout(name)
|
||||
|
|
@ -75,7 +71,7 @@ class Session(user: String, storage: ActorRef) extends Actor {
|
|||
private val loginTime = System.currentTimeMillis
|
||||
private var userLog: List[String] = Nil
|
||||
|
||||
log.info("New session for user [%s] has been created at [%s]", user, loginTime)
|
||||
EventHandler.info(this, "New session for user [%s] has been created at [%s]".format(user, loginTime))
|
||||
|
||||
def receive = {
|
||||
case msg @ ChatMessage(from, message) =>
|
||||
|
|
@ -93,19 +89,18 @@ class Session(user: String, storage: ActorRef) extends Actor {
|
|||
trait ChatStorage extends Actor
|
||||
|
||||
/**
|
||||
* Redis-backed chat storage implementation.
|
||||
* Memory-backed chat storage implementation.
|
||||
*/
|
||||
class RedisChatStorage extends ChatStorage {
|
||||
class MemoryChatStorage extends ChatStorage {
|
||||
self.lifeCycle = Permanent
|
||||
val CHAT_LOG = "akka.chat.log"
|
||||
|
||||
private var chatLog = RedisStorage.getVector(CHAT_LOG)
|
||||
private var chatLog = TransactionalVector[Array[Byte]]()
|
||||
|
||||
log.info("Redis-based chat storage is starting up...")
|
||||
EventHandler.info(this, "Memory-based chat storage is starting up...")
|
||||
|
||||
def receive = {
|
||||
case msg @ ChatMessage(from, message) =>
|
||||
log.debug("New chat message [%s]", message)
|
||||
EventHandler.debug(this, "New chat message [%s]".format(message))
|
||||
atomic { chatLog + message.getBytes("UTF-8") }
|
||||
|
||||
case GetChatLog(_) =>
|
||||
|
|
@ -113,7 +108,7 @@ class RedisChatStorage extends ChatStorage {
|
|||
self.reply(ChatLog(messageList))
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable) = chatLog = RedisStorage.getVector(CHAT_LOG)
|
||||
override def postRestart(reason: Throwable) = chatLog = TransactionalVector()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -128,13 +123,13 @@ trait SessionManagement { this: Actor =>
|
|||
|
||||
protected def sessionManagement: Receive = {
|
||||
case Login(username) =>
|
||||
log.info("User [%s] has logged in", username)
|
||||
EventHandler.info(this, "User [%s] has logged in".format(username))
|
||||
val session = actorOf(new Session(username, storage))
|
||||
session.start
|
||||
sessions += (username -> session)
|
||||
|
||||
case Logout(username) =>
|
||||
log.info("User [%s] has logged out", username)
|
||||
EventHandler.info(this, "User [%s] has logged out".format(username))
|
||||
val session = sessions(username)
|
||||
session.stop
|
||||
sessions -= username
|
||||
|
|
@ -153,16 +148,25 @@ trait ChatManagement { this: Actor =>
|
|||
val sessions: HashMap[String, ActorRef] // needs someone to provide the Session map
|
||||
|
||||
protected def chatManagement: Receive = {
|
||||
case msg @ ChatMessage(from, _) => sessions(from) ! msg
|
||||
case msg @ GetChatLog(from) => sessions(from) forward msg
|
||||
case msg @ ChatMessage(from, _) => getSession(from).foreach(_ ! msg)
|
||||
case msg @ GetChatLog(from) => getSession(from).foreach(_ forward msg)
|
||||
}
|
||||
|
||||
private def getSession(from: String) : Option[ActorRef] = {
|
||||
if (sessions.contains(from))
|
||||
Some(sessions(from))
|
||||
else {
|
||||
EventHandler.info(this, "Session expired for %s".format(from))
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates and links a RedisChatStorage.
|
||||
* Creates and links a MemoryChatStorage.
|
||||
*/
|
||||
trait RedisChatStorageFactory { this: Actor =>
|
||||
val storage = this.self.spawnLink[RedisChatStorage] // starts and links ChatStorage
|
||||
trait MemoryChatStorageFactory { this: Actor =>
|
||||
val storage = this.self.spawnLink[MemoryChatStorage] // starts and links ChatStorage
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -172,10 +176,10 @@ trait ChatServer extends Actor {
|
|||
self.faultHandler = OneForOneStrategy(List(classOf[Exception]),5, 5000)
|
||||
val storage: ActorRef
|
||||
|
||||
log.info("Chat server is starting up...")
|
||||
EventHandler.info(this, "Chat server is starting up...")
|
||||
|
||||
// actor message handler
|
||||
def receive = sessionManagement orElse chatManagement
|
||||
def receive: Receive = sessionManagement orElse chatManagement
|
||||
|
||||
// abstract methods to be defined somewhere else
|
||||
protected def chatManagement: Receive
|
||||
|
|
@ -183,7 +187,7 @@ trait ChatServer extends Actor {
|
|||
protected def shutdownSessions(): Unit
|
||||
|
||||
override def postStop = {
|
||||
log.info("Chat server is shutting down...")
|
||||
EventHandler.info(this, "Chat server is shutting down...")
|
||||
shutdownSessions
|
||||
self.unlink(storage)
|
||||
storage.stop
|
||||
|
|
@ -201,28 +205,50 @@ class ChatService extends
|
|||
ChatServer with
|
||||
SessionManagement with
|
||||
ChatManagement with
|
||||
RedisChatStorageFactory {
|
||||
MemoryChatStorageFactory {
|
||||
override def preStart = {
|
||||
RemoteNode.start("localhost", 2552)
|
||||
RemoteNode.register("chat:service", self)
|
||||
remote.start("localhost", 2552);
|
||||
remote.register("chat:service", self) //Register the actor with the specified service id
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test runner starting ChatService.
|
||||
*/
|
||||
object ServerRunner {
|
||||
|
||||
def main(args: Array[String]): Unit = ServerRunner.run
|
||||
|
||||
def run = {
|
||||
actorOf[ChatService].start
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test runner emulating a chat session.
|
||||
*/
|
||||
object Runner {
|
||||
object ClientRunner {
|
||||
|
||||
def main(args: Array[String]): Unit = ClientRunner.run
|
||||
|
||||
def run = {
|
||||
val client = new ChatClient("jonas")
|
||||
|
||||
client.login
|
||||
val client1 = new ChatClient("jonas")
|
||||
client1.login
|
||||
val client2 = new ChatClient("patrik")
|
||||
client2.login
|
||||
|
||||
client.post("Hi there")
|
||||
println("CHAT LOG:\n\t" + client.chatLog.log.mkString("\n\t"))
|
||||
client1.post("Hi there")
|
||||
println("CHAT LOG:\n\t" + client1.chatLog.log.mkString("\n\t"))
|
||||
|
||||
client.post("Hi again")
|
||||
println("CHAT LOG:\n\t" + client.chatLog.log.mkString("\n\t"))
|
||||
client2.post("Hello")
|
||||
println("CHAT LOG:\n\t" + client2.chatLog.log.mkString("\n\t"))
|
||||
|
||||
client.logout
|
||||
client1.post("Hi again")
|
||||
println("CHAT LOG:\n\t" + client1.chatLog.log.mkString("\n\t"))
|
||||
|
||||
client1.logout
|
||||
client2.logout
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -370,6 +370,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
class AkkaSampleRemoteProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
|
||||
|
||||
class AkkaSampleChatProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
|
||||
|
||||
class AkkaSampleFSMProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
|
||||
|
||||
class AkkaSamplesParentProject(info: ProjectInfo) extends ParentProject(info) {
|
||||
|
|
@ -381,6 +383,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
new AkkaSampleFSMProject(_), akka_actor)
|
||||
lazy val akka_sample_remote = project("akka-sample-remote", "akka-sample-remote",
|
||||
new AkkaSampleRemoteProject(_), akka_remote)
|
||||
lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat",
|
||||
new AkkaSampleChatProject(_), akka_remote)
|
||||
|
||||
lazy val publishRelease = {
|
||||
val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue