diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 7bfd9edc7d..3dabce5023 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -72,7 +72,7 @@ object AMQP { shutdownListener, initReconnectDelay, passive, durable, autoDelete, configurationArguments) - def stopConnection(connection: FaultTolerantConnectionActor) = supervisor.stopConnection(connection) + def stopConnection(connection: ActorID) = supervisor.stopConnection(connection) /** * @author Jonas Bonér @@ -80,7 +80,7 @@ object AMQP { class AMQPSupervisor extends Actor with Logging { import scala.collection.JavaConversions._ - private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor] + private val connections = new ConcurrentHashMap[ActorID, ActorID] faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Throwable]) @@ -132,7 +132,7 @@ object AMQP { consumer } - def stopConnection(connection: FaultTolerantConnectionActor) = { + def stopConnection(connection: ActorID) = { connection ! Stop unlink(connection) connections.remove(connection) @@ -360,7 +360,6 @@ object AMQP { val autoDelete: Boolean, val configurationArguments: Map[java.lang.String, Object]) extends FaultTolerantConnectionActor { - consumer: Consumer => import scala.collection.JavaConversions._ @@ -456,7 +455,7 @@ object AMQP { log.error( cause, "Delivery of message to MessageConsumerListener [%s] failed", listener.toString(exchangeName)) - consumer ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect + self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect } } @@ -473,7 +472,7 @@ object AMQP { log.warning( "MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]", listener.toString(exchangeName), signal.getReference, signal.getReason) - consumer ! UnregisterMessageConsumerListener(listener) + self ! UnregisterMessageConsumerListener(listener) } } }) @@ -592,10 +591,10 @@ object AMQP { } catch { case e: Exception => val waitInMillis = delay * 2 - val self = this + val outerActorID = self log.debug("Trying to reconnect to AMQP server in %n milliseconds [%s]", waitInMillis, this) reconnectionTimer.schedule(new TimerTask() { - override def run = self ! Reconnect(waitInMillis) + override def run = outerActorID ! Reconnect(waitInMillis) }, delay) } } diff --git a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala index 491493fdea..5fce79bfae 100644 --- a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala +++ b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala @@ -21,7 +21,6 @@ class JGroupsClusterActor extends BasicClusterActor { override def init = { super.init log debug "Initiating JGroups-based cluster actor" - val me = this isActive = true // Set up the JGroups local endpoint @@ -32,13 +31,13 @@ class JGroupsClusterActor extends BasicClusterActor { def setState(state: Array[Byte]): Unit = () def receive(m: JG_MSG): Unit = - if (isActive && m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) me ! Message(m.getSrc,m.getRawBuffer) + if (isActive && m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) self ! Message(m.getSrc,m.getRawBuffer) def viewAccepted(view: JG_VIEW): Unit = - if (isActive) me ! View(Set[ADDR_T]() ++ view.getMembers - channel.get.getAddress) + if (isActive) self ! View(Set[ADDR_T]() ++ view.getMembers - channel.get.getAddress) def suspect(a: Address): Unit = - if (isActive) me ! Zombie(a) + if (isActive) self ! Zombie(a) def block: Unit = log debug "UNSUPPORTED: JGroupsClusterActor::block" //TODO HotSwap to a buffering body diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala index d8a49c74e3..25f2739619 100644 --- a/akka-patterns/src/main/scala/Patterns.scala +++ b/akka-patterns/src/main/scala/Patterns.scala @@ -1,6 +1,7 @@ package se.scalablesolutions.akka.patterns -import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.actor.{Actor, ActorID} +import se.scalablesolutions.akka.actor.Actor._ object Patterns { type PF[A, B] = PartialFunction[A, B] @@ -22,21 +23,20 @@ object Patterns { filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) //FIXME 2.8, use default params with CyclicIterator - def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer { + def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID = newActor(() => new Actor with LoadBalancer { val seq = actors - } + }) - def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = - new Actor with Dispatcher { + def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID = newActor(() => new Actor with Dispatcher { override def transform(msg: Any) = msgTransformer(msg) def routes = routing - } + }) - def dispatcherActor(routing: PF[Any, Actor]): Actor = new Actor with Dispatcher { + def dispatcherActor(routing: PF[Any, ActorID]): ActorID = newActor(() => new Actor with Dispatcher { def routes = routing - } + }) - def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = + def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID = dispatcherActor({case _ => actorToLog}, logger) } @@ -44,7 +44,7 @@ trait Dispatcher { self: Actor => protected def transform(msg: Any): Any = msg - protected def routes: PartialFunction[Any, Actor] + protected def routes: PartialFunction[Any, ActorID] protected def dispatch: PartialFunction[Any, Unit] = { case a if routes.isDefinedAt(a) => @@ -56,7 +56,7 @@ trait Dispatcher { self: Actor => } trait LoadBalancer extends Dispatcher { self: Actor => - protected def seq: InfiniteIterator[Actor] + protected def seq: InfiniteIterator[ActorID] protected def routes = { case x if seq.hasNext => seq.next } } @@ -75,11 +75,11 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { } } -class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] { +class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] { def hasNext = items != Nil def next = { - def actorWithSmallestMailbox(a1: Actor, a2: Actor) = { + def actorWithSmallestMailbox(a1: ActorID, a2: ActorID) = { if (a1.mailboxSize < a2.mailboxSize) a1 else a2 } items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2)) @@ -87,13 +87,13 @@ class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator } sealed trait ListenerMessage -case class Listen(listener : Actor) extends ListenerMessage -case class Deafen(listener : Actor) extends ListenerMessage -case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage +case class Listen(listener : ActorID) extends ListenerMessage +case class Deafen(listener : ActorID) extends ListenerMessage +case class WithListeners(f : Set[ActorID] => Unit) extends ListenerMessage trait Listeners { self : Actor => import se.scalablesolutions.akka.actor.Agent - private lazy val listeners = Agent(Set[Actor]()) + private lazy val listeners = Agent(Set[ActorID]()) protected def listenerManagement : PartialFunction[Any,Unit] = { case Listen(l) => listeners( _ + l) diff --git a/akka-samples/akka-sample-chat/README b/akka-samples/akka-sample-chat/README index 66e54e3d44..cf787bd4d2 100644 --- a/akka-samples/akka-sample-chat/README +++ b/akka-samples/akka-sample-chat/README @@ -18,7 +18,8 @@ Then to run the sample: - Run 'sbt console' to start up a REPL (interpreter). 4. In the first REPL you get execute: - scala> import sample.chat._ - - scala> ChatService.start + - scala> import se.scalablesolutions.akka.actor.Actor._ + - scala> val chatService = newActor[ChatService].start 5. In the second REPL you get execute: - scala> import sample.chat._ - scala> Runner.run diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala index 51c4c9f91c..38251efc4c 100644 --- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala +++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala @@ -6,7 +6,7 @@ package sample.chat import scala.collection.mutable.HashMap -import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, RemoteActor} +import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorID, RemoteActor} import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient} import se.scalablesolutions.akka.persistence.common.PersistentVector import se.scalablesolutions.akka.persistence.redis.RedisStorage @@ -34,7 +34,8 @@ Then to run the sample: - Run 'sbt console' to start up a REPL (interpreter). 2. In the first REPL you get execute: - scala> import sample.chat._ - - scala> ChatService.start + - scala> import se.scalablesolutions.akka.actor.Actor._ + - scala> val chatService = newActor[ChatService].start 3. In the second REPL you get execute: - scala> import sample.chat._ - scala> Runner.run @@ -59,7 +60,6 @@ case class ChatMessage(from: String, message: String) extends Event * Chat client. */ class ChatClient(val name: String) { - import Actor.Sender.Self val chat = RemoteClient.actorFor("chat:service", "localhost", 9999) def login = chat ! Login(name) @@ -71,7 +71,7 @@ class ChatClient(val name: String) { /** * Internal chat client session. */ -class Session(user: String, storage: Actor) extends Actor { +class Session(user: String, storage: ActorID) extends Actor { private val loginTime = System.currentTimeMillis private var userLog: List[String] = Nil @@ -106,14 +106,10 @@ class RedisChatStorage extends ChatStorage { def receive = { case msg @ ChatMessage(from, message) => log.debug("New chat message [%s]", message) - atomic { - chatLog + message.getBytes("UTF-8") - } + atomic { chatLog + message.getBytes("UTF-8") } case GetChatLog(_) => - val messageList = atomic { - chatLog.map(bytes => new String(bytes, "UTF-8")).toList - } + val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList } reply(ChatLog(messageList)) } @@ -127,13 +123,13 @@ class RedisChatStorage extends ChatStorage { */ trait SessionManagement { this: Actor => - val storage: ChatStorage // needs someone to provide the ChatStorage - val sessions = new HashMap[String, Actor] + val storage: ActorID // needs someone to provide the ChatStorage + val sessions = new HashMap[String, ActorID] protected def sessionManagement: PartialFunction[Any, Unit] = { case Login(username) => log.info("User [%s] has logged in", username) - val session = new Session(username, storage) + val session = newActor(() => new Session(username, storage)) session.start sessions += (username -> session) @@ -154,7 +150,7 @@ trait SessionManagement { this: Actor => * Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor. */ trait ChatManagement { this: Actor => - val sessions: HashMap[String, Actor] // needs someone to provide the Session map + val sessions: HashMap[String, ActorID] // needs someone to provide the Session map protected def chatManagement: PartialFunction[Any, Unit] = { case msg @ ChatMessage(from, _) => sessions(from) ! msg @@ -166,7 +162,7 @@ trait ChatManagement { this: Actor => * Creates and links a RedisChatStorage. */ trait RedisChatStorageFactory { this: Actor => - val storage: ChatStorage = spawnLink[RedisChatStorage] // starts and links ChatStorage + val storage = spawnLink[RedisChatStorage] // starts and links ChatStorage } /** @@ -176,7 +172,7 @@ trait ChatServer extends Actor { faultHandler = Some(OneForOneStrategy(5, 5000)) trapExit = List(classOf[Exception]) - val storage: ChatStorage + val storage: ActorID log.info("Chat service is starting up...") @@ -197,18 +193,21 @@ trait ChatServer extends Actor { } /** - * Object encapsulating the full Chat Service. + * Class encapsulating the full Chat Service. + * Start service by invoking: + *
+ * val chatService = Actor.newActor[ChatService].start
+ * 
*/ -object ChatService extends +class ChatService extends ChatServer with SessionManagement with ChatManagement with RedisChatStorageFactory { - override def start: Actor = { + override def start { super.start RemoteNode.start("localhost", 9999) RemoteNode.register("chat:service", this) - this } } diff --git a/akka-security/src/main/scala/Security.scala b/akka-security/src/main/scala/Security.scala index 634fc400ac..a3ba2a2cdf 100644 --- a/akka-security/src/main/scala/Security.scala +++ b/akka-security/src/main/scala/Security.scala @@ -263,7 +263,7 @@ trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] w } //Schedule the invalidation of nonces - Scheduler.schedule(this, InvalidateNonces, noncePurgeInterval, noncePurgeInterval, TimeUnit.MILLISECONDS) + Scheduler.schedule(self, InvalidateNonces, noncePurgeInterval, noncePurgeInterval, TimeUnit.MILLISECONDS) //authenticate or invalidate nonces override def receive = authenticate orElse invalidateNonces