fixing a screwy merge from master... readding files git deleted for some unknown reason
This commit is contained in:
parent
1a23d36297
commit
adf8b63469
7 changed files with 818 additions and 208 deletions
77
akka-http/src/main/scala/AkkaLoader.scala
Normal file
77
akka-http/src/main/scala/AkkaLoader.scala
Normal file
|
|
@ -0,0 +1,77 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.servlet
|
||||||
|
|
||||||
|
import akka.config.Config
|
||||||
|
import akka.util.{Logging, Bootable}
|
||||||
|
import akka.actor.Actor
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This class is responsible for booting up a stack of bundles and then shutting them down
|
||||||
|
*/
|
||||||
|
class AkkaLoader extends Logging {
|
||||||
|
@volatile private var hasBooted = false
|
||||||
|
|
||||||
|
@volatile private var _bundles: Option[Bootable] = None
|
||||||
|
|
||||||
|
def bundles = _bundles;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Boot initializes the specified bundles
|
||||||
|
*/
|
||||||
|
def boot(withBanner: Boolean, b : Bootable): Unit = synchronized {
|
||||||
|
if (!hasBooted) {
|
||||||
|
if (withBanner) printBanner
|
||||||
|
log.info("Starting Akka...")
|
||||||
|
b.onLoad
|
||||||
|
Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
|
||||||
|
log.info("Akka started successfully")
|
||||||
|
hasBooted = true
|
||||||
|
_bundles = Some(b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Shutdown, well, shuts down the bundles used in boot
|
||||||
|
*/
|
||||||
|
def shutdown = synchronized {
|
||||||
|
if (hasBooted) {
|
||||||
|
log.info("Shutting down Akka...")
|
||||||
|
_bundles.foreach(_.onUnload)
|
||||||
|
_bundles = None
|
||||||
|
Actor.shutdownHook.run
|
||||||
|
log.info("Akka succesfully shut down")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def printBanner = {
|
||||||
|
log.info("==================================================")
|
||||||
|
log.info(" t")
|
||||||
|
log.info(" t t t")
|
||||||
|
log.info(" t t tt t")
|
||||||
|
log.info(" tt t t tt t")
|
||||||
|
log.info(" t ttttttt t ttt t")
|
||||||
|
log.info(" t tt ttt t ttt t")
|
||||||
|
log.info(" t t ttt t ttt t t")
|
||||||
|
log.info(" tt t ttt ttt ttt t")
|
||||||
|
log.info(" t t ttt ttt t tt t")
|
||||||
|
log.info(" t ttt ttt t t")
|
||||||
|
log.info(" tt ttt ttt t")
|
||||||
|
log.info(" ttt ttt")
|
||||||
|
log.info(" tttttttt ttt ttt ttt ttt tttttttt")
|
||||||
|
log.info(" ttt tt ttt ttt ttt ttt ttt ttt")
|
||||||
|
log.info(" ttt ttt ttt ttt ttt ttt ttt ttt")
|
||||||
|
log.info(" ttt ttt ttt ttt ttt tt ttt ttt")
|
||||||
|
log.info(" tttt ttttttttt tttttttt tttt")
|
||||||
|
log.info(" ttttttttt ttt ttt ttt ttt ttttttttt")
|
||||||
|
log.info(" ttt ttt ttt ttt ttt ttt ttt ttt")
|
||||||
|
log.info(" ttt ttt ttt ttt ttt ttt ttt ttt")
|
||||||
|
log.info(" ttt tt ttt ttt ttt ttt ttt ttt")
|
||||||
|
log.info(" tttttttt ttt ttt ttt ttt tttttttt")
|
||||||
|
log.info("==================================================")
|
||||||
|
log.info(" Running version %s", Config.VERSION)
|
||||||
|
log.info("==================================================")
|
||||||
|
}
|
||||||
|
}
|
||||||
29
akka-http/src/main/scala/DefaultAkkaLoader.scala
Normal file
29
akka-http/src/main/scala/DefaultAkkaLoader.scala
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.http
|
||||||
|
|
||||||
|
import akka.config.Config
|
||||||
|
import akka.util.{Logging, Bootable}
|
||||||
|
import akka.camel.CamelService
|
||||||
|
import akka.remote.BootableRemoteActorService
|
||||||
|
import akka.actor.BootableActorLoaderService
|
||||||
|
import akka.servlet.AkkaLoader
|
||||||
|
|
||||||
|
class DefaultAkkaLoader extends AkkaLoader {
|
||||||
|
def boot(): Unit = boot(true,
|
||||||
|
new EmbeddedAppServer with BootableActorLoaderService
|
||||||
|
with BootableRemoteActorService
|
||||||
|
with CamelService)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Can be used to boot Akka
|
||||||
|
*
|
||||||
|
* java -cp ... akka.http.Main
|
||||||
|
*/
|
||||||
|
object Main extends DefaultAkkaLoader {
|
||||||
|
def main(args: Array[String]) = boot
|
||||||
|
}
|
||||||
72
akka-http/src/main/scala/EmbeddedAppServer.scala
Normal file
72
akka-http/src/main/scala/EmbeddedAppServer.scala
Normal file
|
|
@ -0,0 +1,72 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.http
|
||||||
|
|
||||||
|
import javax.ws.rs.core.UriBuilder
|
||||||
|
import javax.servlet.ServletConfig
|
||||||
|
import java.io.File
|
||||||
|
|
||||||
|
import akka.actor.BootableActorLoaderService
|
||||||
|
import akka.util.{Bootable, Logging}
|
||||||
|
//import akka.comet.AkkaServlet
|
||||||
|
|
||||||
|
import org.eclipse.jetty.xml.XmlConfiguration
|
||||||
|
import org.eclipse.jetty.server.{Handler, Server}
|
||||||
|
import org.eclipse.jetty.server.handler.{HandlerList, HandlerCollection, ContextHandler}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles the Akka Comet Support (load/unload)
|
||||||
|
*/
|
||||||
|
trait EmbeddedAppServer extends Bootable with Logging {
|
||||||
|
self : BootableActorLoaderService =>
|
||||||
|
|
||||||
|
import akka.config.Config._
|
||||||
|
|
||||||
|
val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
|
||||||
|
val REST_PORT = config.getInt("akka.rest.port", 9998)
|
||||||
|
|
||||||
|
protected var server: Option[Server] = None
|
||||||
|
|
||||||
|
abstract override def onLoad = {
|
||||||
|
super.onLoad
|
||||||
|
if (config.getBool("akka.rest.service", true)) {
|
||||||
|
log.info("Attempting to start Akka REST service (Jersey)")
|
||||||
|
|
||||||
|
System.setProperty("jetty.port",REST_PORT.toString)
|
||||||
|
System.setProperty("jetty.host",REST_HOSTNAME)
|
||||||
|
System.setProperty("jetty.home",HOME.getOrElse(throwNoAkkaHomeException) + "/deploy/root")
|
||||||
|
|
||||||
|
val configuration = new XmlConfiguration(
|
||||||
|
new File(HOME.getOrElse(throwNoAkkaHomeException) + "/config/microkernel-server.xml").toURI.toURL)
|
||||||
|
|
||||||
|
server = Option(configuration.configure.asInstanceOf[Server]) map { s => //Set the correct classloader to our contexts
|
||||||
|
applicationLoader foreach { loader =>
|
||||||
|
//We need to provide the correct classloader to the servlets
|
||||||
|
def setClassLoader(handlers: Seq[Handler]): Unit = {
|
||||||
|
handlers foreach {
|
||||||
|
case c: ContextHandler => c.setClassLoader(loader)
|
||||||
|
case c: HandlerCollection => setClassLoader(c.getHandlers)
|
||||||
|
case _ =>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
setClassLoader(s.getHandlers)
|
||||||
|
}
|
||||||
|
//Start the server
|
||||||
|
s.start()
|
||||||
|
s
|
||||||
|
}
|
||||||
|
log.info("Akka REST service started (Jersey)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract override def onUnload = {
|
||||||
|
super.onUnload
|
||||||
|
server foreach { t => {
|
||||||
|
log.info("Shutting down REST service (Jersey)")
|
||||||
|
t.stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
34
akka-http/src/main/scala/Initializer.scala
Normal file
34
akka-http/src/main/scala/Initializer.scala
Normal file
|
|
@ -0,0 +1,34 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.servlet
|
||||||
|
|
||||||
|
import akka.remote.BootableRemoteActorService
|
||||||
|
import akka.actor.BootableActorLoaderService
|
||||||
|
import akka.camel.CamelService
|
||||||
|
import akka.config.Config
|
||||||
|
import akka.util.{Logging, Bootable}
|
||||||
|
|
||||||
|
import javax.servlet.{ServletContextListener, ServletContextEvent}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class can be added to web.xml mappings as a listener to start and postStop Akka.
|
||||||
|
*
|
||||||
|
*<web-app>
|
||||||
|
* ...
|
||||||
|
* <listener>
|
||||||
|
* <listener-class>akka.servlet.Initializer</listener-class>
|
||||||
|
* </listener>
|
||||||
|
* ...
|
||||||
|
*</web-app>
|
||||||
|
*/
|
||||||
|
class Initializer extends ServletContextListener {
|
||||||
|
lazy val loader = new AkkaLoader
|
||||||
|
|
||||||
|
def contextDestroyed(e: ServletContextEvent): Unit =
|
||||||
|
loader.shutdown
|
||||||
|
|
||||||
|
def contextInitialized(e: ServletContextEvent): Unit =
|
||||||
|
loader.boot(true, new BootableActorLoaderService with BootableRemoteActorService with CamelService)
|
||||||
|
}
|
||||||
41
akka-http/src/main/scala/ListWriter.scala
Normal file
41
akka-http/src/main/scala/ListWriter.scala
Normal file
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
package akka.rest
|
||||||
|
|
||||||
|
import java.io.OutputStream
|
||||||
|
import akka.serialization.Serializer
|
||||||
|
import javax.ws.rs.core.{MultivaluedMap, MediaType}
|
||||||
|
import javax.ws.rs.ext.{MessageBodyWriter, Provider}
|
||||||
|
import javax.ws.rs.Produces
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes Lists of JSON serializable objects.
|
||||||
|
*/
|
||||||
|
@Provider
|
||||||
|
@Produces(Array("application/json"))
|
||||||
|
class ListWriter extends MessageBodyWriter[List[_]] {
|
||||||
|
|
||||||
|
def isWriteable(aClass: Class[_],
|
||||||
|
aType: java.lang.reflect.Type,
|
||||||
|
annotations: Array[java.lang.annotation.Annotation],
|
||||||
|
mediaType: MediaType) =
|
||||||
|
classOf[List[_]].isAssignableFrom(aClass) || aClass == ::.getClass
|
||||||
|
|
||||||
|
def getSize(list: List[_],
|
||||||
|
aClass: Class[_],
|
||||||
|
aType: java.lang.reflect.Type,
|
||||||
|
annotations: Array[java.lang.annotation.Annotation],
|
||||||
|
mediaType: MediaType) =
|
||||||
|
-1L
|
||||||
|
|
||||||
|
def writeTo(list: List[_],
|
||||||
|
aClass: Class[_],
|
||||||
|
aType: java.lang.reflect.Type,
|
||||||
|
annotations: Array[java.lang.annotation.Annotation],
|
||||||
|
mediaType: MediaType,
|
||||||
|
stringObjectMultivaluedMap: MultivaluedMap[String, Object],
|
||||||
|
outputStream: OutputStream): Unit =
|
||||||
|
if (list.isEmpty) outputStream.write(" ".getBytes)
|
||||||
|
else outputStream.write(Serializer.ScalaJSON.toBinary(list))
|
||||||
|
}
|
||||||
565
akka-http/src/main/scala/Security.scala
Normal file
565
akka-http/src/main/scala/Security.scala
Normal file
|
|
@ -0,0 +1,565 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2007-2008 WorldWide Conferencing, LLC
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions
|
||||||
|
* and limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AKKA AAS (Authentication and Authorization Service)
|
||||||
|
* Rework of lift's (www.liftweb.com) HTTP Authentication module
|
||||||
|
* All cred to the Lift team (www.liftweb.com), especially David Pollak and Tim Perrett
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.security
|
||||||
|
|
||||||
|
import akka.actor.{Scheduler, Actor, ActorRef, ActorRegistry, IllegalActorStateException}
|
||||||
|
import akka.actor.Actor._
|
||||||
|
import akka.config.Config
|
||||||
|
import akka.util.Logging
|
||||||
|
|
||||||
|
import com.sun.jersey.api.model.AbstractMethod
|
||||||
|
import com.sun.jersey.spi.container.{ResourceFilterFactory, ContainerRequest, ContainerRequestFilter, ContainerResponse, ContainerResponseFilter, ResourceFilter}
|
||||||
|
import com.sun.jersey.core.util.Base64
|
||||||
|
|
||||||
|
import javax.ws.rs.core.{SecurityContext, Context, Response}
|
||||||
|
import javax.ws.rs.WebApplicationException
|
||||||
|
import javax.annotation.security.{DenyAll, PermitAll, RolesAllowed}
|
||||||
|
import java.security.Principal
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
case object OK
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Authenticate represents a message to authenticate a request
|
||||||
|
*/
|
||||||
|
case class Authenticate(val req: ContainerRequest, val rolesAllowed: List[String])
|
||||||
|
|
||||||
|
/**
|
||||||
|
* User info represents a sign-on with associated credentials/roles
|
||||||
|
*/
|
||||||
|
case class UserInfo(val username: String, val password: String, val roles: List[String])
|
||||||
|
|
||||||
|
trait Credentials
|
||||||
|
|
||||||
|
case class BasicCredentials(username: String, password: String) extends Credentials
|
||||||
|
|
||||||
|
case class DigestCredentials(method: String,
|
||||||
|
userName: String,
|
||||||
|
realm: String,
|
||||||
|
nonce: String,
|
||||||
|
uri: String,
|
||||||
|
qop: String,
|
||||||
|
nc: String,
|
||||||
|
cnonce: String,
|
||||||
|
response: String,
|
||||||
|
opaque: String) extends Credentials
|
||||||
|
|
||||||
|
case class SpnegoCredentials(token: Array[Byte]) extends Credentials
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Jersey Filter for invocation intercept and authorization/authentication
|
||||||
|
*/
|
||||||
|
class AkkaSecurityFilterFactory extends ResourceFilterFactory with Logging {
|
||||||
|
class Filter(actor: ActorRef, rolesAllowed: Option[List[String]])
|
||||||
|
extends ResourceFilter with ContainerRequestFilter with Logging {
|
||||||
|
|
||||||
|
override def getRequestFilter: ContainerRequestFilter = this
|
||||||
|
|
||||||
|
override def getResponseFilter: ContainerResponseFilter = null
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Here's where the magic happens. The request is authenticated by
|
||||||
|
* sending a request for authentication to the configured authenticator actor
|
||||||
|
*/
|
||||||
|
override def filter(request: ContainerRequest): ContainerRequest =
|
||||||
|
rolesAllowed match {
|
||||||
|
case Some(roles) => {
|
||||||
|
val result = (authenticator !! Authenticate(request, roles)).as[AnyRef]
|
||||||
|
result match {
|
||||||
|
case Some(OK) => request
|
||||||
|
case Some(r) if r.isInstanceOf[Response] =>
|
||||||
|
throw new WebApplicationException(r.asInstanceOf[Response])
|
||||||
|
case None => throw new WebApplicationException(408)
|
||||||
|
case unknown => {
|
||||||
|
log.warning("Authenticator replied with unexpected result [%s]", unknown);
|
||||||
|
throw new WebApplicationException(Response.Status.INTERNAL_SERVER_ERROR)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case None => throw new WebApplicationException(Response.Status.FORBIDDEN)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy val authenticatorFQN = {
|
||||||
|
val auth = Config.config.getString("akka.rest.authenticator", "N/A")
|
||||||
|
if (auth == "N/A") throw new IllegalActorStateException("The config option 'akka.rest.authenticator' is not defined in 'akka.conf'")
|
||||||
|
auth
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Currently we always take the first, since there usually should be at most one authentication actor, but a round-robin
|
||||||
|
* strategy could be implemented in the future
|
||||||
|
*/
|
||||||
|
def authenticator: ActorRef = ActorRegistry.actorsFor(authenticatorFQN).head
|
||||||
|
|
||||||
|
def mkFilter(roles: Option[List[String]]): java.util.List[ResourceFilter] =
|
||||||
|
java.util.Collections.singletonList(new Filter(authenticator, roles))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The create method is invoked for each resource, and we look for javax.annotation.security annotations
|
||||||
|
* and create the appropriate Filter configurations for each.
|
||||||
|
*/
|
||||||
|
override def create(am: AbstractMethod): java.util.List[ResourceFilter] = {
|
||||||
|
|
||||||
|
//DenyAll takes precedence
|
||||||
|
if (am.isAnnotationPresent(classOf[DenyAll]))
|
||||||
|
return mkFilter(None)
|
||||||
|
|
||||||
|
//Method-level RolesAllowed takes precedence
|
||||||
|
val ra = am.getAnnotation(classOf[RolesAllowed])
|
||||||
|
|
||||||
|
if (ra ne null)
|
||||||
|
return mkFilter(Some(ra.value.toList))
|
||||||
|
|
||||||
|
//PermitAll takes precedence over resource-level RolesAllowed annotation
|
||||||
|
if (am.isAnnotationPresent(classOf[PermitAll]))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
//Last but not least, the resource-level RolesAllowed
|
||||||
|
val cra = am.getResource.getAnnotation(classOf[RolesAllowed])
|
||||||
|
if (cra ne null)
|
||||||
|
return mkFilter(Some(cra.value.toList))
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* AuthenticationActor is the super-trait for actors doing Http authentication
|
||||||
|
* It defines the common ground and the flow of execution
|
||||||
|
*/
|
||||||
|
trait AuthenticationActor[C <: Credentials] extends Actor {
|
||||||
|
type Req = ContainerRequest
|
||||||
|
|
||||||
|
//What realm does the authentication use?
|
||||||
|
def realm: String
|
||||||
|
|
||||||
|
//Creates a response to signal unauthorized
|
||||||
|
def unauthorized: Response
|
||||||
|
|
||||||
|
//Used to extract information from the request, returns None if no credentials found
|
||||||
|
def extractCredentials(r: Req): Option[C]
|
||||||
|
|
||||||
|
//returns None is unverified
|
||||||
|
def verify(c: Option[C]): Option[UserInfo]
|
||||||
|
|
||||||
|
//Contruct a new SecurityContext from the supplied parameters
|
||||||
|
def mkSecurityContext(r: Req, user: UserInfo): SecurityContext
|
||||||
|
|
||||||
|
//This is the default security context factory
|
||||||
|
def mkDefaultSecurityContext(r: Req, u: UserInfo, scheme: String): SecurityContext = {
|
||||||
|
val n = u.username
|
||||||
|
val p = new Principal {def getName = n}
|
||||||
|
|
||||||
|
new SecurityContext {
|
||||||
|
def getAuthenticationScheme = scheme
|
||||||
|
def getUserPrincipal = p
|
||||||
|
def isSecure = r.isSecure
|
||||||
|
def isUserInRole(role: String) = u.roles.exists(_ == role)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Responsible for the execution flow of authentication
|
||||||
|
*
|
||||||
|
* Credentials are extracted and verified from the request,
|
||||||
|
* and a se3curity context is created for the ContainerRequest
|
||||||
|
* this should ensure good integration with current Jersey security
|
||||||
|
*/
|
||||||
|
protected val authenticate: Receive = {
|
||||||
|
case Authenticate(req, roles) => {
|
||||||
|
verify(extractCredentials(req)) match {
|
||||||
|
case Some(u: UserInfo) => {
|
||||||
|
req.setSecurityContext(mkSecurityContext(req, u))
|
||||||
|
if (roles.exists(req.isUserInRole(_))) self.reply(OK)
|
||||||
|
else self.reply(Response.status(Response.Status.FORBIDDEN).build)
|
||||||
|
}
|
||||||
|
case _ => self.reply(unauthorized)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def receive = authenticate
|
||||||
|
|
||||||
|
//returns the string value of the "Authorization"-header of the request
|
||||||
|
def auth(r: Req) = r.getHeaderValue("Authorization")
|
||||||
|
|
||||||
|
//Turns the aforementioned header value into an option
|
||||||
|
def authOption(r: Req): Option[String] = {
|
||||||
|
val a = auth(r)
|
||||||
|
if ((a ne null) && a.length > 0) Some(a) else None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This trait implements the logic for Http Basic authentication
|
||||||
|
* mix this trait into a class to create an authenticator
|
||||||
|
* Don't forget to set the authenticator FQN in the rest-part of the akka config
|
||||||
|
*/
|
||||||
|
trait BasicAuthenticationActor extends AuthenticationActor[BasicCredentials] {
|
||||||
|
override def unauthorized =
|
||||||
|
Response.status(401).header("WWW-Authenticate", "Basic realm=\"" + realm + "\"").build
|
||||||
|
|
||||||
|
override def extractCredentials(r: Req): Option[BasicCredentials] = {
|
||||||
|
val Authorization = """(.*):(.*)""".r
|
||||||
|
|
||||||
|
authOption(r) match {
|
||||||
|
case Some(token) => {
|
||||||
|
val authResponse = new String(Base64.decode(token.substring(6).getBytes))
|
||||||
|
authResponse match {
|
||||||
|
case Authorization(username, password) => Some(BasicCredentials(username, password))
|
||||||
|
case _ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case _ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def mkSecurityContext(r: Req, u: UserInfo): SecurityContext =
|
||||||
|
mkDefaultSecurityContext(r, u, SecurityContext.BASIC_AUTH)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This trait implements the logic for Http Digest authentication mix this trait into a
|
||||||
|
* class to create an authenticator. Don't forget to set the authenticator FQN in the
|
||||||
|
* rest-part of the akka config
|
||||||
|
*/
|
||||||
|
trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] with Logging {
|
||||||
|
import LiftUtils._
|
||||||
|
|
||||||
|
private object InvalidateNonces
|
||||||
|
|
||||||
|
//Holds the generated nonces for the specified validity period
|
||||||
|
val nonceMap = mkNonceMap
|
||||||
|
|
||||||
|
//Discards old nonces
|
||||||
|
protected val invalidateNonces: Receive = {
|
||||||
|
case InvalidateNonces =>
|
||||||
|
val ts = System.currentTimeMillis
|
||||||
|
nonceMap.filter(tuple => (ts - tuple._2) < nonceValidityPeriod)
|
||||||
|
case unknown =>
|
||||||
|
log.error("Don't know what to do with: ", unknown)
|
||||||
|
}
|
||||||
|
|
||||||
|
//Schedule the invalidation of nonces
|
||||||
|
Scheduler.schedule(self, InvalidateNonces, noncePurgeInterval, noncePurgeInterval, TimeUnit.MILLISECONDS)
|
||||||
|
|
||||||
|
//authenticate or invalidate nonces
|
||||||
|
override def receive = authenticate orElse invalidateNonces
|
||||||
|
|
||||||
|
override def unauthorized: Response = {
|
||||||
|
val nonce = randomString(64)
|
||||||
|
nonceMap.put(nonce, System.currentTimeMillis)
|
||||||
|
unauthorized(nonce, "auth", randomString(64))
|
||||||
|
}
|
||||||
|
|
||||||
|
def unauthorized(nonce: String, qop: String, opaque: String): Response = {
|
||||||
|
Response.status(401).header(
|
||||||
|
"WWW-Authenticate",
|
||||||
|
"Digest realm=\"" + realm + "\", " +
|
||||||
|
"qop=\"" + qop + "\", " +
|
||||||
|
"nonce=\"" + nonce + "\", " +
|
||||||
|
"opaque=\"" + opaque + "\"").build
|
||||||
|
}
|
||||||
|
|
||||||
|
//Tests wether the specified credentials are valid
|
||||||
|
def validate(auth: DigestCredentials, user: UserInfo): Boolean = {
|
||||||
|
def h(s: String) = hexEncode(md5(s.getBytes("UTF-8")))
|
||||||
|
|
||||||
|
val ha1 = h(auth.userName + ":" + auth.realm + ":" + user.password)
|
||||||
|
val ha2 = h(auth.method + ":" + auth.uri)
|
||||||
|
|
||||||
|
val response = h(
|
||||||
|
ha1 + ":" + auth.nonce + ":" +
|
||||||
|
auth.nc + ":" + auth.cnonce + ":" +
|
||||||
|
auth.qop + ":" + ha2)
|
||||||
|
|
||||||
|
(response == auth.response) && (nonceMap.getOrElse(auth.nonce, -1) != -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def verify(odc: Option[DigestCredentials]): Option[UserInfo] = odc match {
|
||||||
|
case Some(dc) => {
|
||||||
|
userInfo(dc.userName) match {
|
||||||
|
case Some(u) if validate(dc, u) =>
|
||||||
|
nonceMap.get(dc.nonce).map(t => (System.currentTimeMillis - t) < nonceValidityPeriod).map(_ => u)
|
||||||
|
case _ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case _ => None
|
||||||
|
}
|
||||||
|
|
||||||
|
override def extractCredentials(r: Req): Option[DigestCredentials] = {
|
||||||
|
authOption(r).map(s => {
|
||||||
|
val ? = splitNameValuePairs(s.substring(7, s.length))
|
||||||
|
DigestCredentials(r.getMethod.toUpperCase,
|
||||||
|
?("username"), ?("realm"), ?("nonce"),
|
||||||
|
?("uri"), ?("qop"), ?("nc"),
|
||||||
|
?("cnonce"), ?("response"), ?("opaque"))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
override def mkSecurityContext(r: Req, u: UserInfo): SecurityContext =
|
||||||
|
mkDefaultSecurityContext(r, u, SecurityContext.DIGEST_AUTH)
|
||||||
|
|
||||||
|
//Mandatory overrides
|
||||||
|
def userInfo(username: String): Option[UserInfo]
|
||||||
|
|
||||||
|
def mkNonceMap: scala.collection.mutable.Map[String, Long]
|
||||||
|
|
||||||
|
//Optional overrides
|
||||||
|
def nonceValidityPeriod = 60 * 1000 //ms
|
||||||
|
def noncePurgeInterval = 2 * 60 * 1000 //ms
|
||||||
|
}
|
||||||
|
|
||||||
|
import java.security.Principal
|
||||||
|
import java.security.PrivilegedActionException
|
||||||
|
import java.security.PrivilegedExceptionAction
|
||||||
|
|
||||||
|
import javax.security.auth.login.AppConfigurationEntry
|
||||||
|
import javax.security.auth.login.Configuration
|
||||||
|
import javax.security.auth.login.LoginContext
|
||||||
|
import javax.security.auth.Subject
|
||||||
|
import javax.security.auth.kerberos.KerberosPrincipal
|
||||||
|
|
||||||
|
import org.ietf.jgss.GSSContext
|
||||||
|
import org.ietf.jgss.GSSCredential
|
||||||
|
import org.ietf.jgss.GSSManager
|
||||||
|
|
||||||
|
trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] with Logging {
|
||||||
|
override def unauthorized =
|
||||||
|
Response.status(401).header("WWW-Authenticate", "Negotiate").build
|
||||||
|
|
||||||
|
// for some reason the jersey Base64 class does not work with kerberos
|
||||||
|
// but the commons Base64 does
|
||||||
|
import org.apache.commons.codec.binary.Base64
|
||||||
|
override def extractCredentials(r: Req): Option[SpnegoCredentials] = {
|
||||||
|
val AuthHeader = """Negotiate\s(.*)""".r
|
||||||
|
|
||||||
|
authOption(r) match {
|
||||||
|
case Some(AuthHeader(token)) =>
|
||||||
|
Some(SpnegoCredentials(Base64.decodeBase64(token.trim.getBytes)))
|
||||||
|
case _ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
override def verify(odc: Option[SpnegoCredentials]): Option[UserInfo] = odc match {
|
||||||
|
case Some(dc) => {
|
||||||
|
try {
|
||||||
|
val principal = Subject.doAs(this.serviceSubject, new KerberosValidateAction(dc.token));
|
||||||
|
val user = stripRealmFrom(principal)
|
||||||
|
Some(UserInfo(user, null, rolesFor(user)))
|
||||||
|
} catch {
|
||||||
|
case e: PrivilegedActionException => {
|
||||||
|
log.error(e, "Action not allowed")
|
||||||
|
return None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case _ => None
|
||||||
|
}
|
||||||
|
|
||||||
|
override def mkSecurityContext(r: Req, u: UserInfo): SecurityContext =
|
||||||
|
mkDefaultSecurityContext(r, u, SecurityContext.CLIENT_CERT_AUTH) // the security context does not know about spnego/kerberos
|
||||||
|
// not sure whether to use a constant from the security context or something like "SPNEGO/Kerberos"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns the roles for the given user
|
||||||
|
*/
|
||||||
|
def rolesFor(user: String): List[String]
|
||||||
|
|
||||||
|
// Kerberos
|
||||||
|
|
||||||
|
/**
|
||||||
|
* strips the realm from a kerberos principal name, returning only the user part
|
||||||
|
*/
|
||||||
|
private def stripRealmFrom(principal: String): String = principal.split("@")(0)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* principal name for the HTTP kerberos service, i.e HTTP/ { server } @ { realm }
|
||||||
|
*/
|
||||||
|
lazy val servicePrincipal = {
|
||||||
|
val p = Config.config.getString("akka.rest.kerberos.servicePrincipal", "N/A")
|
||||||
|
if (p == "N/A") throw new IllegalActorStateException("The config option 'akka.rest.kerberos.servicePrincipal' is not defined in 'akka.conf'")
|
||||||
|
p
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* keytab location with credentials for the service principal
|
||||||
|
*/
|
||||||
|
lazy val keyTabLocation = {
|
||||||
|
val p = Config.config.getString("akka.rest.kerberos.keyTabLocation", "N/A")
|
||||||
|
if (p == "N/A") throw new IllegalActorStateException("The config option 'akka.rest.kerberos.keyTabLocation' is not defined in 'akka.conf'")
|
||||||
|
p
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy val kerberosDebug = {
|
||||||
|
val p = Config.config.getString("akka.rest.kerberos.kerberosDebug", "N/A")
|
||||||
|
if (p == "N/A") throw new IllegalActorStateException("The config option 'akka.rest.kerberos.kerberosDebug' is not defined in 'akka.conf'")
|
||||||
|
p
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* is not used by this authenticator, so accept an empty value
|
||||||
|
*/
|
||||||
|
lazy val realm = Config.config.getString("akka.rest.kerberos.realm", "")
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify the kerberos token from a client with the server
|
||||||
|
*/
|
||||||
|
class KerberosValidateAction(kerberosTicket: Array[Byte]) extends PrivilegedExceptionAction[String] {
|
||||||
|
def run = {
|
||||||
|
val context = GSSManager.getInstance().createContext(null.asInstanceOf[GSSCredential])
|
||||||
|
context.acceptSecContext(kerberosTicket, 0, kerberosTicket.length)
|
||||||
|
val user = context.getSrcName().toString()
|
||||||
|
context.dispose()
|
||||||
|
user
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// service principal login to kerberos on startup
|
||||||
|
|
||||||
|
val serviceSubject = servicePrincipalLogin
|
||||||
|
|
||||||
|
/**
|
||||||
|
* acquire an initial ticket from the kerberos server for the HTTP service
|
||||||
|
*/
|
||||||
|
def servicePrincipalLogin = {
|
||||||
|
val loginConfig = new LoginConfig(
|
||||||
|
new java.net.URL(this.keyTabLocation).toExternalForm(),
|
||||||
|
this.servicePrincipal,
|
||||||
|
this.kerberosDebug)
|
||||||
|
val princ = new java.util.HashSet[Principal](1)
|
||||||
|
princ.add(new KerberosPrincipal(this.servicePrincipal))
|
||||||
|
val sub = new Subject(false, princ, new java.util.HashSet[Object], new java.util.HashSet[Object])
|
||||||
|
val lc = new LoginContext("", sub, null, loginConfig)
|
||||||
|
lc.login()
|
||||||
|
lc.getSubject()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* this class simulates a login-config.xml
|
||||||
|
*/
|
||||||
|
class LoginConfig(keyTabLocation: String, servicePrincipal: String, debug: String) extends Configuration {
|
||||||
|
override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
|
||||||
|
val options = new java.util.HashMap[String, String]
|
||||||
|
options.put("useKeyTab", "true")
|
||||||
|
options.put("keyTab", this.keyTabLocation)
|
||||||
|
options.put("principal", this.servicePrincipal)
|
||||||
|
options.put("storeKey", "true")
|
||||||
|
options.put("doNotPrompt", "true")
|
||||||
|
options.put("isInitiator", "true")
|
||||||
|
options.put("debug", debug)
|
||||||
|
|
||||||
|
Array(new AppConfigurationEntry(
|
||||||
|
"com.sun.security.auth.module.Krb5LoginModule",
|
||||||
|
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
|
||||||
|
options))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright 2006-2010 WorldWide Conferencing, LLC
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
object LiftUtils {
|
||||||
|
import java.security.{MessageDigest,SecureRandom}
|
||||||
|
val random = new SecureRandom()
|
||||||
|
|
||||||
|
def md5(in: Array[Byte]): Array[Byte] = (MessageDigest.getInstance("MD5")).digest(in)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a random string of a given size
|
||||||
|
* @param size size of the string to create. Must be a positive or nul integer
|
||||||
|
* @return the generated string
|
||||||
|
*/
|
||||||
|
def randomString(size: Int): String = {
|
||||||
|
def addChar(pos: Int, lastRand: Int, sb: StringBuilder): StringBuilder = {
|
||||||
|
if (pos >= size) sb
|
||||||
|
else {
|
||||||
|
val randNum = if ((pos % 6) == 0) random.nextInt else lastRand
|
||||||
|
sb.append((randNum & 0x1f) match {
|
||||||
|
case n if n < 26 => ('A' + n).toChar
|
||||||
|
case n => ('0' + (n - 26)).toChar
|
||||||
|
})
|
||||||
|
addChar(pos + 1, randNum >> 5, sb)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addChar(0, 0, new StringBuilder(size)).toString
|
||||||
|
}
|
||||||
|
|
||||||
|
/** encode a Byte array as hexadecimal characters */
|
||||||
|
def hexEncode(in: Array[Byte]): String = {
|
||||||
|
val sb = new StringBuilder
|
||||||
|
val len = in.length
|
||||||
|
def addDigit(in: Array[Byte], pos: Int, len: Int, sb: StringBuilder) {
|
||||||
|
if (pos < len) {
|
||||||
|
val b: Int = in(pos)
|
||||||
|
val msb = (b & 0xf0) >> 4
|
||||||
|
val lsb = (b & 0x0f)
|
||||||
|
sb.append((if (msb < 10) ('0' + msb).asInstanceOf[Char] else ('a' + (msb - 10)).asInstanceOf[Char]))
|
||||||
|
sb.append((if (lsb < 10) ('0' + lsb).asInstanceOf[Char] else ('a' + (lsb - 10)).asInstanceOf[Char]))
|
||||||
|
addDigit(in, pos + 1, len, sb)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addDigit(in, 0, len, sb)
|
||||||
|
sb.toString
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Splits a string of the form <name1=value1, name2=value2, ... > and unquotes the quoted values.
|
||||||
|
* The result is a Map[String, String]
|
||||||
|
*/
|
||||||
|
def splitNameValuePairs(props: String): Map[String, String] = {
|
||||||
|
/**
|
||||||
|
* If str is surrounded by quotes it return the content between the quotes
|
||||||
|
*/
|
||||||
|
def unquote(str: String) = {
|
||||||
|
if ((str ne null) && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"')
|
||||||
|
str.substring(1, str.length - 1)
|
||||||
|
else
|
||||||
|
str
|
||||||
|
}
|
||||||
|
|
||||||
|
val list = props.split(",").toList.map(in => {
|
||||||
|
val pair = in match { case null => Nil case s => s.split("=").toList.map(_.trim).filter(_.length > 0) }
|
||||||
|
(pair(0), unquote(pair(1)))
|
||||||
|
})
|
||||||
|
val map: Map[String, String] = Map.empty
|
||||||
|
(map /: list)((m, next) => m + (next))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,208 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
|
||||||
*/
|
|
||||||
|
|
||||||
package sample.rest.scala
|
|
||||||
|
|
||||||
import akka.actor.{SupervisorFactory, Actor}
|
|
||||||
import akka.actor.Actor._
|
|
||||||
import akka.stm._
|
|
||||||
import akka.stm.TransactionalMap
|
|
||||||
import akka.persistence.cassandra.CassandraStorage
|
|
||||||
import akka.config.Supervision._
|
|
||||||
import akka.util.Logging
|
|
||||||
import scala.xml.NodeSeq
|
|
||||||
import java.lang.Integer
|
|
||||||
import java.nio.ByteBuffer
|
|
||||||
import javax.ws.rs.core.MultivaluedMap
|
|
||||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
|
|
||||||
import akka.actor.ActorRegistry.actorFor
|
|
||||||
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
|
|
||||||
import org.atmosphere.util.XSSHtmlFilter
|
|
||||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
|
||||||
import org.atmosphere.jersey.Broadcastable
|
|
||||||
|
|
||||||
class Boot {
|
|
||||||
val factory = SupervisorFactory(
|
|
||||||
SupervisorConfig(
|
|
||||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
|
||||||
Supervise(
|
|
||||||
actorOf[SimpleServiceActor],
|
|
||||||
Permanent) ::
|
|
||||||
Supervise(
|
|
||||||
actorOf[ChatActor],
|
|
||||||
Permanent) ::
|
|
||||||
Supervise(
|
|
||||||
actorOf[PersistentSimpleServiceActor],
|
|
||||||
Permanent)
|
|
||||||
:: Nil))
|
|
||||||
factory.newInstance.start
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Try service out by invoking (multiple times):
|
|
||||||
* <pre>
|
|
||||||
* curl http://localhost:9998/scalacount
|
|
||||||
* </pre>
|
|
||||||
* Or browse to the URL from a web browser.
|
|
||||||
*/
|
|
||||||
@Path("/scalacount")
|
|
||||||
class SimpleService {
|
|
||||||
@GET
|
|
||||||
@Produces(Array("text/html"))
|
|
||||||
def count = {
|
|
||||||
//Fetch the first actor of type SimpleServiceActor
|
|
||||||
//Send it the "Tick" message and expect a NodeSeq back
|
|
||||||
val result = for{a <- actorFor[SimpleServiceActor]
|
|
||||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
|
||||||
//Return either the resulting NodeSeq or a default one
|
|
||||||
result getOrElse <error>Error in counter</error>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class SimpleServiceActor extends Actor {
|
|
||||||
private val KEY = "COUNTER"
|
|
||||||
private var hasStartedTicking = false
|
|
||||||
private val storage = TransactionalMap[String, Integer]()
|
|
||||||
|
|
||||||
def receive = {
|
|
||||||
case "Tick" => if (hasStartedTicking) {
|
|
||||||
val count = atomic {
|
|
||||||
val current = storage.get(KEY).get.asInstanceOf[Integer].intValue
|
|
||||||
val updated = current + 1
|
|
||||||
storage.put(KEY, new Integer(updated))
|
|
||||||
updated
|
|
||||||
}
|
|
||||||
self.reply(<success>Tick:{count}</success>)
|
|
||||||
} else {
|
|
||||||
atomic {
|
|
||||||
storage.put(KEY, new Integer(0))
|
|
||||||
}
|
|
||||||
hasStartedTicking = true
|
|
||||||
self.reply(<success>Tick: 0</success>)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Path("/pubsub/")
|
|
||||||
class PubSub {
|
|
||||||
@GET
|
|
||||||
@Suspend
|
|
||||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
|
||||||
@Path("/topic/{topic}/")
|
|
||||||
def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic)
|
|
||||||
|
|
||||||
@GET
|
|
||||||
@Broadcast
|
|
||||||
@Path("/topic/{topic}/{message}/")
|
|
||||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
|
||||||
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Try service out by invoking (multiple times):
|
|
||||||
* <pre>
|
|
||||||
* curl http://localhost:9998/persistentscalacount
|
|
||||||
* </pre>
|
|
||||||
* Or browse to the URL from a web browser.
|
|
||||||
*/
|
|
||||||
@Path("/persistentscalacount")
|
|
||||||
class PersistentSimpleService {
|
|
||||||
@GET
|
|
||||||
@Produces(Array("text/html"))
|
|
||||||
def count = {
|
|
||||||
//Fetch the first actor of type PersistentSimpleServiceActor
|
|
||||||
//Send it the "Tick" message and expect a NodeSeq back
|
|
||||||
val result = for{a <- actorFor[PersistentSimpleServiceActor]
|
|
||||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
|
||||||
//Return either the resulting NodeSeq or a default one
|
|
||||||
result getOrElse <error>Error in counter</error>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class PersistentSimpleServiceActor extends Actor {
|
|
||||||
private val KEY = "COUNTER"
|
|
||||||
private var hasStartedTicking = false
|
|
||||||
private lazy val storage = CassandraStorage.newMap
|
|
||||||
|
|
||||||
def receive = {
|
|
||||||
case "Tick" => if (hasStartedTicking) {
|
|
||||||
val count = atomic {
|
|
||||||
val bytes = storage.get(KEY.getBytes).get
|
|
||||||
val current = Integer.parseInt(new String(bytes, "UTF8"))
|
|
||||||
val updated = current + 1
|
|
||||||
storage.put(KEY.getBytes, (updated).toString.getBytes)
|
|
||||||
updated
|
|
||||||
}
|
|
||||||
// val bytes = storage.get(KEY.getBytes).get
|
|
||||||
// val counter = ByteBuffer.wrap(bytes).getInt
|
|
||||||
// storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
|
|
||||||
self.reply(<success>Tick:{count}</success>)
|
|
||||||
} else {
|
|
||||||
atomic {
|
|
||||||
storage.put(KEY.getBytes, "0".getBytes)
|
|
||||||
}
|
|
||||||
// storage.put(KEY.getBytes, Array(0.toByte))
|
|
||||||
hasStartedTicking = true
|
|
||||||
self.reply(<success>Tick: 0</success>)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Path("/chat")
|
|
||||||
class Chat {
|
|
||||||
import ChatActor.ChatMsg
|
|
||||||
@Suspend
|
|
||||||
@GET
|
|
||||||
@Produces(Array("text/html"))
|
|
||||||
def suspend = ()
|
|
||||||
|
|
||||||
@POST
|
|
||||||
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
|
|
||||||
@Consumes(Array("application/x-www-form-urlencoded"))
|
|
||||||
@Produces(Array("text/html"))
|
|
||||||
def publishMessage(form: MultivaluedMap[String, String]) = {
|
|
||||||
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
|
|
||||||
//Fetch the first actor of type ChatActor
|
|
||||||
//Send it the "Tick" message and expect a NodeSeq back
|
|
||||||
val result = for{a <- actorFor[ChatActor]
|
|
||||||
r <- (a !! msg).as[String]} yield r
|
|
||||||
//Return either the resulting String or a default one
|
|
||||||
result getOrElse "System__error"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
object ChatActor {
|
|
||||||
case class ChatMsg(val who: String, val what: String, val msg: String)
|
|
||||||
}
|
|
||||||
|
|
||||||
class ChatActor extends Actor with Logging {
|
|
||||||
import ChatActor.ChatMsg
|
|
||||||
def receive = {
|
|
||||||
case ChatMsg(who, what, msg) => {
|
|
||||||
what match {
|
|
||||||
case "login" => self.reply("System Message__" + who + " has joined.")
|
|
||||||
case "post" => self.reply("" + who + "__" + msg)
|
|
||||||
case _ => throw new WebApplicationException(422)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case x => log.info("recieve unknown: " + x)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class JsonpFilter extends BroadcastFilter with Logging {
|
|
||||||
def filter(an: AnyRef) = {
|
|
||||||
val m = an.toString
|
|
||||||
var name = m
|
|
||||||
var message = ""
|
|
||||||
|
|
||||||
if (m.indexOf("__") > 0) {
|
|
||||||
name = m.substring(0, m.indexOf("__"))
|
|
||||||
message = m.substring(m.indexOf("__") + 2)
|
|
||||||
}
|
|
||||||
|
|
||||||
new BroadcastFilter.BroadcastAction("<script type='text/javascript'>\n (window.app || window.parent.app).update({ name: \"" +
|
|
||||||
name + "\", message: \"" + message + "\" }); \n</script>\n")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue