Merge branch 'master' into wip-cluster-console-jboner
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
commit
19a8cd8f91
40 changed files with 585 additions and 161 deletions
|
|
@ -19,6 +19,7 @@ import akka.dispatch.{ Await, Dispatchers, Future, Promise }
|
|||
import akka.pattern.ask
|
||||
import akka.serialization.JavaSerializer
|
||||
import akka.actor.TypedActor._
|
||||
import java.lang.IllegalStateException
|
||||
|
||||
object TypedActorSpec {
|
||||
|
||||
|
|
@ -162,20 +163,26 @@ object TypedActorSpec {
|
|||
|
||||
class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles with Receiver {
|
||||
|
||||
private def ensureContextAvailable[T](f: ⇒ T): T = TypedActor.context match {
|
||||
case null ⇒ throw new IllegalStateException("TypedActor.context is null!")
|
||||
case some ⇒ f
|
||||
}
|
||||
|
||||
override def crash(): Unit = throw new IllegalStateException("Crash!")
|
||||
|
||||
override def preStart(): Unit = latch.countDown()
|
||||
override def preStart(): Unit = ensureContextAvailable(latch.countDown())
|
||||
|
||||
override def postStop(): Unit = for (i ← 1 to 3) latch.countDown()
|
||||
override def postStop(): Unit = ensureContextAvailable(for (i ← 1 to 3) latch.countDown())
|
||||
|
||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i ← 1 to 5) latch.countDown()
|
||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = ensureContextAvailable(for (i ← 1 to 5) latch.countDown())
|
||||
|
||||
override def postRestart(reason: Throwable): Unit = for (i ← 1 to 7) latch.countDown()
|
||||
override def postRestart(reason: Throwable): Unit = ensureContextAvailable(for (i ← 1 to 7) latch.countDown())
|
||||
|
||||
override def onReceive(msg: Any, sender: ActorRef): Unit = {
|
||||
msg match {
|
||||
case "pigdog" ⇒ sender ! "dogpig"
|
||||
}
|
||||
ensureContextAvailable(
|
||||
msg match {
|
||||
case "pigdog" ⇒ sender ! "dogpig"
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,10 +5,11 @@ package akka.event
|
|||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.util.duration._
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystemImpl }
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.Logging.InitializeLogger
|
||||
import akka.pattern.gracefulStop
|
||||
|
||||
object EventStreamSpec {
|
||||
|
||||
|
|
@ -20,6 +21,14 @@ object EventStreamSpec {
|
|||
}
|
||||
""".format(Logging.StandardOutLoggerName))
|
||||
|
||||
val configUnhandled = ConfigFactory.parseString("""
|
||||
akka {
|
||||
stdout-loglevel = WARNING
|
||||
loglevel = DEBUG
|
||||
actor.debug.unhandled = on
|
||||
}
|
||||
""")
|
||||
|
||||
case class M(i: Int)
|
||||
|
||||
case class SetTarget(ref: ActorRef)
|
||||
|
|
@ -27,9 +36,13 @@ object EventStreamSpec {
|
|||
class MyLog extends Actor {
|
||||
var dst: ActorRef = context.system.deadLetters
|
||||
def receive = {
|
||||
case Logging.InitializeLogger(bus) ⇒ bus.subscribe(context.self, classOf[SetTarget]); sender ! Logging.LoggerInitialized
|
||||
case SetTarget(ref) ⇒ dst = ref; dst ! "OK"
|
||||
case e: Logging.LogEvent ⇒ dst ! e
|
||||
case Logging.InitializeLogger(bus) ⇒
|
||||
bus.subscribe(context.self, classOf[SetTarget])
|
||||
bus.subscribe(context.self, classOf[UnhandledMessage])
|
||||
sender ! Logging.LoggerInitialized
|
||||
case SetTarget(ref) ⇒ dst = ref; dst ! "OK"
|
||||
case e: Logging.LogEvent ⇒ dst ! e
|
||||
case u: UnhandledMessage ⇒ dst ! u
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -61,6 +74,19 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
|
|||
}
|
||||
}
|
||||
|
||||
"be able to log unhandled messages" in {
|
||||
val sys = ActorSystem("EventStreamSpecUnhandled", configUnhandled)
|
||||
try {
|
||||
sys.eventStream.subscribe(testActor, classOf[AnyRef])
|
||||
val m = UnhandledMessage(42, sys.deadLetters, sys.deadLetters)
|
||||
sys.eventStream.publish(m)
|
||||
expectMsgAllOf(m, Logging.Debug(sys.deadLetters.path.toString, sys.deadLetters.getClass, "unhandled message from " + sys.deadLetters + ": 42"))
|
||||
sys.eventStream.unsubscribe(testActor)
|
||||
} finally {
|
||||
sys.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
"manage log levels" in {
|
||||
val bus = new EventStream(false)
|
||||
bus.startDefaultLoggers(impl)
|
||||
|
|
|
|||
|
|
@ -14,6 +14,11 @@ akka {
|
|||
|
||||
# Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT)
|
||||
event-handlers = ["akka.event.Logging$DefaultLogger"]
|
||||
|
||||
# Event handlers are created and registered synchronously during ActorSystem
|
||||
# start-up, and since they are actors, this timeout is used to bound the
|
||||
# waiting time
|
||||
event-handler-startup-timeout = 5s
|
||||
|
||||
# Log level used by the configured loggers (see "event-handlers") as soon
|
||||
# as they have been started; before that, see "stdout-loglevel"
|
||||
|
|
@ -275,6 +280,9 @@ akka {
|
|||
|
||||
# enable DEBUG logging of subscription changes on the eventStream
|
||||
event-stream = off
|
||||
|
||||
# enable DEBUG logging of unhandled messages
|
||||
unhandled = off
|
||||
}
|
||||
|
||||
# Entries for pluggable serializers and their bindings.
|
||||
|
|
@ -295,7 +303,7 @@ akka {
|
|||
|
||||
# Used to set the behavior of the scheduler.
|
||||
# Changing the default values may change the system behavior drastically so make sure
|
||||
# you know what you're doing!
|
||||
# you know what you're doing! See the Scheduler section of the Akka documentation for more details.
|
||||
scheduler {
|
||||
# The HashedWheelTimer (HWT) implementation from Netty is used as the default scheduler
|
||||
# in the system.
|
||||
|
|
|
|||
|
|
@ -17,9 +17,9 @@ import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
|
|||
import java.io.Closeable
|
||||
import akka.dispatch.Await.Awaitable
|
||||
import akka.dispatch.Await.CanAwait
|
||||
import java.util.concurrent.{ CountDownLatch, TimeoutException, RejectedExecutionException }
|
||||
import akka.util._
|
||||
import collection.immutable.Stack
|
||||
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
|
||||
|
||||
object ActorSystem {
|
||||
|
||||
|
|
@ -125,12 +125,15 @@ object ActorSystem {
|
|||
final val LogLevel = getString("akka.loglevel")
|
||||
final val StdoutLogLevel = getString("akka.stdout-loglevel")
|
||||
final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala
|
||||
final val EventHandlerStartTimeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS))
|
||||
final val LogConfigOnStart = config.getBoolean("akka.log-config-on-start")
|
||||
|
||||
final val AddLoggingReceive = getBoolean("akka.actor.debug.receive")
|
||||
final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive")
|
||||
final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle")
|
||||
final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm")
|
||||
final val DebugEventStream = getBoolean("akka.actor.debug.event-stream")
|
||||
final val DebugUnhandledMessage = getBoolean("akka.actor.debug.unhandled")
|
||||
|
||||
final val Home = config.getString("akka.home") match {
|
||||
case "" ⇒ None
|
||||
|
|
@ -200,7 +203,7 @@ object ActorSystem {
|
|||
*
|
||||
* Where no name is given explicitly, one will be automatically generated.
|
||||
*
|
||||
* <b><i>Important Notice:</i></o>
|
||||
* <b><i>Important Notice:</i></b>
|
||||
*
|
||||
* This class is not meant to be extended by user code. If you want to
|
||||
* actually roll your own Akka, it will probably be better to look into
|
||||
|
|
@ -376,7 +379,7 @@ abstract class ActorSystem extends ActorRefFactory {
|
|||
/**
|
||||
* More powerful interface to the actor system’s implementation which is presented to extensions (see [[akka.actor.Extension]]).
|
||||
*
|
||||
* <b><i>Important Notice:</i></o>
|
||||
* <b><i>Important Notice:</i></b>
|
||||
*
|
||||
* This class is not meant to be extended by user code. If you want to
|
||||
* actually roll your own Akka, beware that you are completely on your own in
|
||||
|
|
@ -404,6 +407,11 @@ abstract class ExtendedActorSystem extends ActorSystem {
|
|||
*/
|
||||
def deathWatch: DeathWatch
|
||||
|
||||
/**
|
||||
* A ThreadFactory that can be used if the transport needs to create any Threads
|
||||
*/
|
||||
def threadFactory: ThreadFactory
|
||||
|
||||
/**
|
||||
* ClassLoader wrapper which is used for reflective accesses internally. This is set
|
||||
* to use the context class loader, if one is set, or the class loader which
|
||||
|
|
|
|||
|
|
@ -227,15 +227,19 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
case _ ⇒ super.supervisorStrategy
|
||||
}
|
||||
|
||||
override def preStart(): Unit = me match {
|
||||
case l: PreStart ⇒ l.preStart()
|
||||
case _ ⇒ super.preStart()
|
||||
override def preStart(): Unit = withContext {
|
||||
me match {
|
||||
case l: PreStart ⇒ l.preStart()
|
||||
case _ ⇒ super.preStart()
|
||||
}
|
||||
}
|
||||
|
||||
override def postStop(): Unit = try {
|
||||
me match {
|
||||
case l: PostStop ⇒ l.postStop()
|
||||
case _ ⇒ super.postStop()
|
||||
withContext {
|
||||
me match {
|
||||
case l: PostStop ⇒ l.postStop()
|
||||
case _ ⇒ super.postStop()
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
TypedActor(context.system).invocationHandlerFor(proxyVar.get) match {
|
||||
|
|
@ -246,14 +250,18 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
}
|
||||
}
|
||||
|
||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = me match {
|
||||
case l: PreRestart ⇒ l.preRestart(reason, message)
|
||||
case _ ⇒ super.preRestart(reason, message)
|
||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = withContext {
|
||||
me match {
|
||||
case l: PreRestart ⇒ l.preRestart(reason, message)
|
||||
case _ ⇒ super.preRestart(reason, message)
|
||||
}
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable): Unit = me match {
|
||||
case l: PostRestart ⇒ l.postRestart(reason)
|
||||
case _ ⇒ super.postRestart(reason)
|
||||
override def postRestart(reason: Throwable): Unit = withContext {
|
||||
me match {
|
||||
case l: PostRestart ⇒ l.postRestart(reason)
|
||||
case _ ⇒ super.postRestart(reason)
|
||||
}
|
||||
}
|
||||
|
||||
protected def withContext[T](unitOfWork: ⇒ T): T = {
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
import scala.util.control.NoStackTrace
|
||||
import java.util.concurrent.TimeoutException
|
||||
import akka.dispatch.Await
|
||||
import annotation.implicitNotFound
|
||||
|
||||
/**
|
||||
* This trait brings log level handling to the EventStream: it reads the log
|
||||
|
|
@ -95,26 +96,40 @@ trait LoggingBus extends ActorEventBus {
|
|||
case Nil ⇒ "akka.event.Logging$DefaultLogger" :: Nil
|
||||
case loggers ⇒ loggers
|
||||
}
|
||||
val myloggers = for {
|
||||
loggerName ← defaultLoggers
|
||||
if loggerName != StandardOutLoggerName
|
||||
} yield {
|
||||
try {
|
||||
system.dynamicAccess.getClassFor[Actor](loggerName) match {
|
||||
case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName)
|
||||
case Left(exception) ⇒ throw exception
|
||||
val myloggers =
|
||||
for {
|
||||
loggerName ← defaultLoggers
|
||||
if loggerName != StandardOutLoggerName
|
||||
} yield {
|
||||
try {
|
||||
system.dynamicAccess.getClassFor[Actor](loggerName) match {
|
||||
case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName)
|
||||
case Left(exception) ⇒ throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
throw new ConfigurationException(
|
||||
"Event Handler specified in config can't be loaded [" + loggerName +
|
||||
"] due to [" + e.toString + "]", e)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
throw new ConfigurationException(
|
||||
"Event Handler specified in config can't be loaded [" + loggerName +
|
||||
"] due to [" + e.toString + "]", e)
|
||||
}
|
||||
}
|
||||
guard.withGuard {
|
||||
loggers = myloggers
|
||||
_logLevel = level
|
||||
}
|
||||
try {
|
||||
if (system.settings.DebugUnhandledMessage)
|
||||
subscribe(system.systemActorOf(Props(new Actor {
|
||||
println("started" + self)
|
||||
def receive = {
|
||||
case UnhandledMessage(msg, sender, rcp) ⇒
|
||||
println("got it")
|
||||
publish(Debug(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg))
|
||||
}
|
||||
}), "UnhandledMessageForwarder"), classOf[UnhandledMessage])
|
||||
} catch {
|
||||
case _: InvalidActorNameException ⇒ // ignore if it is already running
|
||||
}
|
||||
publish(Debug(logName, this.getClass, "Default Loggers started"))
|
||||
if (!(defaultLoggers contains StandardOutLoggerName)) {
|
||||
unsubscribe(StandardOutLogger)
|
||||
|
|
@ -153,7 +168,7 @@ trait LoggingBus extends ActorEventBus {
|
|||
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = {
|
||||
val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
|
||||
val actor = system.systemActorOf(Props(clazz), name)
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit def timeout = system.settings.EventHandlerStartTimeout
|
||||
import akka.pattern.ask
|
||||
val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch {
|
||||
case _: TimeoutException ⇒
|
||||
|
|
@ -211,7 +226,7 @@ trait LoggingBus extends ActorEventBus {
|
|||
*
|
||||
* The default implementation of the second variant will just call the first.
|
||||
*/
|
||||
trait LogSource[-T] {
|
||||
@implicitNotFound("Cannot find LogSource for ${T} please see ScalaDoc for LogSource for how to obtain or construct one.") trait LogSource[-T] {
|
||||
def genString(t: T): String
|
||||
def genString(t: T, system: ActorSystem): String = genString(t)
|
||||
def getClazz(t: T): Class[_] = t.getClass
|
||||
|
|
|
|||
|
|
@ -349,6 +349,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
|
|||
val newtemp = new Array[Byte](size)
|
||||
if (_tempLength > 0) Array.copy(_temp, 0, newtemp, 0, _tempLength)
|
||||
_temp = newtemp
|
||||
_tempCapacity = _temp.length
|
||||
}
|
||||
|
||||
private def ensureTempSize(size: Int) {
|
||||
|
|
|
|||
|
|
@ -88,13 +88,13 @@ trait ProducerSupport { this: Actor ⇒
|
|||
* @see Producer#produce(Any, ExchangePattern)
|
||||
*/
|
||||
protected def produce: Receive = {
|
||||
case res: MessageResult ⇒ receiveAfterProduce(res.message)
|
||||
case res: FailureResult ⇒ receiveAfterProduce(res.failure)
|
||||
case res: MessageResult ⇒ routeResponse(res.message)
|
||||
case res: FailureResult ⇒ routeResponse(res.failure)
|
||||
case msg ⇒ {
|
||||
if (oneway)
|
||||
produce(receiveBeforeProduce(msg), ExchangePattern.InOnly)
|
||||
produce(transformOutgoingMessage(msg), ExchangePattern.InOnly)
|
||||
else
|
||||
produce(receiveBeforeProduce(msg), ExchangePattern.InOut)
|
||||
produce(transformOutgoingMessage(msg), ExchangePattern.InOut)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -103,9 +103,14 @@ trait ProducerSupport { this: Actor ⇒
|
|||
* message is passed as argument. By default, this method simply returns the argument but may be overridden
|
||||
* by subtraits or subclasses.
|
||||
*/
|
||||
protected def receiveBeforeProduce: PartialFunction[Any, Any] = {
|
||||
case msg ⇒ msg
|
||||
}
|
||||
protected def transformOutgoingMessage(msg: Any): Any = msg
|
||||
|
||||
/**
|
||||
* Called before the response message is sent to the original sender. The original
|
||||
* message is passed as argument. By default, this method simply returns the argument but may be overridden
|
||||
* by subtraits or subclasses.
|
||||
*/
|
||||
protected def transformResponse(msg: Any): Any = msg
|
||||
|
||||
/**
|
||||
* Called after a response was received from the endpoint specified by <code>endpointUri</code>. The
|
||||
|
|
@ -114,9 +119,8 @@ trait ProducerSupport { this: Actor ⇒
|
|||
* done. This method may be overridden by subtraits or subclasses (e.g. to forward responses to another
|
||||
* actor).
|
||||
*/
|
||||
protected def receiveAfterProduce: Receive = {
|
||||
case msg ⇒ if (!oneway) sender ! msg
|
||||
}
|
||||
|
||||
protected def routeResponse(msg: Any): Unit = if (!oneway) sender ! transformResponse(msg)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,14 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
|
|||
* message is passed as argument. By default, this method simply returns the argument but may be overridden
|
||||
* by subclasses.
|
||||
*/
|
||||
def onReceiveBeforeProduce(message: AnyRef): AnyRef = message
|
||||
def onTransformOutgoingMessage(message: AnyRef): AnyRef = message
|
||||
|
||||
/**
|
||||
* Called before the response message is sent to original sender. The original
|
||||
* message is passed as argument. By default, this method simply returns the argument but may be overridden
|
||||
* by subclasses.
|
||||
*/
|
||||
def onTransformResponse(message: AnyRef): AnyRef = message
|
||||
|
||||
/**
|
||||
* Called after a response was received from the endpoint specified by <code>endpointUri</code>. The
|
||||
|
|
@ -27,15 +34,11 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport {
|
|||
* if <code>oneway</code> is <code>false</code>. If <code>oneway</code> is <code>true</code>, nothing is
|
||||
* done. This method may be overridden by subclasses (e.g. to forward responses to another actor).
|
||||
*/
|
||||
def onReceiveAfterProduce(message: AnyRef): Unit = super.receiveAfterProduce(message)
|
||||
def onRouteResponse(message: AnyRef): Unit = super.routeResponse(message)
|
||||
|
||||
final override def receiveBeforeProduce = {
|
||||
case msg: AnyRef ⇒ onReceiveBeforeProduce(msg)
|
||||
}
|
||||
|
||||
final override def receiveAfterProduce = {
|
||||
case msg: AnyRef ⇒ onReceiveAfterProduce(msg)
|
||||
}
|
||||
final override def transformOutgoingMessage(msg: Any): AnyRef = onTransformOutgoingMessage(msg.asInstanceOf[AnyRef])
|
||||
final override def transformResponse(msg: Any): AnyRef = onTransformResponse(msg.asInstanceOf[AnyRef])
|
||||
final override def routeResponse(msg: Any): Unit = onRouteResponse(msg.asInstanceOf[AnyRef])
|
||||
|
||||
final override def endpointUri = getEndpointUri
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ public class SampleUntypedForwardingProducer extends UntypedProducerActor {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onReceiveAfterProduce(Object message) {
|
||||
public void onRouteResponse(Object message) {
|
||||
CamelMessage msg = (CamelMessage)message;
|
||||
String body = msg.getBodyAs(String.class,getCamelContext());
|
||||
getProducerTemplate().sendBody("direct:forward-test-1", body);
|
||||
|
|
|
|||
|
|
@ -266,7 +266,7 @@ object ProducerFeatureTest {
|
|||
class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
|
||||
override protected def receiveBeforeProduce = {
|
||||
override protected def transformOutgoingMessage(msg: Any) = msg match {
|
||||
case msg: CamelMessage ⇒ if (upper) msg.mapBody {
|
||||
body: String ⇒ body.toUpperCase
|
||||
}
|
||||
|
|
@ -277,9 +277,7 @@ object ProducerFeatureTest {
|
|||
class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
|
||||
override protected def receiveAfterProduce = {
|
||||
case msg ⇒ target forward msg
|
||||
}
|
||||
override protected def routeResponse(msg: Any): Unit = target forward msg
|
||||
}
|
||||
|
||||
class TestResponder extends Actor {
|
||||
|
|
|
|||
|
|
@ -157,6 +157,45 @@ class VectorClockSpec extends AkkaSpec {
|
|||
merged1 == merged2 must be(true)
|
||||
}
|
||||
|
||||
"correctly merge two disjoint vector clocks" in {
|
||||
val node1 = Node("1")
|
||||
val node2 = Node("2")
|
||||
val node3 = Node("3")
|
||||
val node4 = Node("4")
|
||||
|
||||
val clock1_1 = VectorClock()
|
||||
val clock2_1 = clock1_1 + node1
|
||||
val clock3_1 = clock2_1 + node2
|
||||
val clock4_1 = clock3_1 + node2
|
||||
val clock5_1 = clock4_1 + node3
|
||||
|
||||
val clock1_2 = VectorClock()
|
||||
val clock2_2 = clock1_2 + node4
|
||||
val clock3_2 = clock2_2 + node4
|
||||
|
||||
val merged1 = clock3_2 merge clock5_1
|
||||
merged1.versions.size must be(4)
|
||||
merged1.versions.contains(node1) must be(true)
|
||||
merged1.versions.contains(node2) must be(true)
|
||||
merged1.versions.contains(node3) must be(true)
|
||||
merged1.versions.contains(node4) must be(true)
|
||||
|
||||
val merged2 = clock5_1 merge clock3_2
|
||||
merged2.versions.size must be(4)
|
||||
merged2.versions.contains(node1) must be(true)
|
||||
merged2.versions.contains(node2) must be(true)
|
||||
merged2.versions.contains(node3) must be(true)
|
||||
merged2.versions.contains(node4) must be(true)
|
||||
|
||||
clock3_2 < merged1 must be(true)
|
||||
clock5_1 < merged1 must be(true)
|
||||
|
||||
clock3_2 < merged2 must be(true)
|
||||
clock5_1 < merged2 must be(true)
|
||||
|
||||
merged1 == merged2 must be(true)
|
||||
}
|
||||
|
||||
"pass blank clock incrementing" in {
|
||||
val node1 = Node("1")
|
||||
val node2 = Node("2")
|
||||
|
|
|
|||
Binary file not shown.
|
Before Width: | Height: | Size: 1.2 KiB After Width: | Height: | Size: 658 B |
|
|
@ -29,7 +29,7 @@ The quintessential feature of actor systems is that tasks are split up and
|
|||
delegated until they become small enough to be handled in one piece. In doing
|
||||
so, not only is the task itself clearly structured, but the resulting actors
|
||||
can be reasoned about in terms of which messages they should process, how they
|
||||
should react nominally and how failure should be handled. If one actor does not
|
||||
should react normally and how failure should be handled. If one actor does not
|
||||
have the means for dealing with a certain situation, it sends a corresponding
|
||||
failure message to its supervisor, asking for help. The recursive structure
|
||||
then allows to handle failure at the right level.
|
||||
|
|
@ -41,7 +41,7 @@ trying to keep everything “under the carpet”.
|
|||
|
||||
Now, the difficulty in designing such a system is how to decide who should
|
||||
supervise what. There is of course no single best solution, but there are a few
|
||||
guide lines which might be helpful:
|
||||
guidelines which might be helpful:
|
||||
|
||||
- If one actor manages the work another actor is doing, e.g. by passing on
|
||||
sub-tasks, then the manager should supervise the child. The reason is that
|
||||
|
|
@ -101,6 +101,12 @@ Actor Best Practices
|
|||
breaks all the properties which make programming in actors such a nice
|
||||
experience.
|
||||
|
||||
#. Top-level actors are the innermost part of your Error Kernel, so create them
|
||||
sparingly and prefer truly hierarchical systems. This has benefits wrt.
|
||||
fault-handling (both considering the granularity of configuration and the
|
||||
performance) and it also reduces the number of blocking calls made, since
|
||||
the creation of top-level actors involves synchronous messaging.
|
||||
|
||||
What you should not concern yourself with
|
||||
-----------------------------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -43,6 +43,12 @@ To prevent visibility and reordering problems on actors, Akka guarantees the fol
|
|||
* **The actor send rule:** the send of the message to an actor happens before the receive of that message by the same actor.
|
||||
* **The actor subsequent processing rule:** processing of one message happens before processing of the next message by the same actor.
|
||||
|
||||
.. note::
|
||||
|
||||
In layman's terms this means that changes to internal fields of the actor is visible when the next message
|
||||
is processed by that actor. So fields in your actor does not need to be volatile or equivalent.
|
||||
|
||||
|
||||
Both rules only apply for the same actor instance and are not valid if different actors are used.
|
||||
|
||||
Futures and the Java Memory Model
|
||||
|
|
|
|||
|
|
@ -150,4 +150,20 @@ public class TypedActorDocTestBase {
|
|||
//Ignore
|
||||
}
|
||||
}
|
||||
|
||||
@Test public void proxyAnyActorRef() {
|
||||
try {
|
||||
//#typed-actor-remote
|
||||
Squarer typedActor =
|
||||
TypedActor.get(system).
|
||||
typedActorOf(
|
||||
new TypedProps<Squarer>(Squarer.class),
|
||||
system.actorFor("akka://SomeSystem@somehost:2552/user/some/foobar")
|
||||
);
|
||||
//Use "typedActor" as a FooBar
|
||||
//#typed-actor-remote
|
||||
} catch (Exception e) {
|
||||
//dun care
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,3 +23,4 @@ Java API
|
|||
fsm
|
||||
extending-akka
|
||||
zeromq
|
||||
microkernel
|
||||
|
|
|
|||
67
akka-docs/java/microkernel.rst
Normal file
67
akka-docs/java/microkernel.rst
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
|
||||
.. _microkernel:
|
||||
|
||||
#############
|
||||
Microkernel (Java)
|
||||
#############
|
||||
|
||||
The Akka Microkernel is included in the Akka download found at `downloads`_.
|
||||
|
||||
.. _downloads: http://akka.io/downloads
|
||||
|
||||
To run an application with the microkernel you need to create a Bootable class
|
||||
that handles the startup and shutdown the application. An example is included below.
|
||||
|
||||
Put your application jar in the ``deploy`` directory to have it automatically
|
||||
loaded.
|
||||
|
||||
To start the kernel use the scripts in the ``bin`` directory, passing the boot
|
||||
classes for your application.
|
||||
|
||||
There is a simple example of an application setup for running with the
|
||||
microkernel included in the akka download. This can be run with the following
|
||||
command (on a unix-based system):
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
bin/akka sample.kernel.hello.HelloKernel
|
||||
|
||||
Use ``Ctrl-C`` to interrupt and exit the microkernel.
|
||||
|
||||
On a Windows machine you can also use the bin/akka.bat script.
|
||||
|
||||
The code for the Hello Kernel example (see the ``HelloKernel`` class for an example
|
||||
of creating a Bootable):
|
||||
|
||||
.. includecode:: ../../akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java
|
||||
|
||||
|
||||
Distribution of microkernel application
|
||||
---------------------------------------
|
||||
|
||||
To make a distribution package of the microkernel and your application the ``akka-sbt-plugin`` provides
|
||||
``AkkaKernelPlugin``. It creates the directory structure, with jar files, configuration files and
|
||||
start scripts.
|
||||
|
||||
To use the sbt plugin you define it in your ``project/plugins.sbt``:
|
||||
|
||||
.. includecode:: ../../akka-sbt-plugin/sample/project/plugins.sbt
|
||||
|
||||
Then you add it to the settings of your ``project/Build.scala``. It is also important that you add the ``akka-kernel`` dependency.
|
||||
This is an example of a complete sbt build file:
|
||||
|
||||
.. includecode:: ../../akka-sbt-plugin/sample/project/Build.scala
|
||||
|
||||
Run the plugin with sbt::
|
||||
|
||||
> dist
|
||||
> dist:clean
|
||||
|
||||
There are several settings that can be defined:
|
||||
|
||||
* ``outputDirectory`` - destination directory of the package, default ``target/dist``
|
||||
* ``distJvmOptions`` - JVM parameters to be used in the start script
|
||||
* ``configSourceDirs`` - Configuration files are copied from these directories, default ``src/config``, ``src/main/config``, ``src/main/resources``
|
||||
* ``distMainClass`` - Kernel main class to use in start script
|
||||
* ``libFilter`` - Filter of dependency jar files
|
||||
* ``additionalLibs`` - Additional dependency jar files
|
||||
|
|
@ -15,6 +15,13 @@ You can schedule sending of messages to actors and execution of tasks (functions
|
|||
You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the
|
||||
scheduled operation.
|
||||
|
||||
.. warning::
|
||||
|
||||
The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``.
|
||||
It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue.
|
||||
The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration
|
||||
properties. For more information, see: `HashedWheelTimers <http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt>`_.
|
||||
|
||||
Some examples
|
||||
-------------
|
||||
|
||||
|
|
|
|||
|
|
@ -198,3 +198,10 @@ Proxying
|
|||
|
||||
You can use the ``typedActorOf`` that takes a TypedProps and an ActorRef to proxy the given ActorRef as a TypedActor.
|
||||
This is usable if you want to communicate remotely with TypedActors on other machines, just look them up with ``actorFor`` and pass the ``ActorRef`` to ``typedActorOf``.
|
||||
|
||||
Lookup & Remoting
|
||||
-----------------
|
||||
|
||||
Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``actorFor`` together with ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes.
|
||||
|
||||
.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java#typed-actor-remote
|
||||
|
|
@ -82,6 +82,13 @@ that is used in log messages and for identifying actors. The name must not be em
|
|||
or start with ``$``. If the given name is already in use by another child to the
|
||||
same parent actor an `InvalidActorNameException` is thrown.
|
||||
|
||||
.. warning::
|
||||
|
||||
Creating top-level actors with ``system.actorOf`` is a blocking operation,
|
||||
hence it may dead-lock due to starvation if the default dispatcher is
|
||||
overloaded. To avoid problems, do not call this method from within actors or
|
||||
futures which run on the default dispatcher.
|
||||
|
||||
Actors are automatically started asynchronously when created.
|
||||
When you create the ``UntypedActor`` then it will automatically call the ``preStart``
|
||||
callback method on the ``UntypedActor`` class. This is an excellent place to
|
||||
|
|
|
|||
|
|
@ -6,6 +6,5 @@ Modules
|
|||
|
||||
durable-mailbox
|
||||
http
|
||||
microkernel
|
||||
camel
|
||||
spring
|
||||
|
|
|
|||
|
|
@ -76,6 +76,13 @@ that is used in log messages and for identifying actors. The name must not be em
|
|||
or start with ``$``. If the given name is already in use by another child to the
|
||||
same parent actor an `InvalidActorNameException` is thrown.
|
||||
|
||||
.. warning::
|
||||
|
||||
Creating top-level actors with ``system.actorOf`` is a blocking operation,
|
||||
hence it may dead-lock due to starvation if the default dispatcher is
|
||||
overloaded. To avoid problems, do not call this method from within actors or
|
||||
futures which run on the default dispatcher.
|
||||
|
||||
Actors are automatically started asynchronously when created.
|
||||
When you create the ``Actor`` then it will automatically call the ``preStart``
|
||||
callback method on the ``Actor`` trait. This is an excellent place to
|
||||
|
|
@ -143,7 +150,9 @@ The :class:`Actor` trait defines only one abstract method, the above mentioned
|
|||
If the current actor behavior does not match a received message,
|
||||
:meth:`unhandled` is called, which by default publishes an
|
||||
``akka.actor.UnhandledMessage(message, sender, recipient)`` on the actor
|
||||
system’s event stream.
|
||||
system’s event stream (set configuration item
|
||||
``akka.event-handler-startup-timeout`` to ``true`` to have them converted into
|
||||
actual Debug messages)
|
||||
|
||||
In addition, it offers:
|
||||
|
||||
|
|
|
|||
84
akka-docs/scala/camel.rst
Normal file
84
akka-docs/scala/camel.rst
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
|
||||
.. _camel-scala:
|
||||
|
||||
#######
|
||||
Camel
|
||||
#######
|
||||
|
||||
For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_.
|
||||
|
||||
For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_
|
||||
(pdf) of the book `Camel in Action`_.
|
||||
|
||||
.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf
|
||||
.. _Camel in Action: http://www.manning.com/ibsen/
|
||||
.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x
|
||||
|
||||
Other, more advanced external articles (for version 1) are:
|
||||
|
||||
* `Akka Consumer Actors: New Features and Best Practices <http://krasserm.blogspot.com/2011/02/akka-consumer-actors-new-features-and.html>`_
|
||||
* `Akka Producer Actors: New Features and Best Practices <http://krasserm.blogspot.com/2011/02/akka-producer-actor-new-features-and.html>`_
|
||||
|
||||
|
||||
|
||||
Introduction
|
||||
============
|
||||
|
||||
The akka-camel module allows actors to receive
|
||||
and send messages over a great variety of protocols and APIs. This section gives
|
||||
a brief overview of the general ideas behind the akka-camel module, the
|
||||
remaining sections go into the details. In addition to the native Scala and Java
|
||||
actor API, actors can now exchange messages with other systems over large number
|
||||
of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a
|
||||
few. At the moment, approximately 80 protocols and APIs are supported.
|
||||
|
||||
The akka-camel module is based on `Apache Camel`_, a powerful and leight-weight
|
||||
integration framework for the JVM. For an introduction to Apache Camel you may
|
||||
want to read this `Apache Camel article`_. Camel comes with a
|
||||
large number of `components`_ that provide bindings to different protocols and
|
||||
APIs. The `camel-extra`_ project provides further components.
|
||||
|
||||
.. _Apache Camel: http://camel.apache.org/
|
||||
.. _Apache Camel article: http://architects.dzone.com/articles/apache-camel-integration
|
||||
.. _components: http://camel.apache.org/components.html
|
||||
.. _camel-extra: http://code.google.com/p/camel-extra/
|
||||
|
||||
Usage of Camel's integration components in Akka is essentially a
|
||||
one-liner. Here's an example.
|
||||
|
||||
.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer-mina
|
||||
|
||||
The above example exposes an actor over a tcp endpoint on port 6200 via Apache
|
||||
Camel's `Mina component`_. The actor implements the endpointUri method to define
|
||||
an endpoint from which it can receive messages. After starting the actor, tcp
|
||||
clients can immediately send messages to and receive responses from that
|
||||
actor. If the message exchange should go over HTTP (via Camel's `Jetty
|
||||
component`_), only the actor's endpointUri method must be changed.
|
||||
|
||||
.. _Mina component: http://camel.apache.org/mina.html
|
||||
.. _Jetty component: http://camel.apache.org/jetty.html
|
||||
|
||||
.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer
|
||||
|
||||
Actors can also trigger message exchanges with external systems i.e. produce to
|
||||
Camel endpoints.
|
||||
|
||||
.. includecode:: code/akka/docs/camel/Introduction.scala#Producer
|
||||
|
||||
In the above example, any message sent to this actor will be added (produced) to
|
||||
the example JMS queue. Producer actors may choose from the same set of Camel
|
||||
components as Consumer actors do.
|
||||
|
||||
The number of Camel components is constantly increasing. The akka-camel module
|
||||
can support these in a plug-and-play manner. Just add them to your application's
|
||||
classpath, define a component-specific endpoint URI and use it to exchange
|
||||
messages over the component-specific protocols or APIs. This is possible because
|
||||
Camel components bind protocol-specific message formats to a Camel-specific
|
||||
`normalized message format`__. The normalized message format hides
|
||||
protocol-specific details from Akka and makes it therefore very easy to support
|
||||
a large number of protocols through a uniform Camel component interface. The
|
||||
akka-camel module further converts mutable Camel messages into immutable
|
||||
representations which are used by Consumer and Producer actors for pattern
|
||||
matching, transformation, serialization or storage.
|
||||
|
||||
__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java
|
||||
|
|
@ -140,6 +140,17 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
|||
//#typed-actor-poisonpill
|
||||
}
|
||||
|
||||
"proxy any ActorRef" in {
|
||||
//#typed-actor-remote
|
||||
val typedActor: Foo with Bar =
|
||||
TypedActor(system).
|
||||
typedActorOf(
|
||||
TypedProps[FooBar],
|
||||
system.actorFor("akka://SomeSystem@somehost:2552/user/some/foobar"))
|
||||
//Use "typedActor" as a FooBar
|
||||
//#typed-actor-remote
|
||||
}
|
||||
|
||||
"supercharge" in {
|
||||
//#typed-actor-supercharge-usage
|
||||
val awesomeFooBar: Foo with Bar = TypedActor(system).typedActorOf(TypedProps[FooBar]())
|
||||
|
|
|
|||
44
akka-docs/scala/code/akka/docs/camel/Introduction.scala
Normal file
44
akka-docs/scala/code/akka/docs/camel/Introduction.scala
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
package akka.docs.camel
|
||||
|
||||
import akka.actor._
|
||||
import akka.camel._
|
||||
|
||||
//#Consumer-mina
|
||||
import akka.actor.Actor
|
||||
import akka.actor.Actor._
|
||||
import akka.camel.{CamelMessage, Consumer}
|
||||
|
||||
class MyActor extends Consumer {
|
||||
def endpointUri = "mina:tcp://localhost:6200?textline=true"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => { /* ... */}
|
||||
case _ => { /* ... */}
|
||||
}
|
||||
}
|
||||
|
||||
// start and expose actor via tcp
|
||||
val sys = ActorSystem("camel")
|
||||
val myActor = sys.actorOf(Props[MyActor])
|
||||
//#Consumer-mina
|
||||
|
||||
|
||||
//#Consumer
|
||||
class MyActor extends Consumer {
|
||||
def endpointUri = "jetty:http://localhost:8877/example"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => { /* ... */}
|
||||
case _ => { /* ... */}
|
||||
}
|
||||
}
|
||||
//#Consumer
|
||||
|
||||
//#Producer
|
||||
import akka.actor.Actor
|
||||
import akka.camel.{Producer, Oneway}
|
||||
|
||||
class MyActor extends Actor with Producer with Oneway {
|
||||
def endpointUri = "jms:queue:example"
|
||||
}
|
||||
//#Producer
|
||||
|
|
@ -26,3 +26,5 @@ Scala API
|
|||
testing
|
||||
extending-akka
|
||||
zeromq
|
||||
microkernel
|
||||
camel
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
.. _microkernel:
|
||||
|
||||
#############
|
||||
Microkernel
|
||||
Microkernel (Scala)
|
||||
#############
|
||||
|
||||
The Akka Microkernel is included in the Akka download found at `downloads`_.
|
||||
|
|
@ -15,6 +15,13 @@ You can schedule sending of messages to actors and execution of tasks (functions
|
|||
You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the
|
||||
scheduled operation.
|
||||
|
||||
.. warning::
|
||||
|
||||
The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``.
|
||||
It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue.
|
||||
The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration
|
||||
properties. For more information, see: `HashedWheelTimers <http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt>`_.
|
||||
|
||||
Some examples
|
||||
-------------
|
||||
|
||||
|
|
|
|||
|
|
@ -203,6 +203,13 @@ This is usable if you want to communicate remotely with TypedActors on other mac
|
|||
|
||||
The ActorRef needs to accept ``MethodCall`` messages.
|
||||
|
||||
Lookup & Remoting
|
||||
-----------------
|
||||
|
||||
Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``actorFor`` together with ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes.
|
||||
|
||||
.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala#typed-actor-remote
|
||||
|
||||
Supercharging
|
||||
-------------
|
||||
|
||||
|
|
|
|||
|
|
@ -78,8 +78,7 @@ class RemoteActorRefProvider(
|
|||
_transport = {
|
||||
val fqn = remoteSettings.RemoteTransport
|
||||
val args = Seq(
|
||||
classOf[RemoteSettings] -> remoteSettings,
|
||||
classOf[ActorSystemImpl] -> system,
|
||||
classOf[ExtendedActorSystem] -> system,
|
||||
classOf[RemoteActorRefProvider] -> this)
|
||||
|
||||
system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match {
|
||||
|
|
|
|||
|
|
@ -5,13 +5,13 @@
|
|||
package akka.remote
|
||||
|
||||
import scala.reflect.BeanProperty
|
||||
import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressFromURIString, Address, ActorSystemImpl, ActorSystem, ActorRef }
|
||||
import akka.dispatch.SystemMessage
|
||||
import akka.event.{ LoggingAdapter, Logging }
|
||||
import akka.AkkaException
|
||||
import akka.serialization.Serialization
|
||||
import akka.remote.RemoteProtocol._
|
||||
import akka.dispatch.ChildTerminated
|
||||
import akka.actor._
|
||||
|
||||
/**
|
||||
* Remote life-cycle events.
|
||||
|
|
@ -152,7 +152,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx
|
|||
* be available (i.e. fully initialized) by the time the first message is
|
||||
* received or when the start() method returns, whatever happens first.
|
||||
*/
|
||||
abstract class RemoteTransport {
|
||||
abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) {
|
||||
/**
|
||||
* Shuts down the remoting
|
||||
*/
|
||||
|
|
@ -163,11 +163,6 @@ abstract class RemoteTransport {
|
|||
*/
|
||||
def address: Address
|
||||
|
||||
/**
|
||||
* The actor system, for which this transport is instantiated. Will publish to its eventStream.
|
||||
*/
|
||||
def system: ActorSystem
|
||||
|
||||
/**
|
||||
* Start up the transport, i.e. enable incoming connections.
|
||||
*/
|
||||
|
|
@ -197,7 +192,7 @@ abstract class RemoteTransport {
|
|||
override def toString = address.toString
|
||||
}
|
||||
|
||||
class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) {
|
||||
class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) {
|
||||
|
||||
def originalReceiver = input.getRecipient.getPath
|
||||
|
||||
|
|
@ -216,7 +211,7 @@ trait RemoteMarshallingOps {
|
|||
|
||||
def log: LoggingAdapter
|
||||
|
||||
def system: ActorSystemImpl
|
||||
def system: ExtendedActorSystem
|
||||
|
||||
def provider: RemoteActorRefProvider
|
||||
|
||||
|
|
@ -288,9 +283,9 @@ trait RemoteMarshallingOps {
|
|||
case AddressFromURIString(address) if address == provider.transport.address ⇒
|
||||
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
|
||||
r.!(remoteMessage.payload)(remoteMessage.sender)
|
||||
case r ⇒ log.error("dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address)
|
||||
case r ⇒ log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address)
|
||||
}
|
||||
case r ⇒ log.error("dropping message {} for non-local recipient {} of type {}", remoteMessage.payload, r, if (r ne null) r.getClass else "null")
|
||||
case r ⇒ log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ abstract class RemoteClient private[akka] (
|
|||
* Converts the message to the wireprotocol and sends the message across the wire
|
||||
*/
|
||||
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
|
||||
if (netty.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
|
||||
if (netty.provider.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
|
||||
send((message, senderOption, recipient))
|
||||
} else {
|
||||
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress)
|
||||
|
|
|
|||
|
|
@ -16,18 +16,19 @@ import org.jboss.netty.channel.{ ChannelHandlerContext, Channel }
|
|||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder }
|
||||
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor
|
||||
import org.jboss.netty.util.HashedWheelTimer
|
||||
import akka.actor.{ Address, ActorSystemImpl, ActorRef }
|
||||
import akka.dispatch.MonitorableThreadFactory
|
||||
import akka.event.Logging
|
||||
import akka.remote.RemoteProtocol.AkkaRemoteProtocol
|
||||
import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted }
|
||||
import akka.util.NonFatal
|
||||
import akka.actor.{ ExtendedActorSystem, Address, ActorRef }
|
||||
|
||||
/**
|
||||
* Provides the implementation of the Netty remote support
|
||||
*/
|
||||
class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: ActorSystemImpl, val provider: RemoteActorRefProvider)
|
||||
extends RemoteTransport with RemoteMarshallingOps {
|
||||
class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) with RemoteMarshallingOps {
|
||||
|
||||
import provider.remoteSettings
|
||||
|
||||
val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName)
|
||||
|
||||
|
|
@ -66,7 +67,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
|
||||
def address = _address.get
|
||||
|
||||
val log = Logging(system.eventStream, "NettyRemoteTransport(" + address + ")")
|
||||
lazy val log = Logging(system.eventStream, "NettyRemoteTransport(" + address + ")")
|
||||
|
||||
def start(): Unit = {
|
||||
server.start()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,43 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package sample.kernel.hello.java;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import akka.kernel.Bootable;
|
||||
|
||||
public class HelloKernel implements Bootable {
|
||||
final ActorSystem system = ActorSystem.create("hellokernel");
|
||||
|
||||
static class HelloActor extends UntypedActor {
|
||||
final ActorRef worldActor =
|
||||
getContext().actorOf(new Props(WorldActor.class));
|
||||
|
||||
public void onReceive(Object message) {
|
||||
if (message == "start")
|
||||
worldActor.tell("Hello");
|
||||
else if (message instanceof String)
|
||||
System.out.println("Received message '%s'".format((String)message));
|
||||
else unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
static class WorldActor extends UntypedActor {
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof String)
|
||||
getSender().tell(((String)message).toUpperCase() + " world!");
|
||||
else unhandled(message);
|
||||
}
|
||||
}
|
||||
|
||||
public void startup() {
|
||||
system.actorOf(new Props(HelloActor.class)).tell("start");
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
system.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
@ -43,25 +43,19 @@ class Slf4jEventHandler extends Actor with SLF4JLogging {
|
|||
case event @ Error(cause, logSource, logClass, message) ⇒
|
||||
withMdc(logSource, event.thread.getName) {
|
||||
cause match {
|
||||
case Error.NoCause ⇒ Logger(logClass, logSource).error(message.toString)
|
||||
case _ ⇒ Logger(logClass, logSource).error(message.toString, cause)
|
||||
case Error.NoCause | null ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else null)
|
||||
case cause ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause)
|
||||
}
|
||||
}
|
||||
|
||||
case event @ Warning(logSource, logClass, message) ⇒
|
||||
withMdc(logSource, event.thread.getName) {
|
||||
Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef])
|
||||
}
|
||||
withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) }
|
||||
|
||||
case event @ Info(logSource, logClass, message) ⇒
|
||||
withMdc(logSource, event.thread.getName) {
|
||||
Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef])
|
||||
}
|
||||
withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) }
|
||||
|
||||
case event @ Debug(logSource, logClass, message) ⇒
|
||||
withMdc(logSource, event.thread.getName) {
|
||||
Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef])
|
||||
}
|
||||
withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) }
|
||||
|
||||
case InitializeLogger(_) ⇒
|
||||
log.info("Slf4jEventHandler started")
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.testkit
|
||||
|
||||
import scala.util.matching.Regex
|
||||
import akka.actor.{ DeadLetter, ActorSystem, Terminated }
|
||||
import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage }
|
||||
import akka.dispatch.{ SystemMessage, Terminate }
|
||||
import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized }
|
||||
import akka.event.Logging
|
||||
|
|
@ -447,7 +447,7 @@ class TestEventListener extends Logging.DefaultLogger {
|
|||
|
||||
override def receive = {
|
||||
case InitializeLogger(bus) ⇒
|
||||
Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _))
|
||||
Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter], classOf[UnhandledMessage]) foreach (bus.subscribe(context.self, _))
|
||||
sender ! LoggerInitialized
|
||||
case Mute(filters) ⇒ filters foreach addFilter
|
||||
case UnMute(filters) ⇒ filters foreach removeFilter
|
||||
|
|
@ -462,6 +462,9 @@ class TestEventListener extends Logging.DefaultLogger {
|
|||
val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg)
|
||||
if (!filter(event)) print(event)
|
||||
}
|
||||
case UnhandledMessage(msg, sender, rcp) ⇒
|
||||
val event = Warning(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg)
|
||||
if (!filter(event)) print(event)
|
||||
case m ⇒ print(Debug(context.system.name, this.getClass, m))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -92,6 +92,18 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
|
|||
|
||||
"An AkkaSpec" must {
|
||||
|
||||
"warn about unhandled messages" in {
|
||||
implicit val system = ActorSystem("AkkaSpec0", AkkaSpec.testConf)
|
||||
try {
|
||||
val a = system.actorOf(Props.empty)
|
||||
EventFilter.warning(start = "unhandled message", occurrences = 1) intercept {
|
||||
a ! 42
|
||||
}
|
||||
} finally {
|
||||
system.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
"terminate all actors" in {
|
||||
// verbose config just for demonstration purposes, please leave in in case of debugging
|
||||
import scala.collection.JavaConverters._
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ object Pi extends App {
|
|||
|
||||
var pi: Double = _
|
||||
var nrOfResults: Int = _
|
||||
val start: Long = System.currentTimeMillis
|
||||
val start: Long = System.currentTimeMillis()
|
||||
|
||||
//#create-router
|
||||
val workerRouter = context.actorOf(
|
||||
|
|
@ -66,7 +66,7 @@ object Pi extends App {
|
|||
nrOfResults += 1
|
||||
if (nrOfResults == nrOfMessages) {
|
||||
// Send the result to the listener
|
||||
listener ! PiApproximation(pi, duration = (System.currentTimeMillis - start).millis)
|
||||
listener ! PiApproximation(pi, duration = (System.currentTimeMillis() - start).millis)
|
||||
// Stops this actor and all its supervised children
|
||||
context.stop(self)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ object AkkaBuild extends Build {
|
|||
lazy val buildSettings = Seq(
|
||||
organization := "com.typesafe.akka",
|
||||
version := "2.1-SNAPSHOT",
|
||||
scalaVersion := "2.9.1-1"
|
||||
scalaVersion := "2.9.2"
|
||||
)
|
||||
|
||||
lazy val akka = Project(
|
||||
|
|
@ -332,7 +332,8 @@ object AkkaBuild extends Build {
|
|||
|
||||
override lazy val settings = super.settings ++ buildSettings ++ Seq(
|
||||
resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/",
|
||||
resolvers += "Twitter Public Repo" at "http://maven.twttr.com" // This will be going away with com.mongodb.async's next release
|
||||
resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release
|
||||
shellPrompt := { s => Project.extract(s).currentProject.id + " > " }
|
||||
)
|
||||
|
||||
lazy val baseSettings = Defaults.defaultSettings ++ Publish.settings
|
||||
|
|
@ -423,7 +424,7 @@ object Dependencies {
|
|||
|
||||
val actorTests = Seq(
|
||||
Test.junit, Test.scalatest, Test.commonsMath, Test.mockito,
|
||||
Test.scalacheck, protobuf, jacksonMapper
|
||||
Test.scalacheck, protobuf
|
||||
)
|
||||
|
||||
val remote = Seq(
|
||||
|
|
@ -486,60 +487,41 @@ object Dependency {
|
|||
// Versions
|
||||
|
||||
object V {
|
||||
val Camel = "2.8.0"
|
||||
val Jackson = "1.8.0"
|
||||
val Jetty = "7.4.0.v20110414"
|
||||
val Logback = "0.9.28"
|
||||
val Netty = "3.3.0.Final"
|
||||
val Protobuf = "2.4.1"
|
||||
val Rabbit = "2.3.1"
|
||||
val ScalaStm = "0.5"
|
||||
val Scalatest = "1.6.1"
|
||||
val Slf4j = "1.6.4"
|
||||
val Spring = "3.0.5.RELEASE"
|
||||
val Zookeeper = "3.4.0"
|
||||
val Camel = "2.8.0"
|
||||
val Logback = "0.9.28"
|
||||
val Netty = "3.3.0.Final"
|
||||
val Protobuf = "2.4.1"
|
||||
val Rabbit = "2.3.1"
|
||||
val ScalaStm = "0.5"
|
||||
val Scalatest = "1.6.1"
|
||||
val Slf4j = "1.6.4"
|
||||
val Spring = "3.0.5.RELEASE"
|
||||
val Zookeeper = "3.4.0"
|
||||
}
|
||||
|
||||
// Compile
|
||||
|
||||
val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" // New BSD
|
||||
val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % V.Zookeeper // ApacheV2
|
||||
val camelCore = "org.apache.camel" % "camel-core" % V.Camel // ApacheV2
|
||||
val camelSpring = "org.apache.camel" % "camel-spring" % V.Camel // ApacheV2
|
||||
val commonsCodec = "commons-codec" % "commons-codec" % "1.4" // ApacheV2
|
||||
val commonsIo = "commons-io" % "commons-io" % "2.0.1" // ApacheV2
|
||||
val commonsPool = "commons-pool" % "commons-pool" % "1.5.6" // ApacheV2
|
||||
val guice = "org.guiceyfruit" % "guice-all" % "2.0" // ApacheV2
|
||||
val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % V.Jackson // ApacheV2
|
||||
val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % V.Jackson // ApacheV2
|
||||
val jettyUtil = "org.eclipse.jetty" % "jetty-util" % V.Jetty // Eclipse license
|
||||
val jettyXml = "org.eclipse.jetty" % "jetty-xml" % V.Jetty // Eclipse license
|
||||
val jettyServlet = "org.eclipse.jetty" % "jetty-servlet" % V.Jetty // Eclipse license
|
||||
val jmxClient = "cmdline-jmxclient" % "cmdline-jmxclient" % "0.10.3" // LGPL
|
||||
val log4j = "log4j" % "log4j" % "1.2.14" // ApacheV2
|
||||
val mongoAsync = "com.mongodb.async" % "mongo-driver_2.9.0-1" % "0.2.9-1" // ApacheV2
|
||||
val netty = "io.netty" % "netty" % V.Netty // ApacheV2
|
||||
val osgi = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2
|
||||
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD
|
||||
val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License
|
||||
val redis = "net.debasishg" % "redisclient_2.9.1" % "2.4.0" // ApacheV2
|
||||
val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala)
|
||||
val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT
|
||||
val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2
|
||||
val springContext = "org.springframework" % "spring-context" % V.Spring // ApacheV2
|
||||
val staxApi = "javax.xml.stream" % "stax-api" % "1.0-2" // ApacheV2
|
||||
val twttrUtilCore = "com.twitter" % "util-core" % "1.8.1" // ApacheV2
|
||||
val zkClient = "zkclient" % "zkclient" % "0.3" // ApacheV2
|
||||
val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2
|
||||
val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2
|
||||
val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.5" // ApacheV2
|
||||
|
||||
// Provided
|
||||
|
||||
object Provided {
|
||||
val javaxServlet = "org.apache.geronimo.specs" % "geronimo-servlet_3.0_spec" % "1.0" % "provided" // CDDL v1
|
||||
val jetty = "org.eclipse.jetty" % "jetty-server" % V.Jetty % "provided" // Eclipse license
|
||||
}
|
||||
val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" // New BSD
|
||||
val camelCore = "org.apache.camel" % "camel-core" % V.Camel // ApacheV2
|
||||
val camelSpring = "org.apache.camel" % "camel-spring" % V.Camel // ApacheV2
|
||||
val commonsCodec = "commons-codec" % "commons-codec" % "1.4" // ApacheV2
|
||||
val commonsIo = "commons-io" % "commons-io" % "2.0.1" // ApacheV2
|
||||
val commonsPool = "commons-pool" % "commons-pool" % "1.5.6" // ApacheV2
|
||||
val jmxClient = "cmdline-jmxclient" % "cmdline-jmxclient" % "0.10.3" // LGPL
|
||||
val mongoAsync = "com.mongodb.async" % "mongo-driver_2.9.0-1" % "0.2.9-1" // ApacheV2
|
||||
val netty = "io.netty" % "netty" % V.Netty // ApacheV2
|
||||
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD
|
||||
val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License
|
||||
val redis = "net.debasishg" % "redisclient_2.9.1" % "2.4.0" // ApacheV2
|
||||
val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala)
|
||||
val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT
|
||||
val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2
|
||||
val springContext = "org.springframework" % "spring-context" % V.Spring // ApacheV2
|
||||
val twttrUtilCore = "com.twitter" % "util-core" % "1.8.1" // ApacheV2
|
||||
val zkClient = "zkclient" % "zkclient" % "0.3" // ApacheV2
|
||||
val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2
|
||||
val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2
|
||||
val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.5" // ApacheV2
|
||||
|
||||
// Runtime
|
||||
|
||||
|
|
@ -556,8 +538,6 @@ object Dependency {
|
|||
object Test {
|
||||
val commonsColl = "commons-collections" % "commons-collections" % "3.2.1" % "test" // ApacheV2
|
||||
val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2
|
||||
val jetty = "org.eclipse.jetty" % "jetty-server" % V.Jetty % "test" // Eclipse license
|
||||
val jettyWebapp = "org.eclipse.jetty" % "jetty-webapp" % V.Jetty % "test" // Eclipse license
|
||||
val junit = "junit" % "junit" % "4.5" % "test" // Common Public License 1.0
|
||||
val logback = "ch.qos.logback" % "logback-classic" % V.Logback % "test" // EPL 1.0 / LGPL 2.1
|
||||
val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue