Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-06-23 10:24:40 +02:00
commit f5541a37c0
12 changed files with 246 additions and 39 deletions

View file

@ -40,7 +40,13 @@
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<artifactId>akka-core_2.8.0.RC3</artifactId>
<version>0.9</version>
<version>0.9.1</version>
<exclusions>
<exclusion>
<groupId>org.multiverse</groupId>
<artifactId>multiverse-alpha</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
@ -54,6 +60,12 @@
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.multiverse</groupId>
<artifactId>multiverse-alpha</artifactId>
<version>0.6-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View file

@ -0,0 +1,35 @@
package se.scalablesolutions.akka.api;
import static se.scalablesolutions.akka.actor.ActiveObject.link;
import static se.scalablesolutions.akka.actor.ActiveObject.newInstance;
import org.junit.Assert;
import org.junit.Test;
import se.scalablesolutions.akka.config.OneForOneStrategy;
import junit.framework.TestCase;
/**
* <p>Small misc tests that do not fit anywhere else and does not require a separate testcase</p>
*
* @author johanrask
*
*/
public class MiscActiveObjectTest extends TestCase {
/**
* Verifies that both preRestart and postRestart methods are invoked when
* an actor is restarted
*/
public void testFailingPostRestartInvocation() throws InterruptedException {
SimpleJavaPojo pojo = newInstance(SimpleJavaPojo.class,500);
SimpleJavaPojo supervisor = newInstance(SimpleJavaPojo.class,500);
link(supervisor,pojo,new OneForOneStrategy(3, 2000),new Class[]{Throwable.class});
pojo.throwException();
Thread.sleep(500);
Assert.assertTrue(pojo.pre);
Assert.assertTrue(pojo.post);
}
}

View file

@ -38,7 +38,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
@ -51,7 +51,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
@ -59,10 +59,10 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setVectorState("init"); // set init state
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 10000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
@ -72,7 +72,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
@ -80,10 +80,10 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 10000, "localhost", 9999);
stateful.init();
stateful.setRefState("init"); // set init state
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 10000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");

View file

@ -0,0 +1,36 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
public class SimpleJavaPojo {
public boolean pre = false;
public boolean post = false;
private String name;
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
@prerestart
public void pre() {
System.out.println("** pre()");
pre = true;
}
@postrestart
public void post() {
System.out.println("** post()");
post = true;
}
public void throwException() {
throw new RuntimeException();
}
}

View file

@ -648,6 +648,7 @@ private[akka] sealed class ActiveObjectAspect {
object Dispatcher {
val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
val ZERO_ITEM_OBJECT_ARRAY = Array[Object]()
var crashedActorTl:ThreadLocal[Dispatcher] = new ThreadLocal();
}
/**
@ -655,7 +656,7 @@ object Dispatcher {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Option[RestartCallbacks]) extends Actor {
private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Option[RestartCallbacks]) extends Actor {
import Dispatcher._
private[actor] var target: Option[AnyRef] = None
@ -663,13 +664,18 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
private var context: Option[ActiveObjectContext] = None
private var targetClass:Class[_] = _
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired))
self.makeTransactionRequired
self.id = targetClass.getName
this.targetClass = targetClass
target = Some(targetInstance)
context = ctx
val methods = targetInstance.getClass.getDeclaredMethods.toList
@ -735,22 +741,43 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
override def preRestart(reason: Throwable) {
try {
// Since preRestart is called we know that this dispatcher
// is about to be restarted. Put the instance in a thread
// local so the new dispatcher can be initialized with the contents of the
// old.
//FIXME - This should be considered as a workaround.
crashedActorTl.set(this)
if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
} catch { case e: InvocationTargetException => throw e.getCause }
}
override def postRestart(reason: Throwable) {
try {
if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
if (postRestart.isDefined) {
postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
}
} catch { case e: InvocationTargetException => throw e.getCause }
}
override def init = {
// Get the crashed dispatcher from thread local and intitialize this actor with the
// contents of the old dispatcher
val oldActor = crashedActorTl.get();
if(oldActor != null) {
initialize(oldActor.targetClass,oldActor.target.get,oldActor.context)
crashedActorTl.set(null)
}
}
override def initTransactionalState = {
try {
try {
if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
} catch { case e: InvocationTargetException => throw e.getCause }
}
private def serializeArguments(joinPoint: JoinPoint) = {
val args = joinPoint.getRtti.asInstanceOf[MethodRtti].getParameterValues
var unserializable = false

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.comet
import se.scalablesolutions.akka.util.Logging
import java.util.{List => JList}
import javax.servlet.ServletConfig
import javax.servlet.{ServletConfig,ServletContext}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import com.sun.jersey.spi.container.servlet.ServletContainer
@ -43,14 +43,32 @@ class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProce
* Used by the Akka Kernel to bootstrap REST and Comet.
*/
class AkkaServlet extends AtmosphereServlet with Logging {
import se.scalablesolutions.akka.config.Config.{config => c}
addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true")
addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName)
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
lazy val servlet = createRestServlet
protected def createRestServlet : AtmosphereRestServlet = new AtmosphereRestServlet {
val servlet = new AtmosphereRestServlet {
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
}
override def getInitParameter(key : String) = Option(super.getInitParameter(key)).getOrElse(initParams.get(key))
override def getInitParameterNames() = {
val names = new java.util.Vector[String]()
val i = initParams.keySet.iterator
while(i.hasNext) names.add(i.next.toString)
val e = super.getInitParameterNames
while(e.hasMoreElements) names.add(e.nextElement.toString)
names.elements
}
/**
* We override this to avoid Atmosphere looking for it's atmosphere.xml file
* Instead we specify what semantics we want in code.

View file

@ -37,10 +37,6 @@ import javax.annotation.security.{DenyAll, PermitAll, RolesAllowed}
import java.security.Principal
import java.util.concurrent.TimeUnit
import net.liftweb.util.{SecurityHelpers, StringHelpers, IoHelpers}
object Enc extends SecurityHelpers with StringHelpers with IoHelpers
case object OK
/**
@ -249,7 +245,7 @@ trait BasicAuthenticationActor extends AuthenticationActor[BasicCredentials] {
* rest-part of the akka config
*/
trait DigestAuthenticationActor extends AuthenticationActor[DigestCredentials] with Logging {
import Enc._
import LiftUtils._
private object InvalidateNonces
@ -483,3 +479,87 @@ trait SpnegoAuthenticationActor extends AuthenticationActor[SpnegoCredentials] w
}
}
/*
* Copyright 2006-2010 WorldWide Conferencing, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
object LiftUtils {
import java.security.{MessageDigest,SecureRandom}
val random = new SecureRandom()
def md5(in: Array[Byte]): Array[Byte] = (MessageDigest.getInstance("MD5")).digest(in)
/**
* Create a random string of a given size
* @param size size of the string to create. Must be a positive or nul integer
* @return the generated string
*/
def randomString(size: Int): String = {
def addChar(pos: Int, lastRand: Int, sb: StringBuilder): StringBuilder = {
if (pos >= size) sb
else {
val randNum = if ((pos % 6) == 0) random.nextInt else lastRand
sb.append((randNum & 0x1f) match {
case n if n < 26 => ('A' + n).toChar
case n => ('0' + (n - 26)).toChar
})
addChar(pos + 1, randNum >> 5, sb)
}
}
addChar(0, 0, new StringBuilder(size)).toString
}
/** encode a Byte array as hexadecimal characters */
def hexEncode(in: Array[Byte]): String = {
val sb = new StringBuilder
val len = in.length
def addDigit(in: Array[Byte], pos: Int, len: Int, sb: StringBuilder) {
if (pos < len) {
val b: Int = in(pos)
val msb = (b & 0xf0) >> 4
val lsb = (b & 0x0f)
sb.append((if (msb < 10) ('0' + msb).asInstanceOf[Char] else ('a' + (msb - 10)).asInstanceOf[Char]))
sb.append((if (lsb < 10) ('0' + lsb).asInstanceOf[Char] else ('a' + (lsb - 10)).asInstanceOf[Char]))
addDigit(in, pos + 1, len, sb)
}
}
addDigit(in, 0, len, sb)
sb.toString
}
/**
* Splits a string of the form &lt;name1=value1, name2=value2, ... &gt; and unquotes the quoted values.
* The result is a Map[String, String]
*/
def splitNameValuePairs(props: String): Map[String, String] = {
/**
* If str is surrounded by quotes it return the content between the quotes
*/
def unquote(str: String) = {
if ((str ne null) && str.length >= 2 && str.charAt(0) == '\"' && str.charAt(str.length - 1) == '\"')
str.substring(1, str.length - 1)
else
str
}
val list = props.split(",").toList.map(in => {
val pair = in match { case null => Nil case s => s.split("=").toList.map(_.trim).filter(_.length > 0) }
(pair(0), unquote(pair(1)))
})
val map: Map[String, String] = Map.empty
(map /: list)((m, next) => m + (next))
}
}

View file

@ -61,12 +61,6 @@ trait EmbeddedAppServer extends Bootable with Logging {
"org.atmosphere.container.GrizzlyCometSupport")
adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass",
"com.sun.jersey.api.core.PackagesResourceConfig")
adapter.addInitParameter("com.sun.jersey.config.property.packages",
config.getList("akka.rest.resource_packages").mkString(";")
)
adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters",
config.getList("akka.rest.filters").mkString(",")
)
if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root")
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolders, adapter.getContextPath)

View file

@ -1,6 +1,6 @@
project.name=Akka Plugin
project.name=Akka SBT Plugin
project.organization=se.scalablesolutions.akka
# mirrors akka version
project.version=0.9.1
project.version=0.10
sbt.version=0.7.4
build.scala.versions=2.7.7

View file

@ -1,3 +1,7 @@
import sbt._
class AkkaPluginProject(info: ProjectInfo) extends PluginProject(info)
class AkkaPluginProject(info: ProjectInfo) extends PluginProject(info) {
override def managedStyle = ManagedStyle.Maven
val publishTo = "Scala Tools Nexus" at "http://nexus.scala-tools.org/content/repositories/releases/"
Credentials(Path.userHome / ".ivy2" / ".scala-tools-credentials", log)
}

View file

@ -1,11 +1,12 @@
import sbt._
object AkkaRepositories {
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org")
}
trait AkkaBaseProject extends BasicScalaProject {
@ -14,7 +15,8 @@ trait AkkaBaseProject extends BasicScalaProject {
// Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases)
// is resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action.
val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo)
// for development version resolve to .ivy2/local
// val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo)
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo)
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)
@ -34,11 +36,12 @@ trait AkkaBaseProject extends BasicScalaProject {
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) // only while snapshot version
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
}
trait AkkaProject extends AkkaBaseProject {
val akkaVersion = "0.9.1"
val akkaVersion = "0.10"
// convenience method
def akkaModule(module: String) = "se.scalablesolutions.akka" %% ("akka-" + module) % akkaVersion

View file

@ -224,8 +224,6 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val atmo_jbossweb = "org.atmosphere" % "atmosphere-compat-jbossweb" % ATMO_VERSION % "compile"
val commons_logging = "commons-logging" % "commons-logging" % "1.1.1" % "compile"
val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile"
val lift_common = "net.liftweb" % "lift-common" % LIFT_VERSION % "compile"
val lift_util = "net.liftweb" % "lift-util" % LIFT_VERSION % "compile"
// testing
val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"