Removed Actor.send function

Jonas Bonér 2010-03-30 23:58:50 +02:00
parent 7cf13c7715
commit 19879f3605
11 changed files with 40 additions and 40 deletions
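
Most of the diff is mechanical: calls to the now-removed Actor.send method are replaced with the equivalent fire-and-forget `!` (bang) operator. A minimal sketch of the migration, assuming the 0.x-era `se.scalablesolutions.akka.actor.Actor` API visible in the hunks below; the Printer actor and its message are illustrative, not part of the commit:

    import se.scalablesolutions.akka.actor.Actor   // package path assumed from the era's layout

    class Printer extends Actor {
      def receive = {
        case msg => log.info("got: %s", msg.toString)   // Actor mixes in Logging, as the hunks show
      }
    }

    val printer = new Printer
    printer.start

    // before this commit (fire-and-forget):
    //   printer send "hello"
    // after this commit (same semantics, operator syntax only):
    printer ! "hello"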

View file

@@ -32,13 +32,13 @@ class JGroupsClusterActor extends BasicClusterActor {
   def setState(state: Array[Byte]): Unit = ()

   def receive(m: JG_MSG): Unit =
-    if (isActive && m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) me send Message(m.getSrc,m.getRawBuffer)
+    if (isActive && m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) me ! Message(m.getSrc,m.getRawBuffer)

   def viewAccepted(view: JG_VIEW): Unit =
-    if (isActive) me send View(Set[ADDR_T]() ++ view.getMembers - channel.get.getAddress)
+    if (isActive) me ! View(Set[ADDR_T]() ++ view.getMembers - channel.get.getAddress)

   def suspect(a: Address): Unit =
-    if (isActive) me send Zombie(a)
+    if (isActive) me ! Zombie(a)

   def block: Unit =
     log debug "UNSUPPORTED: JGroupsClusterActor::block" //TODO HotSwap to a buffering body

View file

@@ -81,10 +81,10 @@ class ShoalClusterActor extends BasicClusterActor {
       signal.acquire()
       if(isActive) {
         signal match {
-          case ms : MessageSignal => me send Message[ADDR_T](ms.getMemberToken,ms.getMessage)
-          case jns : JoinNotificationSignal => me send View[ADDR_T](Set[ADDR_T]() ++ jns.getCurrentCoreMembers - serverName)
-          case fss : FailureSuspectedSignal => me send Zombie[ADDR_T](fss.getMemberToken)
-          case fns : FailureNotificationSignal => me send Zombie[ADDR_T](fns.getMemberToken)
+          case ms : MessageSignal => me ! Message[ADDR_T](ms.getMemberToken,ms.getMessage)
+          case jns : JoinNotificationSignal => me ! View[ADDR_T](Set[ADDR_T]() ++ jns.getCurrentCoreMembers - serverName)
+          case fss : FailureSuspectedSignal => me ! Zombie[ADDR_T](fss.getMemberToken)
+          case fns : FailureNotificationSignal => me ! Zombie[ADDR_T](fns.getMemberToken)
           case _ => log.debug("Unhandled signal: [%s]",signal)
         }
       }

View file

@@ -11,9 +11,8 @@ class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
   name = classOf[AkkaBroadcaster].getName

   val caster = new Actor {
-    def receive = { case f : Function0[_] => f() }
-    start
+    def receive = { case f : Function0[_] => f() }
+    start
   }
override def destroy {
@@ -22,6 +21,6 @@ class AkkaBroadcaster extends org.atmosphere.jersey.JerseyBroadcaster {
   }

   protected override def broadcast(r : AtmosphereResource[_,_], e : AtmosphereResourceEvent[_,_]) = {
-    caster.send( () => super.broadcast(r,e) )
+    caster ! (() => super.broadcast(r,e))
   }
 }
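
For context, the broadcaster's trick is to post plain Function0 closures to an actor whose receive block simply runs them, so the actor's mailbox serializes the broadcasts off the caller's thread. A rough sketch of the same pattern; worker and runLater are illustrative names, not from the commit:

    // Illustrative sketch of the pattern AkkaBroadcaster uses above.
    val worker = new Actor {
      def receive = { case f: Function0[_] => f() }   // run whatever thunk arrives
      start                                           // started from within the body, as in the diff
    }

    // Fire-and-forget: the closure is queued and executed by the worker actor.
    def runLater(body: => Unit): Unit = worker ! (() => body)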

View file

@@ -44,8 +44,8 @@ trait BootableCometActorService extends Bootable with Logging {
       adapter.setServletInstance(new AkkaServlet)
       adapter.setContextPath(uri.getPath)
       adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
-      if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
-      log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)
+      if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")
+      log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolders, adapter.getContextPath)

       val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
       ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
@@ -55,19 +55,18 @@ trait BootableCometActorService extends Bootable with Logging {
       t.setAdapter(adapter)
       t.setEnableAsyncExecution(true)
       t.setAsyncHandler(ah)
-      t.listen
-      t }
+      t.listen
+      t
+    }
     log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
   }
 }

   abstract override def onUnload = {
-    super.onUnload
-    if (jerseySelectorThread.isDefined) {
-      log.info("Shutting down REST service (Jersey)")
-      jerseySelectorThread.get.stopEndpoint
-    }
+    super.onUnload
+    if (jerseySelectorThread.isDefined) {
+      log.info("Shutting down REST service (Jersey)")
+      jerseySelectorThread.get.stopEndpoint
+    }
   }
 }

View file

@@ -466,7 +466,7 @@ trait Actor extends TransactionManagement with Logging {
_remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
}
}
def isRunning = _isRunning
/**

View file

@@ -110,7 +110,7 @@ object ActorRegistry extends Logging {
     } else actorsByClassName.put(className, actor :: Nil)

     // notify listeners
-    foreachListener(_ send ActorRegistered(actor))
+    foreachListener(_ ! ActorRegistered(actor))
   }

   /**
@@ -121,7 +121,7 @@ object ActorRegistry extends Logging {
     actorsById remove actor.getId
     actorsByClassName remove actor.getClass.getName
     // notify listeners
-    foreachListener(_ send ActorUnregistered(actor))
+    foreachListener(_ ! ActorUnregistered(actor))
   }

   /**

View file

@@ -152,7 +152,7 @@ abstract class BasicClusterActor extends ClusterActor {
       case Papers(x) => remotes = remotes + (src -> Node(x))

-      case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).foreach(_ send m)
+      case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).foreach(_ ! m)

       case unknown => log debug ("Unknown message: %s", unknown.toString)
     }
@@ -212,19 +212,19 @@ abstract class BasicClusterActor extends ClusterActor {
    * Registers a local endpoint
    */
   def registerLocalNode(hostname: String, port: Int): Unit =
-    send(RegisterLocalNode(RemoteAddress(hostname, port)))
+    this ! RegisterLocalNode(RemoteAddress(hostname, port))

   /**
    * Deregisters a local endpoint
    */
   def deregisterLocalNode(hostname: String, port: Int): Unit =
-    send(DeregisterLocalNode(RemoteAddress(hostname, port)))
+    this ! DeregisterLocalNode(RemoteAddress(hostname, port))

   /**
    * Broadcasts the specified message to all Actors of type Class on all known Nodes
    */
   def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
-    send(RelayedMessage(to.getName, msg))
+    this ! RelayedMessage(to.getName, msg)
 }

 /**

View file

@@ -310,7 +310,7 @@ class RemoteServerHandler(
         actor.!(message)(Some(remoteActor))
       } else {
         // couldn't find a way to reply, send the message without a source/sender
-        actor.send(message)
+        actor ! message
       }
     } else {
       try {
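
The two branches above differ only in the sender that gets attached: in this API `!` takes an implicit `Option[Actor]` sender in a second parameter list, which is why the reply-capable path spells it out explicitly. Roughly (illustrative, not from the commit):

    actor.!(message)(Some(remoteActor))   // sender attached, so the receiver can reply to remoteActor
    actor ! message                       // no sender in scope: fire-and-forget, the receiver has no reply channel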

View file

@@ -27,7 +27,7 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
   def thread(body: => Unit) = {
     val thread = new IsolatedEventBasedThread(body).start
-    thread send Start
+    thread ! Start
     thread
   }
@@ -93,9 +93,9 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
     private[this] val in = new In(this)

-    def <<(ref: DataFlowVariable[T]) = in send Set(ref())
+    def <<(ref: DataFlowVariable[T]) = in ! Set(ref())

-    def <<(value: T) = in send Set(value)
+    def <<(value: T) = in ! Set(value)

     def apply(): T = {
       val ref = value.get
@@ -104,13 +104,13 @@ import se.scalablesolutions.akka.dispatch.CompletableFuture
         val out = new Out(this)
         blockedReaders.offer(out)
         val result = out !! Get
-        out send Exit
+        out ! Exit
         result.getOrElse(throw new DataFlowVariableException(
           "Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
       }
     }

-    def shutdown = in send Exit
+    def shutdown = in ! Exit
   }

 /**
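
Note that only the fire-and-forget send method is removed; the blocking request-reply operator used just above is untouched. As a reminder of the difference, a sketch reusing the in/out handles from the DataFlowVariable code above:

    in ! Set(value)            // `!`  : asynchronous, returns Unit immediately
    val reply = out !! Get     // `!!` : sends and blocks for an optional reply until the time-out
    reply.getOrElse(throw new DataFlowVariableException(
      "Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))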

View file

@@ -49,7 +49,7 @@ trait Dispatcher { self: Actor =>
   protected def dispatch: PartialFunction[Any, Unit] = {
     case a if routes.isDefinedAt(a) =>
       if (self.sender.isDefined) routes(a) forward transform(a)
-      else routes(a) send transform(a)
+      else routes(a) ! transform(a)
   }

   def receive = dispatch
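
The dispatcher keeps the distinction between the two delivery modes: `forward` passes the original sender along so replies from the route target go straight back to the original caller, while `!` simply fires the transformed message with no sender to preserve. A condensed sketch of that decision, using the names from the hunk:

    if (self.sender.isDefined) routes(a) forward transform(a)  // keep the original sender for replies
    else                       routes(a) ! transform(a)        // nobody to reply to: plain fire-and-forget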

View file

@@ -201,16 +201,17 @@ class AkkaParent(info: ProjectInfo) extends AkkaDefaults(info) {
   // ------------------------------------------------------------
   // repositories
   val embeddedrepo = "embedded repo" at new File(akkaHome, "embedded-repo").toURI.toString
-  val m2 = "m2" at "http://download.java.net/maven/2"
-  val scala_tools_snapshots = "scala-tools snapshots" at "http://scala-tools.org/repo-snapshots"
   val scala_tools = "scala-tools" at "http://scala-tools.org/repo-releases"
   val sunjdmk = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo"
   val databinder = "DataBinder" at "http://databinder.net/repo"
-  // val configgy = "Configgy" at "http://www.lag.net/repo"
   val codehaus = "Codehaus" at "http://repository.codehaus.org"
   val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
   val jboss = "jBoss" at "http://repository.jboss.org/maven2"
   val guiceyfruit = "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
   val google = "google" at "http://google-maven-repository.googlecode.com/svn/repository"
+  val m2 = "m2" at "http://download.java.net/maven/2"
+  val scala_tools_snapshots = "scala-tools snapshots" at "http://scala-tools.org/repo-snapshots"
+  // val configgy = "Configgy" at "http://www.lag.net/repo"
+
   // ------------------------------------------------------------
   // project defintions
@@ -234,7 +235,7 @@ class AkkaParent(info: ProjectInfo) extends AkkaDefaults(info) {
   lazy val akka_fun_test = project("akka-fun-test-java", "akka-fun-test-java", new AkkaFunTestProject(_), akka_kernel)

   // examples
-  lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
+  lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_), akka_kernel)

   // ------------------------------------------------------------
   // subprojects
@@ -424,6 +425,7 @@ class AkkaParent(info: ProjectInfo) extends AkkaDefaults(info) {
 }

 class AkkaSampleRestJavaProject(info: ProjectInfo) extends AkkaDefaults(info) {
+  val scala_library = "org.scala-lang" % "scala-library" % buildScalaVersion % "compile"
   lazy val dist = deployTask(info, deployPath) dependsOn(`package`, packageDocs, packageSrc) describedAs("Deploying")
 }