Merge branch 'master' into wip-2344-agent-love-√

This commit is contained in:
Viktor Klang 2012-08-06 17:24:03 +02:00
commit 757b4555e4
36 changed files with 220 additions and 139 deletions

View file

@ -6,7 +6,7 @@ Akka is here to change that.
Using the Actor Model together with Software Transactional Memory we raise the abstraction level and provide a better platform to build correct concurrent and scalable applications.
For fault-tolerance we adopt the "Let it crash" model which have been used with great success in the telecom industry to build applications that self-heal, systems that never stop.
For fault-tolerance we adopt the "Let it crash" model which the telecom industry has used with great success to build applications that self-heal and systems that never stop.
Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.RoundRobinRouter;
import akka.testkit.ExtractRoute;
public class CustomRouteTest {
static private ActorSystem system;
// only to test compilability
public void testRoute() {
final ActorRef ref = system.actorOf(new Props().withRouter(new RoundRobinRouter(1)));
final scala.Function1<scala.Tuple2<ActorRef, Object>, scala.collection.Iterable<Destination>> route = ExtractRoute.apply(ref);
route.apply(null);
}
}

View file

@ -292,7 +292,7 @@ object SupervisorHierarchySpec {
case Init -> Stress
self ! Work(familySize * 1000)
// set timeout for completion of the whole test (i.e. including Finishing and Stopping)
setTimer("phase", StateTimeout, 60 seconds, false)
setTimer("phase", StateTimeout, 30.seconds.dilated, false)
}
val workSchedule = 250.millis
@ -483,6 +483,7 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
EventFilter[Exception]("expected", occurrences = 1) intercept {
boss ! "fail"
}
awaitCond(worker.asInstanceOf[LocalActorRef].underlying.mailbox.isSuspended)
worker ! "ping"
expectNoMsg(2 seconds)
latch.countDown()

View file

@ -383,14 +383,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"fold" in {
val actors = (1 to 10).toList map { _
system.actorOf(Props(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); sender.tell(add) }
}))
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200))(timeout).mapTo[Int] }
Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
Await.result(Future.fold((1 to 10).toList map { i Future(i) })(0)(_ + _), remaining) must be(55)
}
"zip" in {
@ -412,30 +405,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"fold by composing" in {
val actors = (1 to 10).toList map { _
system.actorOf(Props(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); sender.tell(add) }
}))
}
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200))(10000).mapTo[Int] }
Await.result(futures.foldLeft(Future(0))((fr, fa) for (r fr; a fa) yield (r + a)), timeout.duration) must be(45)
val futures = (1 to 10).toList map { i Future(i) }
Await.result(futures.foldLeft(Future(0))((fr, fa) for (r fr; a fa) yield (r + a)), timeout.duration) must be(55)
}
"fold with an exception" in {
filterException[IllegalArgumentException] {
val actors = (1 to 10).toList map { _
system.actorOf(Props(new Actor {
def receive = {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) sender ! Status.Failure(new IllegalArgumentException("shouldFoldResultsWithException: expected"))
else sender.tell(add)
val futures = (1 to 10).toList map {
case 6 Future(throw new IllegalArgumentException("shouldFoldResultsWithException: expected"))
case i Future(i)
}
}))
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100))(timeout).mapTo[Int] }
intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), timeout millis) }.getMessage must be("shouldFoldResultsWithException: expected")
intercept[Throwable] { Await.result(Future.fold(futures)(0)(_ + _), remaining) }.getMessage must be("shouldFoldResultsWithException: expected")
}
}
@ -460,31 +440,17 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
"shouldReduceResults" in {
val actors = (1 to 10).toList map { _
system.actorOf(Props(new Actor {
def receive = { case (add: Int, wait: Int) Thread.sleep(wait); sender.tell(add) }
}))
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 200))(timeout).mapTo[Int] }
assert(Await.result(Future.reduce(futures)(_ + _), timeout millis) === 45)
val futures = (1 to 10).toList map { i Future(i) }
assert(Await.result(Future.reduce(futures)(_ + _), remaining) === 55)
}
"shouldReduceResultsWithException" in {
filterException[IllegalArgumentException] {
val actors = (1 to 10).toList map { _
system.actorOf(Props(new Actor {
def receive = {
case (add: Int, wait: Int)
Thread.sleep(wait)
if (add == 6) sender ! Status.Failure(new IllegalArgumentException("shouldFoldResultsWithException: expected"))
else sender.tell(add)
val futures = (1 to 10).toList map {
case 6 Future(throw new IllegalArgumentException("shouldReduceResultsWithException: expected"))
case i Future(i)
}
}))
}
val timeout = 10000
def futures = actors.zipWithIndex map { case (actor: ActorRef, idx: Int) actor.?((idx, idx * 100))(timeout).mapTo[Int] }
intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), timeout millis) }.getMessage must be === "shouldFoldResultsWithException: expected"
intercept[Throwable] { Await.result(Future.reduce(futures)(_ + _), remaining) }.getMessage must be === "shouldReduceResultsWithException: expected"
}
}

View file

@ -231,7 +231,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.con
"deliver messages in a random fashion" in {
val connectionCount = 10
val iterationCount = 10
val iterationCount = 100
val doneLatch = new TestLatch(connectionCount)
val counter = new AtomicInteger

View file

@ -3,34 +3,48 @@
*/
package akka.routing
import language.postfixOps
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy
import akka.dispatch.Dispatchers
import akka.testkit.ExtractRoute
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CustomRouteSpec extends AkkaSpec {
class MyRouter extends RouterConfig {
//#custom-router
import akka.actor.{ ActorRef, Props, SupervisorStrategy }
import akka.dispatch.Dispatchers
class MyRouter(target: ActorRef) extends RouterConfig {
override def createRoute(p: Props, prov: RouteeProvider): Route = {
prov.createAndRegisterRoutees(p, 1, Nil)
{
case (sender, message: String) Seq(Destination(sender, target))
case (sender, message) toAll(sender, prov.routees)
}
}
override def supervisorStrategy = SupervisorStrategy.defaultStrategy
override def routerDispatcher = Dispatchers.DefaultDispatcherId
}
//#custom-router
"A custom RouterConfig" must {
"be testable" in {
val router = system.actorOf(Props.empty.withRouter(new MyRouter))
//#test-route
import akka.pattern.ask
import akka.testkit.ExtractRoute
import scala.concurrent.Await
import scala.concurrent.util.duration._
val target = system.actorOf(Props.empty)
val router = system.actorOf(Props.empty.withRouter(new MyRouter(target)))
val route = ExtractRoute(router)
route(testActor -> "hallo").size must be(1)
val r = Await.result(router.ask(CurrentRoutees)(1 second).mapTo[RouterRoutees], 1 second)
r.routees.size must be(1)
route(testActor -> "hallo") must be(Seq(Destination(testActor, target)))
route(testActor -> 12) must be(Seq(Destination(testActor, r.routees.head)))
//#test-route
}
}

View file

@ -11,7 +11,7 @@ package akka
* <li>toString that includes exception name, message and uuid</li>
* </ul>
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
class AkkaException(message: String, cause: Throwable) extends RuntimeException(message, cause) with Serializable {
def this(msg: String) = this(msg, null)

View file

@ -29,6 +29,7 @@ trait NoSerializationVerificationNeeded
/**
* Internal use only
*/
@SerialVersionUID(1L)
private[akka] case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful
abstract class PoisonPill extends AutoReceivedMessage with PossiblyHarmful
@ -36,6 +37,7 @@ abstract class PoisonPill extends AutoReceivedMessage with PossiblyHarmful
/**
* A message all Actors will understand, that when processed will terminate the Actor permanently.
*/
@SerialVersionUID(1L)
case object PoisonPill extends PoisonPill {
/**
* Java API: get the singleton instance
@ -48,6 +50,7 @@ abstract class Kill extends AutoReceivedMessage with PossiblyHarmful
* A message all Actors will understand, that when processed will make the Actor throw an ActorKilledException,
* which will trigger supervision.
*/
@SerialVersionUID(1L)
case object Kill extends Kill {
/**
* Java API: get the singleton instance
@ -58,6 +61,7 @@ case object Kill extends Kill {
/**
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated.
*/
@SerialVersionUID(1L)
case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage
abstract class ReceiveTimeout extends PossiblyHarmful
@ -66,6 +70,7 @@ abstract class ReceiveTimeout extends PossiblyHarmful
* When using ActorContext.setReceiveTimeout, the singleton instance of ReceiveTimeout will be sent
* to the Actor when there hasn't been any message for that long.
*/
@SerialVersionUID(1L)
case object ReceiveTimeout extends ReceiveTimeout {
/**
* Java API: get the singleton instance
@ -83,22 +88,26 @@ sealed trait SelectionPath extends AutoReceivedMessage
/**
* Internal use only
*/
@SerialVersionUID(1L)
private[akka] case class SelectChildName(name: String, next: Any) extends SelectionPath
/**
* Internal use only
*/
@SerialVersionUID(1L)
private[akka] case class SelectChildPattern(pattern: Pattern, next: Any) extends SelectionPath
/**
* Internal use only
*/
@SerialVersionUID(1L)
private[akka] case class SelectParent(next: Any) extends SelectionPath
/**
* IllegalActorStateException is thrown when a core invariant in the Actor implementation has been violated.
* For instance, if you try to create an Actor that doesn't extend Actor.
*/
@SerialVersionUID(1L)
class IllegalActorStateException private[akka] (message: String, cause: Throwable = null)
extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
@ -107,6 +116,7 @@ class IllegalActorStateException private[akka] (message: String, cause: Throwabl
/**
* ActorKilledException is thrown when an Actor receives the akka.actor.Kill message
*/
@SerialVersionUID(1L)
class ActorKilledException private[akka] (message: String, cause: Throwable)
extends AkkaException(message, cause)
with NoStackTrace {
@ -117,11 +127,13 @@ class ActorKilledException private[akka] (message: String, cause: Throwable)
* An InvalidActorNameException is thrown when you try to convert something, usually a String, to an Actor name
* which doesn't validate.
*/
@SerialVersionUID(1L)
class InvalidActorNameException(message: String) extends AkkaException(message)
/**
* An ActorInitializationException is thrown when the the initialization logic for an Actor fails.
*/
@SerialVersionUID(1L)
class ActorInitializationException private[akka] (val actor: ActorRef, message: String, cause: Throwable)
extends AkkaException(message, cause) {
def this(msg: String) = this(null, msg, null)
@ -136,6 +148,7 @@ class ActorInitializationException private[akka] (val actor: ActorRef, message:
* @param origCause is the exception which caused the restart in the first place
* @param msg is the message which was optionally passed into preRestart()
*/
@SerialVersionUID(1L)
class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable, val msg: Option[Any])
extends ActorInitializationException(actor, "exception in preRestart(" + origCause.getClass + ", " + msg.map(_.getClass) + ")", cause) {
}
@ -148,6 +161,7 @@ class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, val
* @param cause is the exception thrown by that actor within preRestart()
* @param origCause is the exception which caused the restart in the first place
*/
@SerialVersionUID(1L)
class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable)
extends ActorInitializationException(actor, "exception post restart (" + origCause.getClass + ")", cause) {
}
@ -157,6 +171,7 @@ class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, val
* Technically it's only "null" which is an InvalidMessageException but who knows,
* there might be more of them in the future, or not.
*/
@SerialVersionUID(1L)
class InvalidMessageException private[akka] (message: String, cause: Throwable = null)
extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
@ -166,6 +181,7 @@ class InvalidMessageException private[akka] (message: String, cause: Throwable =
* A DeathPactException is thrown by an Actor that receives a Terminated(someActor) message
* that it doesn't handle itself, effectively crashing the Actor and escalating to the supervisor.
*/
@SerialVersionUID(1L)
case class DeathPactException private[akka] (dead: ActorRef)
extends AkkaException("Monitored actor [" + dead + "] terminated")
with NoStackTrace
@ -174,11 +190,13 @@ case class DeathPactException private[akka] (dead: ActorRef)
* When an InterruptedException is thrown inside an Actor, it is wrapped as an ActorInterruptedException as to
* avoid cascading interrupts to other threads than the originally interrupted one.
*/
@SerialVersionUID(1L)
class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) with NoStackTrace
/**
* This message is published to the EventStream whenever an Actor receives a message it doesn't understand
*/
@SerialVersionUID(1L)
case class UnhandledMessage(@BeanProperty message: Any, @BeanProperty sender: ActorRef, @BeanProperty recipient: ActorRef)
/**
@ -191,12 +209,14 @@ object Status {
/**
* This class/message type is preferably used to indicate success of some operation performed.
*/
@SerialVersionUID(1L)
case class Success(status: AnyRef) extends Status
/**
* This class/message type is preferably used to indicate failure of some operation performed.
* As an example, it is used to signal failure with AskSupport is used (ask/?).
*/
@SerialVersionUID(1L)
case class Failure(cause: Throwable) extends Status
}
@ -224,6 +244,7 @@ object Actor {
/**
* emptyBehavior is a Receive-expression that matches no messages at all, ever.
*/
@SerialVersionUID(1L)
object emptyBehavior extends Receive {
def isDefinedAt(x: Any) = false
def apply(x: Any) = throw new UnsupportedOperationException("Empty behavior apply()")

View file

@ -32,7 +32,7 @@ object ActorPath {
* is sorted by path elements FROM RIGHT TO LEFT, where RootActorPath >
* ChildActorPath in case the number of elements is different.
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
sealed trait ActorPath extends Comparable[ActorPath] with Serializable {
/**
* The Address under which this path can be reached; walks up the tree to
@ -103,7 +103,7 @@ sealed trait ActorPath extends Comparable[ActorPath] with Serializable {
* Root of the hierarchy of ActorPaths. There is exactly root per ActorSystem
* and node (for remote-enabled or clustered systems).
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
final case class RootActorPath(address: Address, name: String = "/") extends ActorPath {
override def parent: ActorPath = this
@ -126,7 +126,7 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act
}
}
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
final class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath {
if (name.indexOf('/') != -1) throw new IllegalArgumentException("/ is a path separator and is not legal in ActorPath names: [%s]" format name)

View file

@ -350,7 +350,7 @@ private[akka] class LocalActorRef private[akka] (
* Memento pattern for serializing ActorRefs transparently
* INTERNAL API
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
private[akka] case class SerializedActorRef private (path: String) {
import akka.serialization.JavaSerializer.currentSystem
@ -405,10 +405,11 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
* When a message is sent to an Actor that is terminated before receiving the message, it will be sent as a DeadLetter
* to the ActorSystem's EventStream
*/
@SerialVersionUID(1L)
case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)
private[akka] object DeadLetterActorRef {
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
@throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = JavaSerializer.currentSystem.value.deadLetters

View file

@ -405,7 +405,7 @@ class LocalActorRefProvider(
case Terminated(_) context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) Status.Failure(e) })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) Status.Failure(e) })
case StopChild(child) context.stop(child); sender ! "ok"
case StopChild(child) context.stop(child)
case m deadLetters ! DeadLetter(m, sender, self)
}

View file

@ -16,6 +16,7 @@ import annotation.tailrec
* for example a remote transport would want to associate additional
* information with an address, then this must be done externally.
*/
@SerialVersionUID(1L)
final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
def this(protocol: String, system: String) = this(protocol, system, None, None)

View file

@ -27,7 +27,7 @@ import annotation.tailrec
* context.actorOf(someProps, "someName", Deploy(scope = RemoteScope("someOtherNodeName")))
* }}}
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
final case class Deploy(
path: String = "",
config: Config = ConfigFactory.empty,
@ -76,7 +76,7 @@ trait Scope {
def withFallback(other: Scope): Scope
}
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
abstract class LocalScope extends Scope
//FIXME docs
@ -92,7 +92,7 @@ case object LocalScope extends LocalScope {
/**
* This is the default value and as such allows overrides.
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
abstract class NoScopeGiven extends Scope
case object NoScopeGiven extends NoScopeGiven {
def withFallback(other: Scope): Scope = other

View file

@ -113,7 +113,7 @@ object Props {
* Props props = new Props(MyActor.class).withRouter(new RoundRobinRouter(..));
* }}}
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed when SI-4804 is fixed
@SerialVersionUID(1L)
case class Props(
creator: () Actor = Props.defaultCreator,
dispatcher: String = Dispatchers.DefaultDispatcherId,

View file

@ -494,7 +494,7 @@ object TypedProps {
* TypedProps is a TypedActor configuration object, that is thread safe and fully sharable.
* It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance.
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
case class TypedProps[T <: AnyRef] protected[TypedProps] (
interfaces: Seq[Class[_]],
creator: () T,

View file

@ -128,11 +128,9 @@ object Futures {
*/
def sequence[A](in: JIterable[Future[A]], executor: ExecutionContext): Future[JIterable[A]] = {
implicit val d = executor
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa)
for (r fr; a fa) yield {
r add a
r
})
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]())) { (fr, fa)
for (r fr; a fa) yield { r add a; r }
}
}
/**

View file

@ -310,7 +310,7 @@ trait Router extends Actor {
* INTERNAL API
*/
private object Router {
@SerialVersionUID(1L)
case object Resize
val defaultSupervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
@ -325,6 +325,7 @@ private object Router {
*
* Router implementations may choose to handle this message differently.
*/
@SerialVersionUID(1L)
case class Broadcast(message: Any)
/**
@ -333,6 +334,7 @@ case class Broadcast(message: Any)
* about what routees the router is routing over.
*/
abstract class CurrentRoutees
@SerialVersionUID(1L)
case object CurrentRoutees extends CurrentRoutees {
/**
* Java API: get the singleton instance
@ -343,6 +345,7 @@ case object CurrentRoutees extends CurrentRoutees {
/**
* Message used to carry information about what routees the router is currently using.
*/
@SerialVersionUID(1L)
case class RouterRoutees(routees: Iterable[ActorRef])
/**
@ -351,6 +354,7 @@ case class RouterRoutees(routees: Iterable[ActorRef])
* sender should match the sender of the original request, but e.g. the scatter-
* gather router needs to receive the replies with an AskActorRef instead.
*/
@SerialVersionUID(1L)
case class Destination(sender: ActorRef, recipient: ActorRef)
/**
@ -359,7 +363,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
* from lower-precedence sources. The decision whether or not to create a
* router is taken in the LocalActorRefProvider based on Props.
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
abstract class NoRouter extends RouterConfig
case object NoRouter extends NoRouter {
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null // FIXME, null, really??
@ -391,7 +395,7 @@ case object FromConfig extends FromConfig {
* This can be used when the dispatcher to be used for the head Router needs to be configured
* (defaults to default-dispatcher).
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends RouterConfig
with Serializable {
@ -462,7 +466,7 @@ object RoundRobinRouter {
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
@ -581,7 +585,7 @@ object RandomRouter {
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
@ -707,7 +711,7 @@ object SmallestMailboxRouter {
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
@ -907,7 +911,7 @@ object BroadcastRouter {
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy)
@ -1018,7 +1022,7 @@ object ScatterGatherFirstCompletedRouter {
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
*/
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration,
override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
@ -1132,6 +1136,7 @@ case object DefaultResizer {
}
//FIXME DOCUMENT ME
@SerialVersionUID(1L)
case class DefaultResizer(
/**
* The fewest number of routees the router should ever have.

View file

@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit
import java.lang.{ Double JDouble }
import scala.concurrent.util.Duration
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
@SerialVersionUID(1L)
case class Timeout(duration: Duration) {
def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))

View file

@ -3,6 +3,8 @@
*/
package akka.cluster
import language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec

View file

@ -3,6 +3,8 @@
*/
package akka.cluster
import language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec

View file

@ -3,6 +3,8 @@
*/
package akka.cluster
import language.implicitConversions
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.{ Address, ExtendedActorSystem }
@ -12,7 +14,7 @@ import akka.testkit._
import scala.concurrent.util.duration._
import scala.concurrent.util.Duration
import org.scalatest.Suite
import org.scalatest.TestFailedException
import org.scalatest.exceptions.TestFailedException
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorPath
import akka.actor.RootActorPath
@ -73,7 +75,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
}
super.withFixture(test)
} catch {
case t
case t: Throwable
failed = true
throw t
}

View file

@ -3,6 +3,8 @@
*/
package akka.cluster
import language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec

View file

@ -4,6 +4,8 @@
package akka.cluster
import language.implicitConversions
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec

View file

@ -3,6 +3,8 @@
*/
package akka.cluster
import language.postfixOps
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec

View file

@ -1,4 +1,3 @@
.. _what-is-akka:
###############
@ -11,10 +10,10 @@ We believe that writing correct concurrent, fault-tolerant and scalable
applications is too hard. Most of the time it's because we are using the wrong
tools and the wrong level of abstraction. Akka is here to change that. Using the
Actor Model we raise the abstraction level and provide a better platform to build
correct concurrent and scalable applications. For fault-tolerance we adopt the
"Let it crash" model which have been used with great success in the telecom industry to build
applications that self-heals, systems that never stop. Actors also provides the
abstraction for transparent distribution and the basis for truly scalable and
correct, concurrent, and scalable applications. For fault-tolerance we adopt the
"Let it crash" model which the telecom industry has used with great success to
build applications that self-heal and systems that never stop. Actors also provide
the abstraction for transparent distribution and the basis for truly scalable and
fault-tolerant applications.
Akka is Open Source and available under the Apache 2 License.
@ -107,7 +106,7 @@ The Typesafe Stack is all fully open source.
Typesafe Console
================
On top of the Typesafe Stack we have also have commercial product called Typesafe
On top of the Typesafe Stack we also have a commercial product called Typesafe
Console which provides the following features:
#. Slick Web UI with real-time view into the system

View file

@ -86,7 +86,7 @@ public class FutureDocTestBase {
ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor();
//#diy-execution-context
ExecutionContext ec =
ExecutionContext$.MODULE$.fromExecutorService(yourExecutorServiceGoesHere);
ExecutionContexts.fromExecutorService(yourExecutorServiceGoesHere);
//Use ec with your Futures
Future<String> f1 = Futures.successful("foo");

View file

@ -707,3 +707,17 @@ Some `Specs2 <http://specs2.org>`_ users have contributed examples of how to wor
* Specifications are by default executed concurrently, which requires some care
when writing the tests or alternatively the ``sequential`` keyword.
Testing Custom Router Logic
===========================
Given the following custom (dummy) router:
.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala#custom-router
This might be tested by dispatching messages and asserting their reception at
the right destinations, but that can be inconvenient. Therefore exists the
:obj:`ExtractRoute` extractor, which can be used like so:
.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala#test-route

View file

@ -3,6 +3,8 @@
*/
package akka.remote.router
import language.postfixOps
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props

View file

@ -3,6 +3,8 @@
*/
package akka.remote.router
import language.postfixOps
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props

View file

@ -3,6 +3,8 @@
*/
package akka.remote.router
import language.postfixOps
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props

View file

@ -3,6 +3,8 @@
*/
package akka.remote.testconductor
import language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.actor.Props
import akka.actor.Actor

View file

@ -116,7 +116,8 @@ akka {
# (I&O) Increase this if you want to be able to send messages with large payloads
message-frame-size = 1 MiB
# (O) Timeout duration
# (O) Sets the connectTimeoutMillis of all outbound connections,
# i.e. how long a connect may take until it is timed out
connection-timeout = 120s
# (I) Sets the size of the connection backlog

View file

@ -8,6 +8,18 @@ import scala.annotation.tailrec
import akka.actor.{ UnstartedCell, ActorRef }
import akka.routing.{ RoutedActorRef, RoutedActorCell, Route }
/**
* This object can be used to extract the `Route` out of a RoutedActorRef.
* These are the refs which represent actors created from [[akka.actor.Props]]
* having a [[akka.routing.RouterConfig]]. Use this extractor if you want to
* test the routing directly, i.e. without actually dispatching messages.
*
* {{{
* val router = system.actorOf(Props[...].withRouter(new MyRouter))
* val route = ExtractRoute(router)
* route(sender -> message) must be(...)
* }}}
*/
object ExtractRoute {
def apply(ref: ActorRef): Route = {
@tailrec def rec(tries: Int = 10): Route = {

View file

@ -19,18 +19,12 @@ import Sphinx.{ sphinxDocs, sphinxHtml, sphinxLatex, sphinxPdf, sphinxPygments,
object AkkaBuild extends Build {
System.setProperty("akka.mode", "test") // Is there better place for this?
lazy val desiredScalaVersion = "2.10.0-M5"
val enableMiMa = false
lazy val buildSettings = Seq(
organization := "com.typesafe.akka",
version := "2.1-SNAPSHOT",
//scalaVersion := desiredScalaVersion
scalaVersion := "2.10.0-SNAPSHOT",
scalaVersion in update <<= (scalaVersion) apply {
case "2.10.0-SNAPSHOT" => desiredScalaVersion
case x => x
}
scalaVersion := "2.10.0-M6"
)
lazy val akka = Project(
@ -44,7 +38,8 @@ object AkkaBuild extends Build {
Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id),
initialCommands in ThisBuild :=
"""|import akka.actor._
"""|import language.postfixOps
|import akka.actor._
|import akka.dispatch._
|import com.typesafe.config.ConfigFactory
|import scala.concurrent.util.duration._
@ -57,14 +52,22 @@ object AkkaBuild extends Build {
|implicit val timeout = Timeout(5 seconds)
|""".stripMargin,
initialCommands in Test in ThisBuild += "import akka.testkit._",
// online version of docs
/*
* online version of docs: the main docs are generated by
* akka-docs/sphinx and its dependencies, but we need to run the HTML
* part twice, so add it also as akka/sphinx. The trick is to reroute the
* dependencies of that one to their akka-docs brothers, for which the
* map identity is crucial; if anybody knows how/why, please drop RK a
* line (without it, the pygments task would run twice in parallel for
* the same directory, wreaking the expected amount of havoc).
*/
sphinxDocs <<= baseDirectory / "akka-docs",
sphinxTags in sphinxHtml += "online",
sphinxPygments <<= sphinxPygments in LocalProject(docs.id),
sphinxLatex <<= sphinxLatex in LocalProject(docs.id),
sphinxPdf <<= sphinxPdf in LocalProject(docs.id)
sphinxPygments <<= sphinxPygments in LocalProject(docs.id) map identity,
sphinxLatex <<= sphinxLatex in LocalProject(docs.id) map identity,
sphinxPdf <<= sphinxPdf in LocalProject(docs.id) map identity
),
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, /*akkaSbtPlugin,*/ samples, tutorials, osgi, osgiAries, docs)
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, samples, tutorials, osgi, osgiAries, docs)
)
lazy val actor = Project(
@ -72,7 +75,6 @@ object AkkaBuild extends Build {
base = file("akka-actor"),
settings = defaultSettings ++ OSGi.actor ++ Seq(
autoCompilerPlugins := true,
libraryDependencies <+= scalaVersion { v => "org.scala-lang" % "scala-reflect" % v },
packagedArtifact in (Compile, packageBin) <<= (artifact in (Compile, packageBin), OsgiKeys.bundle).identityMap,
artifact in (Compile, packageBin) ~= (_.copy(`type` = "bundle")),
// to fix scaladoc generation
@ -261,6 +263,7 @@ object AkkaBuild extends Build {
base = file("akka-sbt-plugin"),
settings = defaultSettings ++ Seq(
sbtPlugin := true,
scalacOptions in Compile := Seq("-encoding", "UTF-8", "-deprecation", "-unchecked"),
scalaVersion := "2.9.1"
)
)
@ -333,8 +336,7 @@ object AkkaBuild extends Build {
override lazy val settings = super.settings ++ buildSettings ++ Seq(
resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/",
//resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/",
resolvers += "Typesafe 2.10 Freshness" at "http://typesafe.artifactoryonline.com/typesafe/scala-fresh-2.10.x/",
resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/",
shellPrompt := { s => Project.extract(s).currentProject.id + " > " }
)
@ -403,9 +405,8 @@ object AkkaBuild extends Build {
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
// compile options
scalacOptions ++= Seq("-encoding", "UTF-8", "-target:jvm-1.6", "-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Ywarn-adapted-args") ++ (
if (true || (System getProperty "java.runtime.version" startsWith "1.7")) Seq() else Seq("-optimize")), // -optimize fails with jdk7
javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
scalacOptions in Compile ++= Seq("-encoding", "UTF-8", "-target:jvm-1.6", "-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Ywarn-adapted-args"),
javacOptions in Compile ++= Seq("-target", "1.6", "-Xlint:unchecked", "-Xlint:deprecation"),
ivyLoggingLevel in ThisBuild := UpdateLogging.Quiet,
@ -472,10 +473,9 @@ object AkkaBuild extends Build {
previousArtifact := None
)
def akkaPreviousArtifact(id: String, organization: String = "com.typesafe.akka", version: String = "2.0"): Option[sbt.ModuleID] = {
// the artifact to compare binary compatibility with
Some(organization % id % version)
}
def akkaPreviousArtifact(id: String, organization: String = "com.typesafe.akka", version: String = "2.0"): Option[sbt.ModuleID] =
if (enableMiMa) Some(organization % id % version) // the artifact to compare binary compatibility with
else None
}
// Dependencies
@ -495,9 +495,9 @@ object Dependencies {
val slf4j = Seq(slf4jApi, Test.logback)
val agent = Seq(scalaStm, scalaActors, Test.scalatest, Test.junit)
val agent = Seq(scalaStm, Test.scalatest, Test.junit)
val transactor = Seq(scalaStm, scalaActors, Test.scalatest, Test.junit)
val transactor = Seq(scalaStm, Test.scalatest, Test.junit)
val mailboxes = Seq(Test.scalatest, Test.junit)
@ -515,23 +515,18 @@ object Dependencies {
val docs = Seq(Test.scalatest, Test.junit, Test.junitIntf)
val zeroMQ = Seq(protobuf, Dependency.zeroMQ, Test.scalatest, Test.junit)
val zeroMQ = Seq(protobuf, zeroMQClient, Test.scalatest, Test.junit)
}
object Dependency {
def v(a: String): String = a+"_"+AkkaBuild.desiredScalaVersion
// Compile
val config = "com.typesafe" % "config" % "0.4.1" // ApacheV2
val camelCore = "org.apache.camel" % "camel-core" % "2.8.0" // ApacheV2
val netty = "io.netty" % "netty" % "3.5.1.Final" // ApacheV2
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.4.1" // New BSD
//val scalaStm = "org.scala-tools" % "scala-stm" % "0.5" // Modified BSD (Scala)
val scalaStm = "scala-stm" % "scala-stm" % "0.6-SNAPSHOT" //"0.5" // Modified BSD (Scala)
val scalaActors = "org.scala-lang" % "scala-actors" % "2.10.0-SNAPSHOT"
val scalaStm = "org.scala-tools" %% "scala-stm" % "0.6" // Modified BSD (Scala)
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.6.4" // MIT
val zeroMQ = "org.zeromq" % v("zeromq-scala-binding") % "0.0.6" // ApacheV2
val zeroMQClient = "org.zeromq" %% "zeromq-scala-binding" % "0.0.6" // ApacheV2
val uncommonsMath = "org.uncommons.maths" % "uncommons-maths" % "1.2.2a" // ApacheV2
val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "0.3.2" // ApacheV2
val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2
@ -544,8 +539,8 @@ object Dependency {
val junit = "junit" % "junit" % "4.10" % "test" // Common Public License 1.0
val logback = "ch.qos.logback" % "logback-classic" % "1.0.4" % "test" // EPL 1.0 / LGPL 2.1
val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT
val scalatest = "org.scalatest" % v("scalatest") % "1.9-2.10.0-M5-B2" % "test" // ApacheV2
val scalacheck = "org.scalacheck" % v("scalacheck") % "1.10.0" % "test" // New BSD
val scalatest = "org.scalatest" %% "scalatest" % "1.9-2.10.0-M6-B2" % "test" // ApacheV2
val scalacheck = "org.scalacheck" %% "scalacheck" % "1.10.0" % "test" // New BSD
val ariesProxy = "org.apache.aries.proxy" % "org.apache.aries.proxy.impl" % "0.3" % "test" // ApacheV2
val pojosr = "com.googlecode.pojosr" % "de.kalpatec.pojosr.framework" % "0.1.4" % "test" // ApacheV2
val tinybundles = "org.ops4j.pax.tinybundles" % "tinybundles" % "1.0.0" % "test" // ApacheV2
@ -590,6 +585,6 @@ object OSGi {
def defaultImports = Seq("!sun.misc", akkaImport(), configImport(), scalaImport(), "*")
def akkaImport(packageName: String = "akka.*") = "%s;version=\"[2.1,2.2)\"".format(packageName)
def configImport(packageName: String = "com.typesafe.config.*") = "%s;version=\"[0.4.1,0.5)\"".format(packageName)
def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.9.2,2.10)\"".format(packageName)
def scalaImport(packageName: String = "scala.*") = "%s;version=\"[2.10,2.11)\"".format(packageName)
}

View file

@ -32,8 +32,8 @@ object Sphinx {
sphinx <<= sphinxTask
)
def pygmentsTask = (sphinxPygmentsDir, sphinxTarget, streams) map {
(pygments, baseTarget, s) => {
def pygmentsTask = (sphinxDocs, sphinxPygmentsDir, sphinxTarget, streams) map {
(cwd, pygments, baseTarget, s) => {
val target = baseTarget / "site-packages"
val empty = (target * "*.egg").get.isEmpty
if (empty) {
@ -42,8 +42,8 @@ object Sphinx {
val logger = newLogger(s)
val command = Seq("easy_install", "--install-dir", target.absolutePath, pygments.absolutePath)
val env = "PYTHONPATH" -> target.absolutePath
s.log.debug("Command: " + command.mkString(" "))
val exitCode = Process(command, pygments, env) ! logger
s.log.debug("Command: " + command.mkString(" ") + "\nEnv:" + env)
val exitCode = Process(command, cwd, env) ! logger
if (exitCode != 0) sys.error("Failed to install custom Sphinx pygments styles.")
(pygments * ("*.egg-info" | "build" | "temp")).get.foreach(IO.delete)
s.log.info("Sphinx pygments styles installed at: " + target)
@ -56,7 +56,7 @@ object Sphinx {
(cacheDirectory, sphinxDocs, sphinxTarget, sphinxPygments, tagsKey, streams) map {
(cacheDir, docs, baseTarget, pygments, tags, s) => {
val target = baseTarget / builder
val doctrees = baseTarget / "doctrees"
val doctrees = baseTarget / "doctrees" / builder
val cache = cacheDir / "sphinx" / builder
val cached = FileFunction.cached(cache)(FilesInfo.hash, FilesInfo.exists) { (in, out) =>
val changes = in.modified

View file

@ -203,6 +203,12 @@ try git checkout -b ${release_branch}
# find and replace the version
try ${script_dir}/find-replace ${current_version} ${version}
#find and replace github links
try ${script_dir}/find-replace http://github.com/akka/akka/tree/master http://github.com/akka/akka/tree/v${version}
try ${script_dir}/find-replace https://github.com/akka/akka/tree/master http://github.com/akka/akka/tree/v${version}
try ${script_dir}/find-replace http://github.com/akka/akka/blob/master http://github.com/akka/akka/tree/v${version}
try ${script_dir}/find-replace https://github.com/akka/akka/blob/master http://github.com/akka/akka/tree/v${version}
# start clean
try sbt clean
@ -216,7 +222,7 @@ fi
# build the release
echolog "Building the release..."
try sbt build-release
try cp akka-spring/src/main/resources/akka/spring/akka-*.xsd ${release_dir}
#try cp akka-spring/src/main/resources/akka/spring/akka-*.xsd ${release_dir}
echolog "Creating gzipped tar download..."
try tar -cz -C ${unzipped_dir} -f ${release_dir}/downloads/akka-${version}.tgz akka-${version}
echolog "Successfully created local release"