Redis client now implements pubsub. Also included a sample app for RedisPubSub in akka-samples

This commit is contained in:
Debasish Ghosh 2010-04-14 23:50:26 +05:30
parent 4cedc47ce0
commit 6b83b84ee9
4 changed files with 148 additions and 0 deletions

View file

@ -0,0 +1,42 @@
package se.scalablesolutions.akka.persistence.redis
import se.scalablesolutions.akka.actor.Actor
import com.redis._
sealed trait Msg
case class Subscribe(channels: Array[String]) extends Msg
case class Register(callback: PubSubMessage => Any) extends Msg
case class Unsubscribe(channels: Array[String]) extends Msg
case object UnsubscribeAll extends Msg
case class Publish(channel: String, msg: String) extends Msg
class Subscriber(client: RedisClient) extends Actor {
var callback: PubSubMessage => Any = { m => }
def receive = {
case Subscribe(channels) =>
client.subscribe(channels.head, channels.tail: _*)(callback)
reply(true)
case Register(cb) =>
callback = cb
reply(true)
case Unsubscribe(channels) =>
client.unsubscribe(channels.head, channels.tail: _*)
reply(true)
case UnsubscribeAll =>
client.unsubscribe
reply(true)
}
}
class Publisher(client: RedisClient) extends Actor {
def receive = {
case Publish(channel, message) =>
client.publish(channel, message)
reply(true)
}
}

View file

@ -0,0 +1,103 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>.
*/
package sample.pubsub
import com.redis.{RedisClient, PubSubMessage, S, U, M}
import se.scalablesolutions.akka.persistence.redis._
/**
* Sample Akka application for Redis PubSub
*
* Prerequisite: Need Redis Server running (the version that supports pubsub)
*
* 1. Download redis from http://github.com/antirez/redis
* 2. build using "make"
* 3. Run server as ./redis-server
*
* For running this sample application :-
*
* 1. Open a shell and set AKKA_HOME to the distribution root
* 2. cd $AKKA_HOME
* 3. sbt console
* 4. import sample.pubsub._
* 5. Sub.sub("a", "b") // starts Subscription server & subscribes to channels "a" and "b"
*
* 6. Open up another shell similarly as the above and set AKKA_HOME
* 7. cd $AKKA_HOME
* 8. sbt console
* 9. import sample.pubsub._
* 10. Pub.publish("a", "hello") // the first shell should get the message
* 11. Pub.publish("c", "hi") // the first shell should NOT get this message
*
* 12. Open up a redis-client from where you installed redis and issue a publish command
* ./redis-cli publish a "hi there" ## the first shell should get the message
*
* 13. Go back to the first shell
* 14. Sub.unsub("a") // should unsubscribe the first shell from channel "a"
*
* 15. Study the callback function defined below. It supports many other message formats.
* In the second shell window do the following:
* scala> Pub.publish("b", "+c") // will subscribe the first window to channel "c"
* scala> Pub.publish("b", "+d") // will subscribe the first window to channel "d"
* scala> Pub.publish("b", "-c") // will unsubscribe the first window from channel "c"
* scala> Pub.publish("b", "exit") // will unsubscribe the first window from all channels
*/
object Pub {
println("starting publishing service ..")
val r = new RedisClient("localhost", 6379)
val p = new Publisher(r)
p.start
def publish(channel: String, message: String) = {
p ! Publish(channel, message)
}
}
object Sub {
println("starting subscription service ..")
val r = new RedisClient("localhost", 6379)
val s = new Subscriber(r)
s.start
s ! Register(callback)
def sub(channels: String*) = {
s ! Subscribe(channels.toArray)
}
def unsub(channels: String*) = {
s ! Unsubscribe(channels.toArray)
}
def callback(pubsub: PubSubMessage) = pubsub match {
case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
case M(channel, msg) =>
msg match {
// exit will unsubscribe from all channels and stop subscription service
case "exit" =>
println("unsubscribe all ..")
r.unsubscribe
// message "+x" will subscribe to channel x
case x if x startsWith "+" =>
val s: Seq[Char] = x
s match {
case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
}
// message "-x" will unsubscribe from channel x
case x if x startsWith "-" =>
val s: Seq[Char] = x
s match {
case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
}
// other message receive
case x =>
println("received message on channel " + channel + " as : " + x)
}
}
}

View file

@ -301,6 +301,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaSampleChatProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSamplePubSubProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaSampleLiftProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) {
val commons_logging = "commons-logging" % "commons-logging" % "1.1.1" % "compile"
@ -335,6 +336,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
class AkkaSamplesParentProject(info: ProjectInfo) extends ParentProject(info) {
lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat",
new AkkaSampleChatProject(_), akka_kernel)
lazy val akka_sample_pubsub = project("akka-sample-pubsub", "akka-sample-pubsub",
new AkkaSamplePubSubProject(_), akka_kernel)
lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift",
new AkkaSampleLiftProject(_), akka_kernel)
lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java",