2009-08-02 12:33:42 +02:00
/* *
* Copyright ( C ) 2009 Scalable Solutions .
*/
2009-07-12 23:09:54 +02:00
package sample.scala
2009-12-18 21:26:03 +01:00
import se.scalablesolutions.akka.actor. { Transactor , SupervisorFactory , Actor }
2009-12-05 20:59:15 +01:00
import se.scalablesolutions.akka.state. { CassandraStorage , TransactionalState }
2009-09-03 11:02:21 +02:00
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
2009-07-26 19:12:22 +02:00
2009-10-14 12:59:05 +02:00
import java.lang.Integer
2009-12-07 08:13:34 +01:00
import java.nio.ByteBuffer
2009-08-02 12:58:20 +02:00
import javax.ws.rs.core.MultivaluedMap
2009-12-02 21:08:29 +01:00
import javax.ws.rs. { GET , POST , Path , Produces , WebApplicationException , Consumes , PathParam }
2009-07-12 23:09:54 +02:00
2009-11-25 01:58:35 +01:00
import org.atmosphere.annotation. { Broadcast , Suspend }
2009-08-02 12:58:20 +02:00
import org.atmosphere.util.XSSHtmlFilter
2009-12-07 08:13:34 +01:00
import org.atmosphere.cpr. { Broadcaster , BroadcastFilter }
2009-12-02 21:08:29 +01:00
import org.atmosphere.jersey.Broadcastable
2009-07-26 19:12:22 +02:00
2009-07-12 23:09:54 +02:00
class Boot {
2009-11-20 08:29:31 +01:00
val factory = SupervisorFactory (
SupervisorConfig (
2009-12-02 21:08:29 +01:00
RestartStrategy ( OneForOne , 3 , 100 , List ( classOf [ Exception ] ) ) ,
2009-11-20 08:29:31 +01:00
Supervise (
new SimpleService ,
LifeCycle ( Permanent ) ) : :
Supervise (
new Chat ,
LifeCycle ( Permanent ) ) : :
Supervise (
new PersistentSimpleService ,
2009-12-02 21:08:29 +01:00
LifeCycle ( Permanent ) ) : :
2009-12-18 21:26:03 +01:00
Supervise (
2009-12-02 21:08:29 +01:00
new PubSub ,
2009-11-20 08:29:31 +01:00
LifeCycle ( Permanent ) )
: : Nil ) )
factory . newInstance . start
2009-07-12 23:09:54 +02:00
}
/* *
* Try service out by invoking ( multiple times ) :
* < pre >
* curl http : //localhost:9998/scalacount
* </ pre >
* Or browse to the URL from a web browser .
2009-07-26 19:12:22 +02:00
*/
2009-07-12 23:09:54 +02:00
@Path ( "/scalacount" )
2009-12-18 21:26:03 +01:00
class SimpleService extends Transactor {
2009-08-28 17:40:06 +02:00
case object Tick
2009-10-23 22:37:28 +01:00
private val KEY = "COUNTER"
private var hasStartedTicking = false
2009-09-03 11:02:21 +02:00
private val storage = TransactionalState . newMap [ String , Integer ]
2009-08-28 17:40:06 +02:00
@GET
@Produces ( Array ( "text/html" ) )
def count = ( this !! Tick ) . getOrElse ( < error > Error in counter </ error > )
2009-11-20 08:29:31 +01:00
def receive = {
2009-08-28 17:40:06 +02:00
case Tick => if ( hasStartedTicking ) {
val counter = storage . get ( KEY ) . get . asInstanceOf [ Integer ] . intValue
storage . put ( KEY , new Integer ( counter + 1 ) )
reply ( < success > Tick : { counter + 1 } </ success > )
} else {
storage . put ( KEY , new Integer ( 0 ) )
hasStartedTicking = true
reply ( < success > Tick : 0 </ success > )
}
}
}
2009-12-18 21:26:03 +01:00
@Path ( "/pubsub/" )
class PubSub extends Actor {
case class Msg ( topic : String , message : String )
@GET
@Suspend
@Produces ( Array ( "text/plain;charset=ISO-8859-1" ) )
@Path ( "/topic/{topic}/" )
def subscribe ( @PathParam ( "topic" ) topic : Broadcaster ) : Broadcastable = new Broadcastable ( "" , topic )
@GET
@Broadcast
@Path ( "/topic/{topic}/{message}/" )
@Produces ( Array ( "text/plain;charset=ISO-8859-1" ) )
def say ( @PathParam ( "topic" ) topic : Broadcaster , @PathParam ( "message" ) message : String ) : Broadcastable = new Broadcastable ( message , topic )
def receive = { case _ => }
}
2009-08-28 17:40:06 +02:00
/* *
* Try service out by invoking ( multiple times ) :
* < pre >
* curl http : //localhost:9998/persistentscalacount
* </ pre >
* Or browse to the URL from a web browser .
*/
@Path ( "/persistentscalacount" )
class PersistentSimpleService extends Actor {
makeTransactionRequired
2009-08-01 17:40:16 +02:00
case object Tick
2009-10-23 22:37:28 +01:00
private val KEY = "COUNTER"
private var hasStartedTicking = false
2009-12-05 20:59:15 +01:00
private val storage = CassandraStorage . newMap
2009-08-01 17:40:16 +02:00
@GET
@Produces ( Array ( "text/html" ) )
def count = ( this !! Tick ) . getOrElse ( < error > Error in counter </ error > )
2009-11-20 08:29:31 +01:00
def receive = {
2009-08-01 17:40:16 +02:00
case Tick => if ( hasStartedTicking ) {
2009-12-05 20:59:15 +01:00
val bytes = storage . get ( KEY . getBytes ) . get
val counter = ByteBuffer . wrap ( bytes ) . getInt
storage . put ( KEY . getBytes , ByteBuffer . allocate ( 4 ) . putInt ( counter + 1 ) . array )
2009-08-02 12:33:42 +02:00
reply ( < success > Tick : { counter + 1 } </ success > )
2009-08-01 17:40:16 +02:00
} else {
2009-12-05 20:59:15 +01:00
storage . put ( KEY . getBytes , Array ( 0. toByte ) )
2009-08-01 17:40:16 +02:00
hasStartedTicking = true
reply ( < success > Tick : 0 </ success > )
2009-07-26 19:12:22 +02:00
}
2009-08-01 17:40:16 +02:00
}
2009-07-25 21:35:47 +02:00
}
2009-07-26 19:12:22 +02:00
@Path ( "/chat" )
2009-12-18 21:26:03 +01:00
class Chat extends Transactor {
2009-08-01 17:40:16 +02:00
case class Chat ( val who : String , val what : String , val msg : String )
2009-07-26 19:12:22 +02:00
2009-08-02 12:33:42 +02:00
@Suspend
@GET
@Produces ( Array ( "text/html" ) )
2009-10-12 23:14:56 +02:00
def suspend = {
val s = new StringBuilder
s append "<!-- "
for ( i <- 1 to 10 ) s append "Comet is a programming technique that enables web servers to send data to the client without having any need for the client to request it. "
s append " -->"
s toString
}
2009-08-02 12:33:42 +02:00
2009-11-20 08:29:31 +01:00
def receive = {
2009-08-01 17:40:16 +02:00
case Chat ( who , what , msg ) => {
what match {
case "login" => reply ( "System Message__" + who + " has joined." )
case "post" => reply ( "" + who + "__" + msg )
case _ => throw new WebApplicationException ( 422 )
}
}
case x => log . info ( "recieve unknown: " + x )
}
2009-09-22 23:49:34 +02:00
@Broadcast ( Array ( classOf [ XSSHtmlFilter ] , classOf [ JsonpFilter ] ) )
2009-08-01 17:40:16 +02:00
@Consumes ( Array ( "application/x-www-form-urlencoded" ) )
@POST
@Produces ( Array ( "text/html" ) )
2009-12-03 09:14:39 +01:00
def publishMessage ( form : MultivaluedMap [ String , String ] ) =
( this !! Chat ( form . getFirst ( "name" ) ,
form . getFirst ( "action" ) ,
form . getFirst ( "message" ) ) ) . getOrElse ( "System__error" )
2009-08-01 17:40:16 +02:00
}
2009-07-29 20:17:40 +02:00
2009-08-01 17:40:16 +02:00
class JsonpFilter extends BroadcastFilter [ String ] with Logging {
2009-09-22 23:49:34 +02:00
def filter ( an : AnyRef ) = {
val m = an . toString
2009-08-01 17:40:16 +02:00
var name = m
var message = ""
2009-07-29 20:17:40 +02:00
2009-08-01 17:40:16 +02:00
if ( m . indexOf ( "__" ) > 0 ) {
name = m . substring ( 0 , m . indexOf ( "__" ) )
message = m . substring ( m . indexOf ( "__" ) + 2 )
}
2009-08-02 12:58:20 +02:00
( "<script type='text/javascript'>\n (window.app || window.parent.app).update({ name: \"" +
2009-12-03 09:14:39 +01:00
name + "\", message: \"" + message + "\" }); \n</script>\n" )
2009-08-01 17:40:16 +02:00
}
2009-08-02 16:14:12 +02:00
}