Merge branch 'master' of git-proxy:jboner/akka
This commit is contained in:
commit
aebdc773eb
14 changed files with 246 additions and 76 deletions
|
|
@ -148,9 +148,8 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn
|
|||
else targetByUuid(ep.uuid.get)
|
||||
|
||||
private def targetById(id: String) = ActorRegistry.actorsFor(id) match {
|
||||
case Nil => None
|
||||
case actor :: Nil => Some(actor)
|
||||
case actors => Some(actors.head)
|
||||
case actors if actors.length == 0 => None
|
||||
case actors => Some(actors(0))
|
||||
}
|
||||
|
||||
private def targetByUuid(uuid: String) = ActorRegistry.actorFor(uuid)
|
||||
|
|
|
|||
|
|
@ -31,12 +31,13 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
|
|||
object ActorRegistry extends ListenerManagement {
|
||||
private val actorsByUUID = new ConcurrentHashMap[String, ActorRef]
|
||||
private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]]
|
||||
private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]]
|
||||
|
||||
private val Naught = Array[ActorRef]() //Nil for Arrays
|
||||
|
||||
/**
|
||||
* Returns all actors in the system.
|
||||
*/
|
||||
def actors: List[ActorRef] = filter(_ => true)
|
||||
def actors: Array[ActorRef] = filter(_ => true)
|
||||
|
||||
/**
|
||||
* Invokes a function for all actors.
|
||||
|
|
@ -46,16 +47,31 @@ object ActorRegistry extends ListenerManagement {
|
|||
while (elements.hasMoreElements) f(elements.nextElement)
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the function on all known actors until it returns Some
|
||||
* Returns None if the function never returns Some
|
||||
*/
|
||||
def find[T](f: (ActorRef) => Option[T]) : Option[T] = {
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
val result = f(elements.nextElement)
|
||||
|
||||
if(result.isDefined)
|
||||
return result
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all actors that are subtypes of the class passed in as the Manifest argument and supproting passed message.
|
||||
*/
|
||||
def actorsFor[T <: Actor](message: Any)(implicit manifest: Manifest[T] ): List[ActorRef] =
|
||||
def actorsFor[T <: Actor](message: Any)(implicit manifest: Manifest[T] ): Array[ActorRef] =
|
||||
filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message))
|
||||
|
||||
/**
|
||||
* Finds all actors that satisfy a predicate.
|
||||
*/
|
||||
def filter(p: ActorRef => Boolean): List[ActorRef] = {
|
||||
def filter(p: ActorRef => Boolean): Array[ActorRef] = {
|
||||
val all = new ListBuffer[ActorRef]
|
||||
val elements = actorsByUUID.elements
|
||||
while (elements.hasMoreElements) {
|
||||
|
|
@ -64,37 +80,34 @@ object ActorRegistry extends ListenerManagement {
|
|||
all += actorId
|
||||
}
|
||||
}
|
||||
all.toList
|
||||
all.toArray
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds all actors that are subtypes of the class passed in as the Manifest argument.
|
||||
*/
|
||||
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] =
|
||||
filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass))
|
||||
def actorsFor[T <: Actor](implicit manifest: Manifest[T]): Array[ActorRef] =
|
||||
actorsFor[T](manifest.erasure.asInstanceOf[Class[T]])
|
||||
|
||||
/**
|
||||
* Finds any actor that matches T.
|
||||
*/
|
||||
def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] =
|
||||
actorsFor[T](manifest).headOption
|
||||
find(a => if(manifest.erasure.isAssignableFrom(a.actor.getClass)) Some(a) else None)
|
||||
|
||||
/**
|
||||
* Finds all actors of the exact type specified by the class passed in as the Class argument.
|
||||
* Finds all actors of type or sub-type specified by the class passed in as the Class argument.
|
||||
*/
|
||||
def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = {
|
||||
if (actorsByClassName.containsKey(clazz.getName)) {
|
||||
actorsByClassName.get(clazz.getName).toArray.toList.asInstanceOf[List[ActorRef]]
|
||||
} else Nil
|
||||
}
|
||||
def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef] =
|
||||
filter(a => clazz.isAssignableFrom(a.actor.getClass))
|
||||
|
||||
/**
|
||||
* Finds all actors that has a specific id.
|
||||
*/
|
||||
def actorsFor(id: String): List[ActorRef] = {
|
||||
def actorsFor(id: String): Array[ActorRef] = {
|
||||
if (actorsById.containsKey(id)) {
|
||||
actorsById.get(id).toArray.toList.asInstanceOf[List[ActorRef]]
|
||||
} else Nil
|
||||
actorsById.get(id).toArray(Naught)
|
||||
} else Naught
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -109,27 +122,26 @@ object ActorRegistry extends ListenerManagement {
|
|||
* Registers an actor in the ActorRegistry.
|
||||
*/
|
||||
def register(actor: ActorRef) = {
|
||||
// UUID
|
||||
actorsByUUID.put(actor.uuid, actor)
|
||||
|
||||
// ID
|
||||
val id = actor.id
|
||||
if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor)
|
||||
if (actorsById.containsKey(id)) actorsById.get(id).add(actor)
|
||||
|
||||
val set = actorsById get id
|
||||
if(set ne null)
|
||||
set add actor
|
||||
else {
|
||||
val set = new ConcurrentSkipListSet[ActorRef]
|
||||
set.add(actor)
|
||||
actorsById.put(id, set)
|
||||
val newSet = new ConcurrentSkipListSet[ActorRef]
|
||||
newSet add actor
|
||||
|
||||
val oldSet = actorsById.putIfAbsent(id,newSet)
|
||||
|
||||
//Parry for two simultaneous putIfAbsent(id,newSet)
|
||||
if(oldSet ne null)
|
||||
oldSet add actor
|
||||
}
|
||||
|
||||
// Class name
|
||||
val className = actor.actorClassName
|
||||
if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor)
|
||||
else {
|
||||
val set = new ConcurrentSkipListSet[ActorRef]
|
||||
set.add(actor)
|
||||
actorsByClassName.put(className, set)
|
||||
}
|
||||
// UUID
|
||||
actorsByUUID.put(actor.uuid, actor)
|
||||
|
||||
// notify listeners
|
||||
foreachListener(_ ! ActorRegistered(actor))
|
||||
|
|
@ -141,11 +153,11 @@ object ActorRegistry extends ListenerManagement {
|
|||
def unregister(actor: ActorRef) = {
|
||||
actorsByUUID remove actor.uuid
|
||||
|
||||
val id = actor.id
|
||||
if (actorsById.containsKey(id)) actorsById.get(id).remove(actor)
|
||||
val set = actorsById get actor.id
|
||||
if (set ne null)
|
||||
set remove actor
|
||||
|
||||
val className = actor.actorClassName
|
||||
if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).remove(actor)
|
||||
//FIXME: safely remove set if empty, leaks memory
|
||||
|
||||
// notify listeners
|
||||
foreachListener(_ ! ActorUnregistered(actor))
|
||||
|
|
@ -159,7 +171,6 @@ object ActorRegistry extends ListenerManagement {
|
|||
foreach(_.stop)
|
||||
actorsByUUID.clear
|
||||
actorsById.clear
|
||||
actorsByClassName.clear
|
||||
log.info("All actors have been shut down and unregistered from ActorRegistry")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
|
|||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
// The actor will need a ConcurrentLinkedDeque based mailbox
|
||||
if( actorRef.mailbox == null ) {
|
||||
if( actorRef.mailbox eq null ) {
|
||||
actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]()
|
||||
}
|
||||
super.register(actorRef)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
|
|||
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
|
||||
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
|
||||
import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer}
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
|
||||
import java.net.{SocketAddress, InetSocketAddress}
|
||||
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet}
|
||||
|
|
@ -251,21 +252,28 @@ class RemoteClientPipelineFactory(name: String,
|
|||
timer: HashedWheelTimer,
|
||||
client: RemoteClient) extends ChannelPipelineFactory {
|
||||
def getPipeline: ChannelPipeline = {
|
||||
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
|
||||
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
|
||||
|
||||
val engine = RemoteServerSslContext.client.createSSLEngine()
|
||||
engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible?
|
||||
engine.setUseClientMode(true)
|
||||
|
||||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
|
||||
//case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
|
||||
case _ => None
|
||||
val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder))
|
||||
case _ => (join(),join())
|
||||
}
|
||||
|
||||
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
|
||||
|
||||
val stages: Array[ChannelHandler] =
|
||||
zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient))
|
||||
.getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient))
|
||||
val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient)
|
||||
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
@ -344,9 +352,24 @@ class RemoteClientHandler(val name: String,
|
|||
}
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.listeners.toArray.foreach(l =>
|
||||
l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port))
|
||||
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
|
||||
def connect = {
|
||||
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port))
|
||||
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
if(RemoteServer.SECURE){
|
||||
val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
sslHandler.handshake().addListener( new ChannelFutureListener {
|
||||
def operationComplete(future : ChannelFuture) : Unit = {
|
||||
if(future.isSuccess)
|
||||
connect
|
||||
//else
|
||||
//FIXME: What is the correct action here?
|
||||
}
|
||||
})
|
||||
} else {
|
||||
connect
|
||||
}
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
|
|||
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
|
||||
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
|
||||
import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
|
||||
|
||||
import scala.collection.mutable.Map
|
||||
|
||||
|
|
@ -73,6 +75,32 @@ object RemoteServer {
|
|||
level
|
||||
}
|
||||
|
||||
val SECURE = {
|
||||
if(config.getBool("akka.remote.ssl.service",false)){
|
||||
|
||||
val properties = List(
|
||||
("key-store-type" ,"keyStoreType"),
|
||||
("key-store" ,"keyStore"),
|
||||
("key-store-pass" ,"keyStorePassword"),
|
||||
("trust-store-type","trustStoreType"),
|
||||
("trust-store" ,"trustStore"),
|
||||
("trust-store-pass","trustStorePassword")
|
||||
).map(x => ("akka.remote.ssl." + x._1,"javax.net.ssl."+x._2))
|
||||
|
||||
//If property is not set, and we have a value from our akka.conf, use that value
|
||||
for{ p <- properties if System.getProperty(p._2) eq null
|
||||
c <- config.getString(p._1)
|
||||
} System.setProperty(p._2,c)
|
||||
|
||||
if(config.getBool("akka.remote.ssl.debug",false))
|
||||
System.setProperty("javax.net.debug","ssl")
|
||||
|
||||
true
|
||||
}
|
||||
else
|
||||
false
|
||||
}
|
||||
|
||||
object Address {
|
||||
def apply(hostname: String, port: Int) = new Address(hostname, port)
|
||||
}
|
||||
|
|
@ -282,7 +310,23 @@ class RemoteServer extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
case class Codec(encoder: ChannelHandler, decoder: ChannelHandler)
|
||||
object RemoteServerSslContext {
|
||||
import javax.net.ssl.SSLContext
|
||||
|
||||
val (client,server) = {
|
||||
val protocol = "TLS"
|
||||
//val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509")
|
||||
//val store = KeyStore.getInstance("JKS")
|
||||
|
||||
val s = SSLContext.getInstance(protocol)
|
||||
s.init(null,null,null)
|
||||
|
||||
val c = SSLContext.getInstance(protocol)
|
||||
c.init(null,null,null)
|
||||
|
||||
(c,s)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -296,20 +340,26 @@ class RemoteServerPipelineFactory(
|
|||
import RemoteServer._
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val zipCodec = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder))
|
||||
//case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
|
||||
case _ => None
|
||||
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
|
||||
|
||||
val engine = RemoteServerSslContext.server.createSSLEngine()
|
||||
engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible?
|
||||
engine.setUseClientMode(false)
|
||||
|
||||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder))
|
||||
case _ => (join(),join())
|
||||
}
|
||||
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors)
|
||||
|
||||
val stages: Array[ChannelHandler] =
|
||||
zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer))
|
||||
.getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer))
|
||||
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
|
||||
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
@ -335,6 +385,23 @@ class RemoteServerHandler(
|
|||
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
|
||||
openChannels.add(ctx.getChannel)
|
||||
}
|
||||
|
||||
override def channelConnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) {
|
||||
if(RemoteServer.SECURE) {
|
||||
val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
||||
|
||||
// Begin handshake.
|
||||
sslHandler.handshake().addListener( new ChannelFutureListener {
|
||||
def operationComplete(future : ChannelFuture) : Unit = {
|
||||
if(future.isSuccess)
|
||||
openChannels.add(future.getChannel)
|
||||
else
|
||||
future.getChannel.close
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
||||
if (event.isInstanceOf[ChannelStateEvent] &&
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ public class SampleUntypedActor extends UntypedActor {
|
|||
|
||||
} else if (msg.equals("ForwardMessage")) {
|
||||
// Retreive an actor from the ActorRegistry by ID and get an ActorRef back
|
||||
ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id").head();
|
||||
ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id")[0];
|
||||
// Wrap the ActorRef in an UntypedActorRef and forward the message to this actor
|
||||
UntypedActorRef.wrap(actorRef).forward(msg, self);
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package se.scalablesolutions.akka.actor
|
|||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import Actor._
|
||||
import java.util.concurrent.{CyclicBarrier, TimeUnit, CountDownLatch}
|
||||
|
||||
object ActorRegistrySpec {
|
||||
var record = ""
|
||||
|
|
@ -76,6 +77,17 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldFindThingsFromActorRegistry {
|
||||
ActorRegistry.shutdownAll
|
||||
val actor = actorOf[TestActor]
|
||||
actor.start
|
||||
val found = ActorRegistry.find(a => if(a.actor.isInstanceOf[TestActor]) Some(a) else None)
|
||||
assert(found.isDefined)
|
||||
assert(found.get.actor.isInstanceOf[TestActor])
|
||||
assert(found.get.id === "MyID")
|
||||
actor.stop
|
||||
}
|
||||
|
||||
@Test def shouldGetActorsByIdFromActorRegistry {
|
||||
ActorRegistry.shutdownAll
|
||||
val actor1 = actorOf[TestActor]
|
||||
|
|
@ -203,4 +215,41 @@ class ActorRegistrySpec extends JUnitSuite {
|
|||
ActorRegistry.unregister(actor2)
|
||||
assert(ActorRegistry.actors.size === 0)
|
||||
}
|
||||
|
||||
@Test def shouldBeAbleToRegisterActorsConcurrently {
|
||||
ActorRegistry.shutdownAll
|
||||
|
||||
val latch = new CountDownLatch(3)
|
||||
val barrier = new CyclicBarrier(3)
|
||||
|
||||
def mkTestActor(i:Int) = actorOf( new Actor {
|
||||
self.id = i.toString
|
||||
def receive = { case _ => }
|
||||
})
|
||||
|
||||
def mkTestActors = for(i <- 1 to 10;j <- 1 to 1000) yield mkTestActor(i)
|
||||
|
||||
def mkThread(actors: Iterable[ActorRef]) = new Thread {
|
||||
start
|
||||
override def run {
|
||||
barrier.await
|
||||
actors foreach { _.start }
|
||||
latch.countDown
|
||||
}
|
||||
}
|
||||
|
||||
val testActors1 = mkTestActors
|
||||
val testActors2 = mkTestActors
|
||||
val testActors3 = mkTestActors
|
||||
|
||||
mkThread(testActors1)
|
||||
mkThread(testActors2)
|
||||
mkThread(testActors3)
|
||||
|
||||
assert(latch.await(30,TimeUnit.SECONDS) === true)
|
||||
|
||||
for(i <- 1 to 10) {
|
||||
assert(ActorRegistry.actorsFor(i.toString).length === 3000)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ object SimpleRestService extends RestHelper {
|
|||
case Get("liftcount" :: _, req) =>
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a Node back
|
||||
val result = for( a <- ActorRegistry.actorsFor(classOf[SimpleServiceActor]).headOption;
|
||||
val result = for( a <- ActorRegistry.actorFor[SimpleServiceActor];
|
||||
r <- (a !! "Tick").as[Node] ) yield r
|
||||
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
|
|
@ -85,7 +85,7 @@ object SimpleRestService extends RestHelper {
|
|||
case Get("persistentliftcount" :: _, req) =>
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a Node back
|
||||
val result = for( a <- ActorRegistry.actorsFor(classOf[PersistentServiceActor]).headOption;
|
||||
val result = for( a <- ActorRegistry.actorFor[PersistentServiceActor];
|
||||
r <- (a !! "Tick").as[Node] ) yield r
|
||||
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
|
|
|
|||
|
|
@ -15,10 +15,12 @@ public class Boot {
|
|||
new Component[] {
|
||||
new Component(
|
||||
SimpleService.class,
|
||||
SimpleServiceImpl.class,
|
||||
new LifeCycle(new Permanent()),
|
||||
1000),
|
||||
new Component(
|
||||
PersistentSimpleService.class,
|
||||
PersistentSimpleServiceImpl.class,
|
||||
new LifeCycle(new Permanent()),
|
||||
1000)
|
||||
}).supervise();
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import java.lang.Integer
|
|||
import java.nio.ByteBuffer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorFor
|
||||
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
|
||||
import org.atmosphere.util.XSSHtmlFilter
|
||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
|
|
@ -53,7 +53,7 @@ class SimpleService {
|
|||
def count = {
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption
|
||||
val result = for{a <- actorFor[SimpleServiceActor]
|
||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
|
|
@ -108,7 +108,7 @@ class PersistentSimpleService {
|
|||
def count = {
|
||||
//Fetch the first actor of type PersistentSimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption
|
||||
val result = for{a <- actorFor[PersistentSimpleServiceActor]
|
||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
|
|
@ -155,7 +155,7 @@ class Chat {
|
|||
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
|
||||
//Fetch the first actor of type ChatActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorsFor(classOf[ChatActor]).headOption
|
||||
val result = for{a <- actorFor[ChatActor]
|
||||
r <- (a !! msg).as[String]} yield r
|
||||
//Return either the resulting String or a default one
|
||||
result getOrElse "System__error"
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._
|
|||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo}
|
||||
import se.scalablesolutions.akka.stm.TransactionalMap
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry.actorFor
|
||||
|
||||
class Boot {
|
||||
val factory = SupervisorFactory(
|
||||
|
|
@ -122,7 +122,7 @@ class SecureTickService {
|
|||
def tick = {
|
||||
//Fetch the first actor of type PersistentSimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NdeSeq back
|
||||
val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption
|
||||
val result = for{a <- actorFor[SecureTickActor]
|
||||
r <- (a !! "Tick").as[Integer]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result match {
|
||||
|
|
|
|||
|
|
@ -75,6 +75,23 @@ akka {
|
|||
compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
|
||||
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
|
||||
|
||||
ssl {
|
||||
service = off #on / off
|
||||
|
||||
#You can either use java command-line options or use the settings below
|
||||
|
||||
#key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12
|
||||
#key-store = "yourcertificate.p12" #Same as -Djavax.net.ssl.keyStore=yourcertificate.p12
|
||||
#key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS
|
||||
|
||||
#trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks
|
||||
#trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore
|
||||
#trust-store-pass = "$PASS" #-Djavax.net.ssl.trustStorePassword=$PASS
|
||||
|
||||
#This can be useful for debugging
|
||||
debug = off #if on, very verbose debug, same as -Djavax.net.debug=ssl
|
||||
}
|
||||
|
||||
cluster {
|
||||
service = on
|
||||
name = "default" # The name of the cluster
|
||||
|
|
|
|||
|
|
@ -347,6 +347,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// testing
|
||||
val junit = Dependencies.junit
|
||||
val scalatest = Dependencies.scalatest
|
||||
|
||||
override def bndImportPackage = "javax.transaction;version=1.1" :: super.bndImportPackage.toList
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
#!/bin/bash
|
||||
cd $AKKA_HOME
|
||||
VERSION=akka_2.8.0.RC3-0.10
|
||||
VERSION=akka_2.8.0-0.10
|
||||
TARGET_DIR=dist/$VERSION/$1
|
||||
shift 1
|
||||
VMARGS=$@
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue