Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
patriknw 2011-04-06 07:46:15 +02:00
commit 706e128d15
16 changed files with 366 additions and 63 deletions

View file

@ -70,7 +70,6 @@ public class Actors {
return Actor$.MODULE$.actorOf(type);
}
/**
* The message that is sent when an Actor gets a receive timeout.
* <pre>
@ -83,4 +82,27 @@ public class Actors {
public final static ReceiveTimeout$ receiveTimeout() {
return ReceiveTimeout$.MODULE$;
}
}
/**
* The message that when sent to an Actor kills it by throwing an exception.
* <pre>
* actor.sendOneWay(kill());
* </pre>
* @return the single instance of Kill
*/
public final static Kill$ kill() {
return Kill$.MODULE$;
}
/**
* The message that when sent to an Actor shuts it down by calling 'stop'.
* <pre>
* actor.sendOneWay(poisonPill());
* </pre>
* @return the single instance of PoisonPill
*/
public final static PoisonPill$ poisonPill() {
return PoisonPill$.MODULE$;
}
}

View file

@ -5,25 +5,33 @@
package akka
import akka.actor.newUuid
import java.io.{StringWriter, PrintWriter}
import java.net.{InetAddress, UnknownHostException}
/**
* Akka base Exception. Each Exception gets:
* <ul>
* <li>a UUID for tracking purposes</li>
* <li>a message including exception name, uuid, original message and the stacktrace</li>
* <li>a method 'log' that will log the exception once and only once</li>
* <li>a uuid for tracking purposes</li>
* <li>toString that includes exception name, message, uuid, and the stacktrace</li>
* </ul>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable abstract class AkkaException(message: String = "") extends {
val exceptionName = getClass.getName
class AkkaException(message: String = "") extends RuntimeException(message) with Serializable {
val uuid = "%s_%s".format(AkkaException.hostname, newUuid)
} with RuntimeException(message) {
override lazy val toString = "%s\n\t[%s]\n\t%s".format(exceptionName, uuid, message)
override lazy val toString = {
val name = getClass.getName
val trace = stackTraceToString
"%s: %s\n[%s]\n%s".format(name, message, uuid, trace)
}
def stackTraceToString = {
val trace = getStackTrace
val sb = new StringBuffer
for (i <- 0 until trace.length)
sb.append("\tat %s\n" format trace(i))
sb.toString
}
}
object AkkaException {

View file

@ -116,18 +116,18 @@ class ExecutorBasedEventDrivenDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
case b: UnboundedMailbox if b.blocking =>
new DefaultUnboundedMessageQueue(true) with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
case b: UnboundedMailbox =>
if (b.blocking) {
new DefaultUnboundedMessageQueue(true) with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
} else { //If we have an unbounded, non-blocking mailbox, we can go lockless
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
final def enqueue(m: MessageInvocation) = this.add(m)
final def dequeue(): MessageInvocation = this.poll()
}
}
case b: UnboundedMailbox if !b.blocking => //If we have an unbounded, non-blocking mailbox, we can go lockless
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
final def enqueue(m: MessageInvocation) = this.add(m)
final def dequeue(): MessageInvocation = this.poll()
}
case b: BoundedMailbox =>
new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut, b.blocking) with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this

View file

@ -95,14 +95,14 @@ class ClientInitiatedRemoteActorSpec extends AkkaRemoteTest {
"shouldSendBangBangMessageAndReceiveReply" in {
val actor = remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start
val result = actor !! "Hello"
val result = actor !! ("Hello", 10000)
"World" must equal (result.get.asInstanceOf[String])
actor.stop
}
"shouldSendBangBangMessageAndReceiveReplyConcurrently" in {
val actors = (1 to 10).map(num => { remote.actorOf[RemoteActorSpecActorBidirectional](host,port).start }).toList
actors.map(_ !!! "Hello") foreach { future =>
actors.map(_ !!! ("Hello", 10000)) foreach { future =>
"World" must equal (future.await.result.asInstanceOf[Option[String]].get)
}
actors.foreach(_.stop)

View file

@ -0,0 +1,208 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.tutorial.java.first;
import akka.actor.*;
import static akka.actor.Actors.*;
import akka.routing.*;
import static akka.routing.Routing.Broadcast;
import akka.dispatch.Dispatchers;
import static java.util.Arrays.asList;
import java.util.concurrent.CountDownLatch;
/**
* First part in Akka tutorial for Java.
* <p/>
* Calculates Pi.
* <p/>
* Run on command line:
* <pre>
* $ cd akka-1.1
* $ export AKKA_HOME=`pwd`
* $ javac -cp dist/akka-actor-1.1-SNAPSHOT.jar:scala-library.jar akka/tutorial/java/first/Pi.java
* $ java -cp dist/akka-actor-1.1-SNAPSHOT.jar:scala-library.jar:. akka.tutorial.java.first.Pi
* $ ...
* </pre>
* <p/>
* Run it in Maven:
* <pre>
* $ mvn
* > scala:console
* > val pi = new akka.tutorial.java.first.Pi
* > pi.calculate(4, 10000, 10000)
* > ...
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
public class Pi {
public static void main(String[] args) throws Exception {
Pi pi = new Pi();
pi.calculate(4, 10000, 10000);
}
// ====================
// ===== Messages =====
// ====================
static class Calculate {}
static class Work {
private final int arg;
private final int nrOfElements;
public Work(int arg, int nrOfElements) {
this.arg = arg;
this.nrOfElements = nrOfElements;
}
public int getArg() { return arg; }
public int getNrOfElements() { return nrOfElements; }
}
static class Result {
private final double value;
public Result(double value) {
this.value = value;
}
public double getValue() { return value; }
}
// ==================
// ===== Worker =====
// ==================
static class Worker extends UntypedActor {
// define the work
private double calculatePiFor(int arg, int nrOfElements) {
double acc = 0.0D;
for (int i = arg * nrOfElements; i <= ((arg + 1) * nrOfElements - 1); i++) {
acc += 4 * Math.pow(-1, i) / (2 * i + 1);
}
return acc;
}
// message handler
public void onReceive(Object message) {
if (message instanceof Work) {
Work work = (Work)message;
getContext().replyUnsafe(new Result(calculatePiFor(work.getArg(), work.getNrOfElements()))); // perform the work
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}
}
// ==================
// ===== Master =====
// ==================
static class Master extends UntypedActor {
private final int nrOfWorkers;
private final int nrOfMessages;
private final int nrOfElements;
private final CountDownLatch latch;
private double pi;
private int nrOfResults;
private long start;
private ActorRef router;
static class PiRouter extends UntypedLoadBalancer {
private final InfiniteIterator<ActorRef> workers;
public PiRouter(ActorRef[] workers) {
this.workers = new CyclicIterator<ActorRef>(asList(workers));
}
public InfiniteIterator<ActorRef> seq() {
return workers;
}
}
public Master(int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
this.nrOfWorkers = nrOfWorkers;
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
this.latch = latch;
// create the workers
final ActorRef[] workers = new ActorRef[nrOfWorkers];
for (int i = 0; i < nrOfWorkers; i++) {
workers[i] = actorOf(Worker.class).start();
}
// wrap them with a load-balancing router
router = actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new PiRouter(workers);
}
}).start();
}
// message handler
public void onReceive(Object message) {
if (message instanceof Calculate) {
// schedule work
for (int arg = 0; arg < nrOfMessages; arg++) {
router.sendOneWay(new Work(arg, nrOfElements), getContext());
}
// send a PoisonPill to all workers telling them to shut down themselves
router.sendOneWay(new Broadcast(poisonPill()));
// send a PoisonPill to the router, telling him to shut himself down
router.sendOneWay(poisonPill());
} else if (message instanceof Result) {
// handle result from the worker
Result result = (Result)message;
pi += result.getValue();
nrOfResults += 1;
if (nrOfResults == nrOfMessages) getContext().stop();
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
}
@Override
public void preStart() {
start = System.currentTimeMillis();
}
@Override
public void postStop() {
// tell the world that the calculation is complete
System.out.println(String.format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", pi, (System.currentTimeMillis() - start)));
latch.countDown();
}
}
// ==================
// ===== Run it =====
// ==================
public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) throws Exception {
// this latch is only plumbing to kSystem.currentTimeMillis(); when the calculation is completed
final CountDownLatch latch = new CountDownLatch(1);
// create the master
ActorRef master = actorOf(new UntypedActorFactory() {
public UntypedActor create() {
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
}
}).start();
// start the calculation
master.sendOneWay(new Calculate());
// wait for master to shut down
latch.await();
}
}

View file

@ -2,36 +2,46 @@
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.tutorial.sbt.pi
package akka.tutorial.scala.first
import akka.actor.{Actor, ActorRef, PoisonPill}
import Actor._
import akka.routing.{Routing, CyclicIterator}
import Routing._
import akka.event.EventHandler
import akka.dispatch.Dispatchers
import System.{currentTimeMillis => now}
import java.util.concurrent.CountDownLatch
/**
* Sample for Akka, SBT an Scala tutorial.
* First part in Akka tutorial.
* <p/>
* Calculates Pi.
* <p/>
* Run on command line:
* <pre>
* $ cd akka-1.1
* $ export AKKA_HOME=`pwd`
* $ scalac -cp dist/akka-actor-1.1-SNAPSHOT.jar Pi.scala
* $ java -cp dist/akka-actor-1.1-SNAPSHOT.jar:scala-library.jar:. akka.tutorial.scala.first.Pi
* $ ...
* </pre>
* <p/>
* Run it in SBT:
* <pre>
* $ sbt
* > update
* > console
* > akka.tutorial.sbt.pi.Pi.calculate
* > akka.tutorial.scala.first.Pi.calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
* > ...
* > :quit
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Pi {
object Pi extends App {
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
// ====================
// ===== Messages =====
@ -48,7 +58,10 @@ object Pi {
// define the work
val calculatePiFor = (arg: Int, nrOfElements: Int) => {
val range = (arg * nrOfElements) to ((arg + 1) * nrOfElements - 1)
range map (j => 4 * math.pow(-1, j) / (2 * j + 1)) sum
var acc = 0.0D
range foreach (i => acc += 4 * math.pow(-1, i) / (2 * i + 1))
acc
//range map (j => 4 * math.pow(-1, j) / (2 * j + 1)) sum
}
def receive = {
@ -94,7 +107,7 @@ object Pi {
override def postStop = {
// tell the world that the calculation is complete
EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start)))
latch.countDown
}
}
@ -102,10 +115,7 @@ object Pi {
// ==================
// ===== Run it =====
// ==================
def calculate = {
val nrOfWorkers = 4
val nrOfMessages = 10000
val nrOfElements = 10000
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
// this latch is only plumbing to know when the calculation is completed
val latch = new CountDownLatch(1)

View file

@ -1,3 +0,0 @@
import sbt._
class PiTutorialProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject

View file

@ -1,3 +0,0 @@
#Project properties
#Fri Apr 01 14:48:23 CEST 2011
plugin.uptodate=true

View file

@ -0,0 +1,5 @@
project.organization=se.scalablesolutions.akka
project.name=Akka Tutorial 1 SBT
project.version=1.0
build.scala.versions=2.9.0.RC1
sbt.version=0.7.6.RC0

View file

@ -0,0 +1,6 @@
import sbt._
class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
val akkaRepo = "Akka Repo" at "http://akka.io/repository"
val akkaPlugin = "se.scalablesolutions.akka" % "akka-sbt-plugin" % "1.1-SNAPSHOT"
}

View file

@ -2,34 +2,46 @@
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.tutorial.sbt.pi
package akka.tutorial.second
import akka.actor.Actor._
import akka.routing.{Routing, CyclicIterator}
import Routing._
import akka.event.EventHandler
import System.{currentTimeMillis => now}
import akka.actor.{Channel, Actor, PoisonPill}
import akka.dispatch.Future
import System.{currentTimeMillis => now}
/**
* Sample for Akka, SBT an Scala tutorial.
* Second part in Akka tutorial.
* <p/>
* Calculates Pi.
* <p/>
* Run on command line:
* <pre>
* $ cd akka-1.1
* $ export AKKA_HOME=`pwd`
* $ scalac -cp dist/akka-actor-1.1-SNAPSHOT.jar Pi.scala
* $ java -cp dist/akka-actor-1.1-SNAPSHOT.jar:scala-library.jar:. akka.tutorial.second.Pi
* $ ...
* </pre>
* <p/>
* Run it in SBT:
* <pre>
* $ sbt
* > update
* > console
* > akka.tutorial.sbt.pi.Pi.calculate
* > akka.tutorial.second.Pi.calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
* > ...
* > :quit
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Pi2 {
object Pi extends App {
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
// ====================
// ===== Messages =====
@ -46,7 +58,10 @@ object Pi2 {
// define the work
val calculatePiFor = (arg: Int, nrOfElements: Int) => {
val range = (arg * nrOfElements) to ((arg + 1) * nrOfElements - 1)
range map (j => 4 * math.pow(-1, j) / (2 * j + 1)) sum
var acc = 0.0D
range foreach (i => acc += 4 * math.pow(-1, i) / (2 * i + 1))
acc
//range map (j => 4 * math.pow(-1, j) / (2 * j + 1)) sum
}
def receive = {
@ -124,8 +139,3 @@ object Pi2 {
}
}
}
// To be able to run it as a main application
object Main extends App {
Pi2.calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
}

View file

@ -966,10 +966,10 @@ private[akka] abstract class ActorAspect {
else if (TypedActor.returnsFuture_?(methodRtti)) future.get
else {
if (future.isDefined) {
future.get.await.resultOrException match {
case s: Some[AnyRef] => s.get
case None => throw new IllegalActorStateException("No result returned from call to [" + joinPoint + "]")
}
future.get.await
val result = future.get.resultOrException
if (result.isDefined) result.get
else throw new IllegalActorStateException("No result returned from call to [" + joinPoint + "]")
} else throw new IllegalActorStateException("No future returned from call to [" + joinPoint + "]")
}
}

View file

@ -408,13 +408,18 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// Tutorials
// -------------------------------------------------------------------------------------------------------------------
class AkkaTutorialPiSbtProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaTutorialFirstProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaTutorialSecondProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath)
class AkkaTutorialsParentProject(info: ProjectInfo) extends ParentProject(info) {
override def disableCrossPaths = true
lazy val akka_tutorial_pi_sbt = project("akka-tutorial-pi-sbt", "akka-tutorial-pi-sbt",
new AkkaTutorialPiSbtProject(_), akka_actor)
lazy val akka_tutorial_first = project("akka-tutorial-first", "akka-tutorial-first",
new AkkaTutorialFirstProject(_), akka_actor)
lazy val akka_tutorial_second = project("akka-tutorial-second", "akka-tutorial-second",
new AkkaTutorialSecondProject(_), akka_actor)
lazy val publishRelease = {
val releaseConfiguration = new DefaultPublishConfiguration(localReleaseRepository, "release", false)
@ -477,7 +482,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
def akkaArtifacts = descendents(info.projectPath / "dist", "*-" + version + ".jar")
// ------------------------------------------------------------
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info)
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info)
with DeployProject with OSGiProject with McPom {
override def disableCrossPaths = true
@ -546,12 +551,12 @@ trait McPom { self: DefaultProject =>
case u => u + "/"
}
val oldRepos =
(node \\ "project" \ "repositories" \ "repository").map { n =>
val oldRepos =
(node \\ "project" \ "repositories" \ "repository").map { n =>
cleanUrl((n \ "url").text) -> (n \ "name").text
}.toList
val newRepos =
val newRepos =
mcs.filter(_.resolver.isInstanceOf[MavenRepository]).map { m =>
val r = m.resolver.asInstanceOf[MavenRepository]
cleanUrl(r.root) -> r.name

35
scripts/git-remove-history.sh Executable file
View file

@ -0,0 +1,35 @@
#!/bin/bash
cat <<'EOT'
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@ This command rewrites GIT history like git-rebase. Beware never to rewrite @
@ trees which are already published, as that would deeply upset all cloning @
@ repos. For more details see 'git help rebase'. Tread carefully! @
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
EOT
read -p "I know what I am doing: " answer
test "$answer" = yes || exit 1
set -o errexit
# Author: David Underhill
# Script to permanently delete files/folders from your git repository. To use
# it, cd to your repository's root and then run the script with a list of paths
# you want to delete, e.g., git-delete-history path1 path2
if [ $# -eq 0 ]; then
exit 0
fi
# make sure we're at the root of git repo
if [ ! -d .git ]; then
echo "Error: must run this script from the root of a git repository"
exit 1
fi
# remove all paths passed as arguments from the history of the repo
files=$@
git filter-branch --index-filter "git rm -rf --cached --ignore-unmatch $files" HEAD
# remove the temporary history git-filter-branch otherwise leaves behind for a long time
rm -rf .git/refs/original/ && git reflog expire --all && git gc --aggressive --prune