Chat sample now compiles with newActor[TYPE]

This commit is contained in:
Jonas Bonér 2010-05-02 10:50:45 +02:00
commit 01892c2b3f
6 changed files with 49 additions and 51 deletions

View file

@ -72,7 +72,7 @@ object AMQP {
shutdownListener, initReconnectDelay,
passive, durable, autoDelete, configurationArguments)
def stopConnection(connection: FaultTolerantConnectionActor) = supervisor.stopConnection(connection)
def stopConnection(connection: ActorID) = supervisor.stopConnection(connection)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -80,7 +80,7 @@ object AMQP {
class AMQPSupervisor extends Actor with Logging {
import scala.collection.JavaConversions._
private val connections = new ConcurrentHashMap[FaultTolerantConnectionActor, FaultTolerantConnectionActor]
private val connections = new ConcurrentHashMap[ActorID, ActorID]
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Throwable])
@ -132,7 +132,7 @@ object AMQP {
consumer
}
def stopConnection(connection: FaultTolerantConnectionActor) = {
def stopConnection(connection: ActorID) = {
connection ! Stop
unlink(connection)
connections.remove(connection)
@ -360,7 +360,6 @@ object AMQP {
val autoDelete: Boolean,
val configurationArguments: Map[java.lang.String, Object])
extends FaultTolerantConnectionActor {
consumer: Consumer =>
import scala.collection.JavaConversions._
@ -456,7 +455,7 @@ object AMQP {
log.error(
cause, "Delivery of message to MessageConsumerListener [%s] failed",
listener.toString(exchangeName))
consumer ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect
self ! Failure(cause) // pass on and re-throw exception in consumer actor to trigger restart and reconnect
}
}
@ -473,7 +472,7 @@ object AMQP {
log.warning(
"MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]",
listener.toString(exchangeName), signal.getReference, signal.getReason)
consumer ! UnregisterMessageConsumerListener(listener)
self ! UnregisterMessageConsumerListener(listener)
}
}
})
@ -592,10 +591,10 @@ object AMQP {
} catch {
case e: Exception =>
val waitInMillis = delay * 2
val self = this
val outerActorID = self
log.debug("Trying to reconnect to AMQP server in %n milliseconds [%s]", waitInMillis, this)
reconnectionTimer.schedule(new TimerTask() {
override def run = self ! Reconnect(waitInMillis)
override def run = outerActorID ! Reconnect(waitInMillis)
}, delay)
}
}

View file

