Merge branch 'master' of github.com:jboner/akka into ticket-538

This commit is contained in:
ticktock 2010-12-09 10:14:49 -05:00
commit e985ee9765
12 changed files with 191 additions and 45 deletions

View file

@ -995,7 +995,7 @@ class LocalActorRef private[akka] (
"\n\tWill *not* restart actor [{}] anymore." +
"\n\tLast exception causing restart was" +
"\n\t[{}].",
Array(maxNrOfRetries, withinTimeRange, this, reason))
Array[AnyRef](maxNrOfRetries, withinTimeRange, this, reason))
_supervisor.foreach { sup =>
// can supervisor handle the notification?
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)

View file

@ -37,7 +37,7 @@ object Scheduler extends Logging {
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.slf4j.trace(
"Schedule scheduled event\n\tevent = [{}]\n\treceiver = [{}]\n\tinitialDelay = [{}]\n\tdelay = [{}]\n\ttimeUnit = [{}]",
Array(message, receiver, initialDelay, delay, timeUnit))
Array[AnyRef](message, receiver, initialDelay.asInstanceOf[AnyRef], delay.asInstanceOf[AnyRef], timeUnit))
try {
service.scheduleAtFixedRate(
new Runnable { def run = receiver ! message },
@ -61,7 +61,7 @@ object Scheduler extends Logging {
def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.slf4j.trace(
"Schedule scheduled event\n\trunnable = [{}]\n\tinitialDelay = [{}]\n\tdelay = [{}]\n\ttimeUnit = [{}]",
Array(runnable, initialDelay, delay, timeUnit))
Array[AnyRef](runnable, initialDelay.asInstanceOf[AnyRef], delay.asInstanceOf[AnyRef], timeUnit))
try {
service.scheduleAtFixedRate(runnable,initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
@ -76,7 +76,7 @@ object Scheduler extends Logging {
def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.slf4j.trace(
"Schedule one-time event\n\tevent = [{}]\n\treceiver = [{}]\n\tdelay = [{}]\n\ttimeUnit = [{}]",
Array(message, receiver, delay, timeUnit))
Array[AnyRef](message, receiver, delay.asInstanceOf[AnyRef], timeUnit))
try {
service.schedule(
new Runnable { def run = receiver ! message },
@ -100,7 +100,7 @@ object Scheduler extends Logging {
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.slf4j.trace(
"Schedule one-time event\n\trunnable = [{}]\n\tdelay = [{}]\n\ttimeUnit = [{}]",
Array(runnable, delay, timeUnit))
Array[AnyRef](runnable, delay.asInstanceOf[AnyRef], timeUnit))
try {
service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {

View file

@ -106,7 +106,7 @@ class ExecutorBasedEventDrivenDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match {
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailbox): AnyRef = mailboxType match {
case UnboundedMailbox(blocking) => new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox {
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
@ -120,7 +120,7 @@ class ExecutorBasedEventDrivenDispatcher(
/**
* Creates and returns a durable mailbox for the given actor.
*/
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef =
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailbox): AnyRef =
createMailbox(mailboxType.mailboxImplClassname, actorRef)
private[akka] def start = log.slf4j.debug("Starting up {}\n\twith throughput [{}]", this, throughput)

View file

@ -194,7 +194,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match {
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailbox): AnyRef = mailboxType match {
case UnboundedMailbox(blocking) => // FIXME make use of 'blocking' in work stealer ConcurrentLinkedDeque
new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable {
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
@ -224,7 +224,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
/**
* Creates and returns a durable mailbox for the given actor.
*/
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef =
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailbox): AnyRef =
createMailbox(mailboxType.mailboxImplClassname, actorRef)
private[akka] override def register(actorRef: ActorRef) = {

View file

@ -166,12 +166,12 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue =
def suspend(actorRef: ActorRef) = mailbox(actorRef).suspend
def resume(actorRef:ActorRef) = mailbox(actorRef).resume
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = null.asInstanceOf[AnyRef]
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailbox): AnyRef = null.asInstanceOf[AnyRef]
/**
* Creates and returns a durable mailbox for the given actor.
*/
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef = null.asInstanceOf[AnyRef]
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailbox): AnyRef = null.asInstanceOf[AnyRef]
override def toString = "HawtDispatcher"
}

View file

@ -31,17 +31,17 @@ trait MessageQueue {
*/
sealed trait MailboxType
abstract class TransientMailboxType(val blocking: Boolean = false) extends MailboxType
case class UnboundedMailbox(block: Boolean = false) extends TransientMailboxType(block)
abstract class TransientMailbox(val blocking: Boolean = false) extends MailboxType
case class UnboundedMailbox(block: Boolean = false) extends TransientMailbox(block)
case class BoundedMailbox(
block: Boolean = false,
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends TransientMailboxType(block) {
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends TransientMailbox(block) {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
}
case class DurableMailboxType(mailboxImplClassname: String) extends MailboxType
case class DurableMailbox(mailboxImplClassname: String) extends MailboxType
class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
extends LinkedBlockingQueue[MessageInvocation] with MessageQueue {
@ -83,17 +83,17 @@ trait MailboxFactory {
*/
private[akka] def createMailbox(actorRef: ActorRef): AnyRef =
mailboxType.getOrElse(throw new IllegalStateException("No mailbox type defined")) match {
case mb: TransientMailboxType => createTransientMailbox(actorRef, mb)
case mb: DurableMailboxType => createDurableMailbox(actorRef, mb)
case mb: TransientMailbox => createTransientMailbox(actorRef, mb)
case mb: DurableMailbox => createDurableMailbox(actorRef, mb)
}
/**
* Creates and returns a transient mailbox for the given actor.
*/
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef
private[akka] def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailbox): AnyRef
/**
* Creates and returns a durable mailbox for the given actor.
*/
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailboxType): AnyRef
private[akka] def createDurableMailbox(actorRef: ActorRef, mailboxType: DurableMailbox): AnyRef
}

View file

@ -118,7 +118,7 @@ trait Mist extends Logging {
val server = context.getServerInfo
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
log.slf4j.info("Initializing Akka HTTP on {} with Servlet API {}.{}",Array(server, major, minor))
log.slf4j.info("Initializing Akka HTTP on {} with Servlet API {}.{}",Array[AnyRef](server, major: java.lang.Integer, minor: java.lang.Integer))
_factory = if (major >= 3) {
log.slf4j.info("Supporting Java asynchronous contexts.")

View file

@ -31,6 +31,7 @@ import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.Map
import scala.reflect.BeanProperty
import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture}
import akka.japi.Creator
/**
* Use this object if you need a single remote server on a specific node.
@ -115,6 +116,13 @@ object RemoteServer {
private val guard = new ReadWriteGuard
private val remoteServers = Map[Address, RemoteServer]()
def serverFor(address: InetSocketAddress): Option[RemoteServer] =
serverFor(address.getHostName, address.getPort)
def serverFor(hostname: String, port: Int): Option[RemoteServer] = guard.withReadGuard {
remoteServers.get(Address(hostname, port))
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
serverFor(address) match {
case Some(server) => server
@ -122,13 +130,6 @@ object RemoteServer {
}
}
private[akka] def serverFor(address: InetSocketAddress): Option[RemoteServer] =
serverFor(address.getHostName, address.getPort)
private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = guard.withReadGuard {
remoteServers.get(Address(hostname, port))
}
private[akka] def register(hostname: String, port: Int, server: RemoteServer) = guard.withWriteGuard {
remoteServers.put(Address(hostname, port), server)
}
@ -138,7 +139,7 @@ object RemoteServer {
}
/**
* Used in REflectiveAccess
* Used in ReflectiveAccess
*/
private[akka] def registerActor(address: InetSocketAddress, actorRef: ActorRef) {
serverFor(address) foreach { _.register(actorRef) }
@ -291,6 +292,12 @@ class RemoteServer extends Logging with ListenerManagement {
*/
def registerTypedPerSessionActor(intfClass: Class[_], factory: => AnyRef) : Unit = registerTypedActor(intfClass.getName, factory)
/**
* Register typed actor by interface name.
* Java API
*/
def registerTypedPerSessionActor(intfClass: Class[_], factory: Creator[AnyRef]) : Unit = registerTypedActor(intfClass.getName, factory)
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
@ -301,6 +308,17 @@ class RemoteServer extends Logging with ListenerManagement {
registerTypedPerSessionActor(id, () => factory, typedActorsFactories)
}
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
* Java API
*/
def registerTypedPerSessionActor(id: String, factory: Creator[AnyRef]): Unit = synchronized {
log.slf4j.debug("Registering server side typed remote session actor with id [{}]", id)
registerTypedPerSessionActor(id, factory.create _, typedActorsFactories)
}
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
*/
@ -327,6 +345,17 @@ class RemoteServer extends Logging with ListenerManagement {
registerPerSession(id, () => factory, actorsFactories)
}
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
* Java API
*/
def registerPerSession(id: String, factory: Creator[ActorRef]): Unit = synchronized {
log.slf4j.debug("Registering server side remote session actor with id [{}]", id)
registerPerSession(id, factory.create _, actorsFactories)
}
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
if (_isRunning) {
registry.put(id, actorRef) //TODO change to putIfAbsent
@ -354,7 +383,7 @@ class RemoteServer extends Logging with ListenerManagement {
*/
def unregister(actorRef: ActorRef):Unit = synchronized {
if (_isRunning) {
log.slf4j.debug("Unregistering server side remote actor [{}] with id [{}:{}]", Array(actorRef.actorClass.getName, actorRef.id, actorRef.uuid))
log.slf4j.debug("Unregistering server side remote actor [{}] with id [{}:{}]", Array[AnyRef](actorRef.actorClass.getName, actorRef.id, actorRef.uuid))
actors.remove(actorRef.id, actorRef)
actorsByUuid.remove(actorRef.uuid, actorRef)
}

View file

@ -241,10 +241,11 @@ object RemoteActorSerialization {
*/
def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = {
import ar._
val host = homeAddress.getHostName
val port = homeAddress.getPort
val home = homeAddress
val host = home.getHostName
val port = home.getPort
Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]", Array(actorClassName, host, port))
Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}]",actorClassName, home)
RemoteServer.getOrCreateServer(homeAddress)
ActorRegistry.registerActorByUuid(homeAddress, uuid.toString, ar)

View file

@ -6,6 +6,26 @@ import akka.util.Logging
import Actor._
/*************************************
Instructions how to run the sample:
* Download Akka distribution.
* Unzip and step into the Akka root dir
* Set AKKA_HOME. For exampe 'export AKKA_HOME=`pwd`
* Then open up two shells and in each run:
* sbt
* > project akka-remote
* > console
* Then paste in the code below into both shells.
Then run:
* ServerInitiatedRemoteActorServer.run in one shell
* ServerInitiatedRemoteActorClient.run in one shell
Have fun.
*************************************/
class HelloWorldActor extends Actor {
self.start

View file

@ -1,26 +1,75 @@
package akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.config.Supervision._
import akka.config._
object TypedActorRegistrySpec {
trait My
class MyImpl extends TypedActor with My
}
class TypedActorRegistrySpec extends JUnitSuite {
class TypedActorRegistrySpec extends WordSpec with MustMatchers {
import TypedActorRegistrySpec._
@Test def shouldGetTypedActorByClassFromActorRegistry {
ActorRegistry.shutdownAll
val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000)
"Typed Actor" should {
val actors = ActorRegistry.typedActorsFor(classOf[My])
assert(actors.length === 1)
"be able to be retreived from the registry by class" in {
ActorRegistry.shutdownAll
val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000)
val actors = ActorRegistry.typedActorsFor(classOf[My])
actors.length must be (1)
ActorRegistry.shutdownAll
}
val option = ActorRegistry.typedActorFor[My]
assert(option != null)
assert(option.isDefined)
ActorRegistry.shutdownAll
"be able to be retreived from the registry by manifest" in {
ActorRegistry.shutdownAll
val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000)
val option = ActorRegistry.typedActorFor[My]
option must not be (null)
option.isDefined must be (true)
ActorRegistry.shutdownAll
}
"be able to be retreived from the registry by class two times" in {
ActorRegistry.shutdownAll
val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000)
val actors1 = ActorRegistry.typedActorsFor(classOf[My])
actors1.length must be (1)
val actors2 = ActorRegistry.typedActorsFor(classOf[My])
actors2.length must be (1)
ActorRegistry.shutdownAll
}
"be able to be retreived from the registry by manifest two times" in {
ActorRegistry.shutdownAll
val my = TypedActor.newInstance[My](classOf[My], classOf[MyImpl], 3000)
val option1 = ActorRegistry.typedActorFor[My]
option1 must not be (null)
option1.isDefined must be (true)
val option2 = ActorRegistry.typedActorFor[My]
option2 must not be (null)
option2.isDefined must be (true)
ActorRegistry.shutdownAll
}
"be able to be retreived from the registry by manifest two times (even when created in supervisor)" in {
ActorRegistry.shutdownAll
val manager = new TypedActorConfigurator
manager.configure(
OneForOneStrategy(classOf[Exception] :: Nil, 3, 1000),
Array(new SuperviseTypedActor(classOf[My], classOf[MyImpl], Permanent, 6000))
).supervise
val option1 = ActorRegistry.typedActorFor[My]
option1 must not be (null)
option1.isDefined must be (true)
val option2 = ActorRegistry.typedActorFor[My]
option2 must not be (null)
option2.isDefined must be (true)
ActorRegistry.shutdownAll
}
}
}

View file

@ -441,7 +441,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val stressTestsEnabled = systemOptional[Boolean]("stress.tests",false)
// ------------------------------------------------------------
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject {
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject with McPom {
override def disableCrossPaths = true
lazy val sourceArtifact = Artifact(this.artifactID, "source", "jar", Some("sources"), Nil, None)
lazy val docsArtifact = Artifact(this.artifactID, "doc", "jar", Some("docs"), Nil, None)
@ -450,6 +450,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
override def packageDocsJar = this.defaultJarPath("-docs.jar")
override def packageSrcJar = this.defaultJarPath("-sources.jar")
override def packageToPublishActions = super.packageToPublishActions ++ Seq(this.packageDocs, this.packageSrc)
override def pomPostProcess(node: scala.xml.Node): scala.xml.Node = mcPom(AkkaParentProject.this.moduleConfigurations)(super.pomPostProcess(node))
/**
* Used for testOptions, possibility to enable the running of integration and or stresstests
@ -492,3 +493,49 @@ trait DeployProject { self: BasicScalaProject =>
trait OSGiProject extends BNDPlugin { self: DefaultProject =>
override def bndExportPackage = Seq("akka.*;version=%s".format(projectVersion.value))
}
trait McPom { self: DefaultProject =>
import scala.xml._
def mcPom(mcs: Set[ModuleConfiguration])(node: Node): Node = {
def cleanUrl(url: String) = url match {
case null => ""
case "" => ""
case u if u endsWith "/" => u
case u => u + "/"
}
val oldRepos = (node \\ "project" \ "repositories" \ "repository").
map( n => cleanUrl((n \ "url").text) -> (n \ "name").text).toList
val newRepos = mcs.filter(_.resolver.isInstanceOf[MavenRepository]).map(m => {
val r = m.resolver.asInstanceOf[MavenRepository]
cleanUrl(r.root) -> r.name
})
val repos = Map((oldRepos ++ newRepos):_*).map( pair =>
<repository>
<id>{pair._2.toSeq.filter(_.isLetterOrDigit).mkString}</id>
<name>{pair._2}</name>
<url>{pair._1}</url>
</repository>
)
def rewrite(pf:PartialFunction[Node,Node])(ns: Seq[Node]): Seq[Node] = for(subnode <- ns) yield subnode match {
case e: Elem =>
if (pf isDefinedAt e) pf(e)
else Elem(e.prefix, e.label, e.attributes, e.scope, rewrite(pf)(e.child):_*)
case other => other
}
val rule: PartialFunction[Node,Node] = if ((node \\ "project" \ "repositories" ).isEmpty) {
case Elem(prefix, "project", attribs, scope, children @ _*) =>
Elem(prefix, "project", attribs, scope, children ++ <repositories>{repos}</repositories>:_*)
} else {
case Elem(prefix, "repositories", attribs, scope, children @ _*) =>
Elem(prefix, "repositories", attribs, scope, repos.toList:_*)
}
rewrite(rule)(node.theSeq)(0)
}
}