diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index 1cd29ced00..7ef8e0750c 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -148,9 +148,8 @@ class ActorProducer(val ep: ActorEndpoint) extends DefaultProducer(ep) with Asyn else targetByUuid(ep.uuid.get) private def targetById(id: String) = ActorRegistry.actorsFor(id) match { - case Nil => None - case actor :: Nil => Some(actor) - case actors => Some(actors.head) + case actors if actors.length == 0 => None + case actors => Some(actors(0)) } private def targetByUuid(uuid: String) = ActorRegistry.actorFor(uuid) diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index aea37432b7..532ead6754 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -31,12 +31,13 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent object ActorRegistry extends ListenerManagement { private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]] - private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]] + + private val Naught = Array[ActorRef]() //Nil for Arrays /** * Returns all actors in the system. */ - def actors: List[ActorRef] = filter(_ => true) + def actors: Array[ActorRef] = filter(_ => true) /** * Invokes a function for all actors. @@ -46,16 +47,31 @@ object ActorRegistry extends ListenerManagement { while (elements.hasMoreElements) f(elements.nextElement) } + /** + * Invokes the function on all known actors until it returns Some + * Returns None if the function never returns Some + */ + def find[T](f: (ActorRef) => Option[T]) : Option[T] = { + val elements = actorsByUUID.elements + while (elements.hasMoreElements) { + val result = f(elements.nextElement) + + if(result.isDefined) + return result + } + None + } + /** * Finds all actors that are subtypes of the class passed in as the Manifest argument and supproting passed message. */ - def actorsFor[T <: Actor](message: Any)(implicit manifest: Manifest[T] ): List[ActorRef] = + def actorsFor[T <: Actor](message: Any)(implicit manifest: Manifest[T] ): Array[ActorRef] = filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass) && a.isDefinedAt(message)) /** * Finds all actors that satisfy a predicate. */ - def filter(p: ActorRef => Boolean): List[ActorRef] = { + def filter(p: ActorRef => Boolean): Array[ActorRef] = { val all = new ListBuffer[ActorRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { @@ -64,37 +80,34 @@ object ActorRegistry extends ListenerManagement { all += actorId } } - all.toList + all.toArray } /** * Finds all actors that are subtypes of the class passed in as the Manifest argument. */ - def actorsFor[T <: Actor](implicit manifest: Manifest[T]): List[ActorRef] = - filter(a => manifest.erasure.isAssignableFrom(a.actor.getClass)) + def actorsFor[T <: Actor](implicit manifest: Manifest[T]): Array[ActorRef] = + actorsFor[T](manifest.erasure.asInstanceOf[Class[T]]) /** * Finds any actor that matches T. */ def actorFor[T <: Actor](implicit manifest: Manifest[T]): Option[ActorRef] = - actorsFor[T](manifest).headOption + find(a => if(manifest.erasure.isAssignableFrom(a.actor.getClass)) Some(a) else None) /** - * Finds all actors of the exact type specified by the class passed in as the Class argument. + * Finds all actors of type or sub-type specified by the class passed in as the Class argument. */ - def actorsFor[T <: Actor](clazz: Class[T]): List[ActorRef] = { - if (actorsByClassName.containsKey(clazz.getName)) { - actorsByClassName.get(clazz.getName).toArray.toList.asInstanceOf[List[ActorRef]] - } else Nil - } + def actorsFor[T <: Actor](clazz: Class[T]): Array[ActorRef] = + filter(a => clazz.isAssignableFrom(a.actor.getClass)) /** * Finds all actors that has a specific id. */ - def actorsFor(id: String): List[ActorRef] = { + def actorsFor(id: String): Array[ActorRef] = { if (actorsById.containsKey(id)) { - actorsById.get(id).toArray.toList.asInstanceOf[List[ActorRef]] - } else Nil + actorsById.get(id).toArray(Naught) + } else Naught } /** @@ -109,27 +122,26 @@ object ActorRegistry extends ListenerManagement { * Registers an actor in the ActorRegistry. */ def register(actor: ActorRef) = { - // UUID - actorsByUUID.put(actor.uuid, actor) - // ID val id = actor.id if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) - if (actorsById.containsKey(id)) actorsById.get(id).add(actor) + + val set = actorsById get id + if(set ne null) + set add actor else { - val set = new ConcurrentSkipListSet[ActorRef] - set.add(actor) - actorsById.put(id, set) + val newSet = new ConcurrentSkipListSet[ActorRef] + newSet add actor + + val oldSet = actorsById.putIfAbsent(id,newSet) + + //Parry for two simultaneous putIfAbsent(id,newSet) + if(oldSet ne null) + oldSet add actor } - // Class name - val className = actor.actorClassName - if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor) - else { - val set = new ConcurrentSkipListSet[ActorRef] - set.add(actor) - actorsByClassName.put(className, set) - } + // UUID + actorsByUUID.put(actor.uuid, actor) // notify listeners foreachListener(_ ! ActorRegistered(actor)) @@ -141,11 +153,11 @@ object ActorRegistry extends ListenerManagement { def unregister(actor: ActorRef) = { actorsByUUID remove actor.uuid - val id = actor.id - if (actorsById.containsKey(id)) actorsById.get(id).remove(actor) + val set = actorsById get actor.id + if (set ne null) + set remove actor - val className = actor.actorClassName - if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).remove(actor) + //FIXME: safely remove set if empty, leaks memory // notify listeners foreachListener(_ ! ActorUnregistered(actor)) @@ -159,7 +171,6 @@ object ActorRegistry extends ListenerManagement { foreach(_.stop) actorsByUUID.clear actorsById.clear - actorsByClassName.clear log.info("All actors have been shut down and unregistered from ActorRegistry") } } diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 836dc0ea86..1f03c1eba2 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -82,7 +82,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat override def register(actorRef: ActorRef) = { // The actor will need a ConcurrentLinkedDeque based mailbox - if( actorRef.mailbox == null ) { + if( actorRef.mailbox eq null ) { actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() } super.register(actorRef) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index a08348e461..e0212572b2 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -19,6 +19,7 @@ import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder} import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import org.jboss.netty.handler.timeout.ReadTimeoutHandler import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer} +import org.jboss.netty.handler.ssl.SslHandler import java.net.{SocketAddress, InetSocketAddress} import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet} @@ -251,21 +252,28 @@ class RemoteClientPipelineFactory(name: String, timer: HashedWheelTimer, client: RemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { - val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) - val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) + + def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) + + val engine = RemoteServerSslContext.client.createSSLEngine() + engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible? + engine.setUseClientMode(true) + + val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) + val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val zipCodec = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) - //case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder)) - case _ => None + val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder)) + case _ => (join(),join()) } + val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) - val stages: Array[ChannelHandler] = - zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient)) - .getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)) + val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient) + new StaticChannelPipeline(stages: _*) } } @@ -344,9 +352,24 @@ class RemoteClientHandler(val name: String, } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.listeners.toArray.foreach(l => - l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port)) - log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) + def connect = { + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientConnected(client.hostname, client.port)) + log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) + } + + if(RemoteServer.SECURE){ + val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler]) + sslHandler.handshake().addListener( new ChannelFutureListener { + def operationComplete(future : ChannelFuture) : Unit = { + if(future.isSuccess) + connect + //else + //FIXME: What is the correct action here? + } + }) + } else { + connect + } } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index f3b548cd33..db4490761b 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -22,6 +22,8 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder} +import org.jboss.netty.handler.ssl.SslHandler + import scala.collection.mutable.Map @@ -73,6 +75,32 @@ object RemoteServer { level } + val SECURE = { + if(config.getBool("akka.remote.ssl.service",false)){ + + val properties = List( + ("key-store-type" ,"keyStoreType"), + ("key-store" ,"keyStore"), + ("key-store-pass" ,"keyStorePassword"), + ("trust-store-type","trustStoreType"), + ("trust-store" ,"trustStore"), + ("trust-store-pass","trustStorePassword") + ).map(x => ("akka.remote.ssl." + x._1,"javax.net.ssl."+x._2)) + + //If property is not set, and we have a value from our akka.conf, use that value + for{ p <- properties if System.getProperty(p._2) eq null + c <- config.getString(p._1) + } System.setProperty(p._2,c) + + if(config.getBool("akka.remote.ssl.debug",false)) + System.setProperty("javax.net.debug","ssl") + + true + } + else + false + } + object Address { def apply(hostname: String, port: Int) = new Address(hostname, port) } @@ -282,7 +310,23 @@ class RemoteServer extends Logging { } } -case class Codec(encoder: ChannelHandler, decoder: ChannelHandler) +object RemoteServerSslContext { + import javax.net.ssl.SSLContext + + val (client,server) = { + val protocol = "TLS" + //val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509") + //val store = KeyStore.getInstance("JKS") + + val s = SSLContext.getInstance(protocol) + s.init(null,null,null) + + val c = SSLContext.getInstance(protocol) + c.init(null,null,null) + + (c,s) + } +} /** * @author Jonas Bonér @@ -296,20 +340,26 @@ class RemoteServerPipelineFactory( import RemoteServer._ def getPipeline: ChannelPipeline = { - val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) - val protobufEnc = new ProtobufEncoder - val zipCodec = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) - //case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder)) - case _ => None + def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) + + val engine = RemoteServerSslContext.server.createSSLEngine() + engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible? + engine.setUseClientMode(false) + + val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) + val protobufEnc = new ProtobufEncoder + val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder)) + case _ => (join(),join()) } + val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors) - val stages: Array[ChannelHandler] = - zipCodec.map(codec => Array(codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteServer)) - .getOrElse(Array(lenDec, protobufDec, lenPrep, protobufEnc, remoteServer)) + val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer) + new StaticChannelPipeline(stages: _*) } } @@ -335,6 +385,23 @@ class RemoteServerHandler( override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) { openChannels.add(ctx.getChannel) } + + override def channelConnected(ctx : ChannelHandlerContext, e : ChannelStateEvent) { + if(RemoteServer.SECURE) { + val sslHandler : SslHandler = ctx.getPipeline.get(classOf[SslHandler]) + + // Begin handshake. + sslHandler.handshake().addListener( new ChannelFutureListener { + def operationComplete(future : ChannelFuture) : Unit = { + if(future.isSuccess) + openChannels.add(future.getChannel) + else + future.getChannel.close + } + }) + } + } + override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java index 8040e1394f..ed8a67ab13 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SampleUntypedActor.java @@ -36,7 +36,7 @@ public class SampleUntypedActor extends UntypedActor { } else if (msg.equals("ForwardMessage")) { // Retreive an actor from the ActorRegistry by ID and get an ActorRef back - ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id").head(); + ActorRef actorRef = ActorRegistry.actorsFor("some-actor-id")[0]; // Wrap the ActorRef in an UntypedActorRef and forward the message to this actor UntypedActorRef.wrap(actorRef).forward(msg, self); diff --git a/akka-core/src/test/scala/misc/ActorRegistrySpec.scala b/akka-core/src/test/scala/misc/ActorRegistrySpec.scala index 6914472e2c..61626e9db3 100644 --- a/akka-core/src/test/scala/misc/ActorRegistrySpec.scala +++ b/akka-core/src/test/scala/misc/ActorRegistrySpec.scala @@ -3,6 +3,7 @@ package se.scalablesolutions.akka.actor import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ +import java.util.concurrent.{CyclicBarrier, TimeUnit, CountDownLatch} object ActorRegistrySpec { var record = "" @@ -76,6 +77,17 @@ class ActorRegistrySpec extends JUnitSuite { actor.stop } + @Test def shouldFindThingsFromActorRegistry { + ActorRegistry.shutdownAll + val actor = actorOf[TestActor] + actor.start + val found = ActorRegistry.find(a => if(a.actor.isInstanceOf[TestActor]) Some(a) else None) + assert(found.isDefined) + assert(found.get.actor.isInstanceOf[TestActor]) + assert(found.get.id === "MyID") + actor.stop + } + @Test def shouldGetActorsByIdFromActorRegistry { ActorRegistry.shutdownAll val actor1 = actorOf[TestActor] @@ -203,4 +215,41 @@ class ActorRegistrySpec extends JUnitSuite { ActorRegistry.unregister(actor2) assert(ActorRegistry.actors.size === 0) } + + @Test def shouldBeAbleToRegisterActorsConcurrently { + ActorRegistry.shutdownAll + + val latch = new CountDownLatch(3) + val barrier = new CyclicBarrier(3) + + def mkTestActor(i:Int) = actorOf( new Actor { + self.id = i.toString + def receive = { case _ => } + }) + + def mkTestActors = for(i <- 1 to 10;j <- 1 to 1000) yield mkTestActor(i) + + def mkThread(actors: Iterable[ActorRef]) = new Thread { + start + override def run { + barrier.await + actors foreach { _.start } + latch.countDown + } + } + + val testActors1 = mkTestActors + val testActors2 = mkTestActors + val testActors3 = mkTestActors + + mkThread(testActors1) + mkThread(testActors2) + mkThread(testActors3) + + assert(latch.await(30,TimeUnit.SECONDS) === true) + + for(i <- 1 to 10) { + assert(ActorRegistry.actorsFor(i.toString).length === 3000) + } + } } diff --git a/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala b/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala index b361fbb16b..d5358a7d89 100644 --- a/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala +++ b/akka-samples/akka-sample-lift/src/main/scala/akka/SimpleService.scala @@ -64,7 +64,7 @@ object SimpleRestService extends RestHelper { case Get("liftcount" :: _, req) => //Fetch the first actor of type SimpleServiceActor //Send it the "Tick" message and expect a Node back - val result = for( a <- ActorRegistry.actorsFor(classOf[SimpleServiceActor]).headOption; + val result = for( a <- ActorRegistry.actorFor[SimpleServiceActor]; r <- (a !! "Tick").as[Node] ) yield r //Return either the resulting NodeSeq or a default one @@ -85,7 +85,7 @@ object SimpleRestService extends RestHelper { case Get("persistentliftcount" :: _, req) => //Fetch the first actor of type SimpleServiceActor //Send it the "Tick" message and expect a Node back - val result = for( a <- ActorRegistry.actorsFor(classOf[PersistentServiceActor]).headOption; + val result = for( a <- ActorRegistry.actorFor[PersistentServiceActor]; r <- (a !! "Tick").as[Node] ) yield r //Return either the resulting NodeSeq or a default one diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java index cd382ae6ec..d9b41cd136 100644 --- a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java +++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/Boot.java @@ -15,10 +15,12 @@ public class Boot { new Component[] { new Component( SimpleService.class, + SimpleServiceImpl.class, new LifeCycle(new Permanent()), 1000), new Component( PersistentSimpleService.class, + PersistentSimpleServiceImpl.class, new LifeCycle(new Permanent()), 1000) }).supervise(); diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index fc96bba182..c3b71a3fdf 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -16,7 +16,7 @@ import java.lang.Integer import java.nio.ByteBuffer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} -import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor +import se.scalablesolutions.akka.actor.ActorRegistry.actorFor import org.atmosphere.annotation.{Broadcast, Suspend,Cluster} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} @@ -53,7 +53,7 @@ class SimpleService { def count = { //Fetch the first actor of type SimpleServiceActor //Send it the "Tick" message and expect a NodeSeq back - val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption + val result = for{a <- actorFor[SimpleServiceActor] r <- (a !! "Tick").as[NodeSeq]} yield r //Return either the resulting NodeSeq or a default one result getOrElse Error in counter @@ -108,7 +108,7 @@ class PersistentSimpleService { def count = { //Fetch the first actor of type PersistentSimpleServiceActor //Send it the "Tick" message and expect a NodeSeq back - val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption + val result = for{a <- actorFor[PersistentSimpleServiceActor] r <- (a !! "Tick").as[NodeSeq]} yield r //Return either the resulting NodeSeq or a default one result getOrElse Error in counter @@ -155,7 +155,7 @@ class Chat { val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message")) //Fetch the first actor of type ChatActor //Send it the "Tick" message and expect a NodeSeq back - val result = for{a <- actorsFor(classOf[ChatActor]).headOption + val result = for{a <- actorFor[ChatActor] r <- (a !! msg).as[String]} yield r //Return either the resulting String or a default one result getOrElse "System__error" diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala index e5c8029eb8..02af6174c6 100644 --- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala @@ -10,7 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo} import se.scalablesolutions.akka.stm.TransactionalMap -import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor +import se.scalablesolutions.akka.actor.ActorRegistry.actorFor class Boot { val factory = SupervisorFactory( @@ -122,7 +122,7 @@ class SecureTickService { def tick = { //Fetch the first actor of type PersistentSimpleServiceActor //Send it the "Tick" message and expect a NdeSeq back - val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption + val result = for{a <- actorFor[SecureTickActor] r <- (a !! "Tick").as[Integer]} yield r //Return either the resulting NodeSeq or a default one result match { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index ec5f85355f..3b80953fe6 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -75,6 +75,23 @@ akka { compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 + ssl { + service = off #on / off + + #You can either use java command-line options or use the settings below + + #key-store-type = "pkcs12" #Same as -Djavax.net.ssl.keyStoreType=pkcs12 + #key-store = "yourcertificate.p12" #Same as -Djavax.net.ssl.keyStore=yourcertificate.p12 + #key-store-pass = "$PASS" #Same as -Djavax.net.ssl.keyStorePassword=$PASS + + #trust-store-type = "jks" #Same as -Djavax.net.ssl.trustStoreType=jks + #trust-store = "your.keystore" #Same as -Djavax.net.ssl.trustStore=your.keystore + #trust-store-pass = "$PASS" #-Djavax.net.ssl.trustStorePassword=$PASS + + #This can be useful for debugging + debug = off #if on, very verbose debug, same as -Djavax.net.debug=ssl + } + cluster { service = on name = "default" # The name of the cluster diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index e63bfc573f..02f72d7f0e 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -347,6 +347,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // testing val junit = Dependencies.junit val scalatest = Dependencies.scalatest + + override def bndImportPackage = "javax.transaction;version=1.1" :: super.bndImportPackage.toList } // ------------------------------------------------------------------------------------------------------------------- diff --git a/scripts/run_akka.sh b/scripts/run_akka.sh index 2d87a08148..f2ededd90f 100755 --- a/scripts/run_akka.sh +++ b/scripts/run_akka.sh @@ -1,6 +1,6 @@ #!/bin/bash cd $AKKA_HOME -VERSION=akka_2.8.0.RC3-0.10 +VERSION=akka_2.8.0-0.10 TARGET_DIR=dist/$VERSION/$1 shift 1 VMARGS=$@