@ -21,7 +21,6 @@ class JGroupsClusterActor extends BasicClusterActor {
override def init = {
super.init
log debug "Initiating JGroups-based cluster actor"
val me = this
isActive = true
// Set up the JGroups local endpoint
@ -32,13 +31,13 @@ class JGroupsClusterActor extends BasicClusterActor {
def setState(state: Array[Byte]): Unit = ()
def receive(m: JG_MSG): Unit =
if (isActive && m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) me ! Message(m.getSrc,m.getRawBuffer)
if (isActive && m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) self ! Message(m.getSrc,m.getRawBuffer)
def viewAccepted(view: JG_VIEW): Unit =
if (isActive) me ! View(Set[ADDR_T]() ++ view.getMembers - channel.get.getAddress)
if (isActive) self ! View(Set[ADDR_T]() ++ view.getMembers - channel.get.getAddress)
def suspect(a: Address): Unit =
if (isActive) me ! Zombie(a)
if (isActive) self ! Zombie(a)
def block: Unit =
log debug "UNSUPPORTED: JGroupsClusterActor::block" //TODO HotSwap to a buffering body

View file

@ -1,6 +1,7 @@
package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.{Actor, ActorID}
import se.scalablesolutions.akka.actor.Actor._
object Patterns {
type PF[A, B] = PartialFunction[A, B]
@ -22,21 +23,20 @@ object Patterns {
filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee)
//FIXME 2.8, use default params with CyclicIterator
def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer {
def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID = newActor(() => new Actor with LoadBalancer {
val seq = actors
}
})
def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor =
new Actor with Dispatcher {
def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID = newActor(() => new Actor with Dispatcher {
override def transform(msg: Any) = msgTransformer(msg)
def routes = routing
}
})
def dispatcherActor(routing: PF[Any, Actor]): Actor = new Actor with Dispatcher {
def dispatcherActor(routing: PF[Any, ActorID]): ActorID = newActor(() => new Actor with Dispatcher {
def routes = routing
}
})
def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor =
def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID =
dispatcherActor({case _ => actorToLog}, logger)
}
@ -44,7 +44,7 @@ trait Dispatcher { self: Actor =>
protected def transform(msg: Any): Any = msg
protected def routes: PartialFunction[Any, Actor]
protected def routes: PartialFunction[Any, ActorID]
protected def dispatch: PartialFunction[Any, Unit] = {
case a if routes.isDefinedAt(a) =>
@ -56,7 +56,7 @@ trait Dispatcher { self: Actor =>
}
trait LoadBalancer extends Dispatcher { self: Actor =>
protected def seq: InfiniteIterator[Actor]
protected def seq: InfiniteIterator[ActorID]
protected def routes = { case x if seq.hasNext => seq.next }
}
@ -75,11 +75,11 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
}
}
class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] {
class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] {
def hasNext = items != Nil
def next = {
def actorWithSmallestMailbox(a1: Actor, a2: Actor) = {
def actorWithSmallestMailbox(a1: ActorID, a2: ActorID) = {
if (a1.mailboxSize < a2.mailboxSize) a1 else a2
}
items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2))
@ -87,13 +87,13 @@ class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator
}
sealed trait ListenerMessage
case class Listen(listener : Actor) extends ListenerMessage
case class Deafen(listener : Actor) extends ListenerMessage
case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage
case class Listen(listener : ActorID) extends ListenerMessage
case class Deafen(listener : ActorID) extends ListenerMessage
case class WithListeners(f : Set[ActorID] => Unit) extends ListenerMessage
trait Listeners { self : Actor =>
import se.scalablesolutions.akka.actor.Agent
private lazy val listeners = Agent(Set[Actor]())
private lazy val listeners = Agent(Set[ActorID]())
protected def listenerManagement : PartialFunction[Any,Unit] = {
case Listen(l) => listeners( _ + l)

View file

@ -18,7 +18,8 @@ Then to run the sample:
- Run 'sbt console' to start up a REPL (interpreter).
4. In the first REPL you get execute:
- scala> import sample.chat._
- scala> ChatService.start
- scala> import se.scalablesolutions.akka.actor.Actor._
- scala> val chatService = newActor[ChatService].start
5. In the second REPL you get execute:
- scala> import sample.chat._
- scala> Runner.run

View file

@ -6,7 +6,7 @@ package sample.chat
import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, RemoteActor}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorID, RemoteActor}
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient}
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.persistence.redis.RedisStorage
@ -34,7 +34,8 @@ Then to run the sample:
- Run 'sbt console' to start up a REPL (interpreter).
2. In the first REPL you get execute:
- scala> import sample.chat._
- scala> ChatService.start
- scala> import se.scalablesolutions.akka.actor.Actor._
- scala> val chatService = newActor[ChatService].start
3. In the second REPL you get execute:
- scala> import sample.chat._
- scala> Runner.run
@ -59,7 +60,6 @@ case class ChatMessage(from: String, message: String) extends Event
* Chat client.
*/
class ChatClient(val name: String) {
import Actor.Sender.Self
val chat = RemoteClient.actorFor("chat:service", "localhost", 9999)
def login = chat ! Login(name)
@ -71,7 +71,7 @@ class ChatClient(val name: String) {
/**
* Internal chat client session.
*/
class Session(user: String, storage: Actor) extends Actor {
class Session(user: String, storage: ActorID) extends Actor {
private val loginTime = System.currentTimeMillis
private var userLog: List[String] = Nil
@ -106,14 +106,10 @@ class RedisChatStorage extends ChatStorage {
def receive = {
case msg @ ChatMessage(from, message) =>
log.debug("New chat message [%s]", message)
atomic {
chatLog + message.getBytes("UTF-8")
}
atomic { chatLog + message.getBytes("UTF-8") }
case GetChatLog(_) =>
val messageList = atomic {
chatLog.map(bytes => new String(bytes, "UTF-8")).toList
}
val messageList = atomic { chatLog.map(bytes => new String(bytes, "UTF-8")).toList }
reply(ChatLog(messageList))
}
@ -127,13 +123,13 @@ class RedisChatStorage extends ChatStorage {
*/
trait SessionManagement { this: Actor =>
val storage: ChatStorage // needs someone to provide the ChatStorage
val sessions = new HashMap[String, Actor]
val storage: ActorID // needs someone to provide the ChatStorage
val sessions = new HashMap[String, ActorID]
protected def sessionManagement: PartialFunction[Any, Unit] = {
case Login(username) =>
log.info("User [%s] has logged in", username)
val session = new Session(username, storage)
val session = newActor(() => new Session(username, storage))
session.start
sessions += (username -> session)
@ -154,7 +150,7 @@ trait SessionManagement { this: Actor =>
* Uses self-type annotation (this: Actor =>) to declare that it needs to be mixed in with an Actor.
*/
trait ChatManagement { this: Actor =>
val sessions: HashMap[String, Actor] // needs someone to provide the Session map
val sessions: HashMap[String, ActorID] // needs someone to provide the Session map
protected def chatManagement: PartialFunction[Any, Unit] = {
case msg @ ChatMessage(from, _) => sessions(from) ! msg
@ -166,7 +162,7 @@ trait ChatManagement { this: Actor =>
* Creates and links a RedisChatStorage.
*/
trait RedisChatStorageFactory { this: Actor =>
val storage: ChatStorage = spawnLink[RedisChatStorage] // starts and links ChatStorage
val storage = spawnLink[RedisChatStorage] // starts and links ChatStorage
}
/**
@ -176,7 +172,7 @@ trait ChatServer extends Actor {
faultHandler = Some(OneForOneStrategy(5, 5000))
trapExit = List(classOf[Exception])
val storage: ChatStorage
val storage: ActorID
log.info("Chat service is starting up...")
@ -197,18 +193,21 @@ trait ChatServer extends Actor {
}
/**
* Object encapsulating the full Chat Service.
* Class encapsulating the full Chat Service.
* Start service by invoking:
* <pre>
* val chatService = Actor.newActor[ChatService].start
* </pre>
*/
object ChatService extends
class ChatService extends
ChatServer with
SessionManagement with
ChatManagement with
RedisChatStorageFactory {
override def start: Actor = {
override def start {
super.start
RemoteNode.start("localhost", 9999)
RemoteNode.register("chat:service", this)
this
}
}

View file

@ -263,7 +263,7 @@ trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] w
}
//Schedule the invalidation of nonces
Scheduler.schedule(this, InvalidateNonces, noncePurgeInterval, noncePurgeInterval, TimeUnit.MILLISECONDS)
Scheduler.schedule(self, InvalidateNonces, noncePurgeInterval, noncePurgeInterval, TimeUnit.MILLISECONDS)
//authenticate or invalidate nonces
override def receive = authenticate orElse invalidateNonces