diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala new file mode 100644 index 0000000000..c5621361fb --- /dev/null +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisPubSubServer.scala @@ -0,0 +1,42 @@ +package se.scalablesolutions.akka.persistence.redis + +import se.scalablesolutions.akka.actor.Actor +import com.redis._ + +sealed trait Msg +case class Subscribe(channels: Array[String]) extends Msg +case class Register(callback: PubSubMessage => Any) extends Msg +case class Unsubscribe(channels: Array[String]) extends Msg +case object UnsubscribeAll extends Msg +case class Publish(channel: String, msg: String) extends Msg + +class Subscriber(client: RedisClient) extends Actor { + var callback: PubSubMessage => Any = { m => } + + def receive = { + case Subscribe(channels) => + client.subscribe(channels.head, channels.tail: _*)(callback) + reply(true) + + case Register(cb) => + callback = cb + reply(true) + + case Unsubscribe(channels) => + client.unsubscribe(channels.head, channels.tail: _*) + reply(true) + + case UnsubscribeAll => + client.unsubscribe + reply(true) + } +} + +class Publisher(client: RedisClient) extends Actor { + def receive = { + case Publish(channel, message) => + client.publish(channel, message) + reply(true) + } +} + diff --git a/akka-samples/akka-sample-pubsub/src/main/scala/RedisPubSub.scala b/akka-samples/akka-sample-pubsub/src/main/scala/RedisPubSub.scala new file mode 100644 index 0000000000..ee14c2880d --- /dev/null +++ b/akka-samples/akka-sample-pubsub/src/main/scala/RedisPubSub.scala @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB . + */ + +package sample.pubsub + +import com.redis.{RedisClient, PubSubMessage, S, U, M} +import se.scalablesolutions.akka.persistence.redis._ + +/** + * Sample Akka application for Redis PubSub + * + * Prerequisite: Need Redis Server running (the version that supports pubsub) + * + * 1. Download redis from http://github.com/antirez/redis + * 2. build using "make" + * 3. Run server as ./redis-server + * + * For running this sample application :- + * + * 1. Open a shell and set AKKA_HOME to the distribution root + * 2. cd $AKKA_HOME + * 3. sbt console + * 4. import sample.pubsub._ + * 5. Sub.sub("a", "b") // starts Subscription server & subscribes to channels "a" and "b" + * + * 6. Open up another shell similarly as the above and set AKKA_HOME + * 7. cd $AKKA_HOME + * 8. sbt console + * 9. import sample.pubsub._ + * 10. Pub.publish("a", "hello") // the first shell should get the message + * 11. Pub.publish("c", "hi") // the first shell should NOT get this message + * + * 12. Open up a redis-client from where you installed redis and issue a publish command + * ./redis-cli publish a "hi there" ## the first shell should get the message + * + * 13. Go back to the first shell + * 14. Sub.unsub("a") // should unsubscribe the first shell from channel "a" + * + * 15. Study the callback function defined below. It supports many other message formats. + * In the second shell window do the following: + * scala> Pub.publish("b", "+c") // will subscribe the first window to channel "c" + * scala> Pub.publish("b", "+d") // will subscribe the first window to channel "d" + * scala> Pub.publish("b", "-c") // will unsubscribe the first window from channel "c" + * scala> Pub.publish("b", "exit") // will unsubscribe the first window from all channels + */ + +object Pub { + println("starting publishing service ..") + val r = new RedisClient("localhost", 6379) + val p = new Publisher(r) + p.start + + def publish(channel: String, message: String) = { + p ! Publish(channel, message) + } +} + +object Sub { + println("starting subscription service ..") + val r = new RedisClient("localhost", 6379) + val s = new Subscriber(r) + s.start + s ! Register(callback) + + def sub(channels: String*) = { + s ! Subscribe(channels.toArray) + } + + def unsub(channels: String*) = { + s ! Unsubscribe(channels.toArray) + } + + def callback(pubsub: PubSubMessage) = pubsub match { + case S(channel, no) => println("subscribed to " + channel + " and count = " + no) + case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no) + case M(channel, msg) => + msg match { + // exit will unsubscribe from all channels and stop subscription service + case "exit" => + println("unsubscribe all ..") + r.unsubscribe + + // message "+x" will subscribe to channel x + case x if x startsWith "+" => + val s: Seq[Char] = x + s match { + case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => } + } + + // message "-x" will unsubscribe from channel x + case x if x startsWith "-" => + val s: Seq[Char] = x + s match { + case Seq('-', rest @ _*) => r.unsubscribe(rest.toString) + } + + // other message receive + case x => + println("received message on channel " + channel + " as : " + x) + } + } +} diff --git a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar b/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar index 0daede37f0..52125e6d52 100644 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar and b/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar differ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 41a51b8c8c..b0466d5ec8 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -301,6 +301,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaSampleChatProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) + class AkkaSamplePubSubProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) class AkkaSampleLiftProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) { val commons_logging = "commons-logging" % "commons-logging" % "1.1.1" % "compile" @@ -335,6 +336,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { class AkkaSamplesParentProject(info: ProjectInfo) extends ParentProject(info) { lazy val akka_sample_chat = project("akka-sample-chat", "akka-sample-chat", new AkkaSampleChatProject(_), akka_kernel) + lazy val akka_sample_pubsub = project("akka-sample-pubsub", "akka-sample-pubsub", + new AkkaSamplePubSubProject(_), akka_kernel) lazy val akka_sample_lift = project("akka-sample-lift", "akka-sample-lift", new AkkaSampleLiftProject(_), akka_kernel) lazy val akka_sample_rest_java = project("akka-sample-rest-java", "akka-sample-rest-java",