Merge remote-tracking branch 'origin/master' into wip-improve-docs-rk

This commit is contained in:
Roland 2011-12-28 16:15:20 +01:00
commit 05e11b6fa3
46 changed files with 788 additions and 195 deletions

View file

@ -182,7 +182,7 @@ object FSMTimingSpec {
when(TestCancelTimer) {
case Ev(Tick)
setTimer("hallo", Tock, 1 milli, false)
TestKit.awaitCond(!context.dispatcher.mailboxIsEmpty(context.asInstanceOf[ActorCell]), 1 second)
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
cancelTimer("hallo")
sender ! Tick
setTimer("hallo", Tock, 500 millis, false)
@ -209,7 +209,7 @@ object FSMTimingSpec {
case Ev(Tick)
suspend(self)
setTimer("named", Tock, 1 millis, false)
TestKit.awaitCond(!context.dispatcher.mailboxIsEmpty(context.asInstanceOf[ActorCell]), 1 second)
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
stay forMax (1 millis) replying Tick
case Ev(Tock)
goto(TestCancelStateTimerInNamedTimerMessage2)

View file

@ -64,9 +64,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val future = Promise[String]().complete(Left(new RuntimeException(message)))
behave like futureWithException[RuntimeException](_(future, message))
}
"completed with a j.u.c.TimeoutException" must {
val message = "Boxed TimeoutException"
val future = Promise[String]().complete(Left(new TimeoutException(message)))
"completed with an InterruptedException" must {
val message = "Boxed InterruptedException"
val future = Promise[String]().complete(Left(new InterruptedException(message)))
behave like futureWithException[RuntimeException](_(future, message))
}
"completed with a NonLocalReturnControl" must {

View file

@ -75,7 +75,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
result
}
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system)
def ensureInitialMailboxState(config: MailboxType, q: Mailbox) {
q must not be null

View file

@ -10,6 +10,9 @@ import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import akka.actor._
import java.io._
import akka.dispatch.Await
import akka.util.Timeout
import akka.util.duration._
object SerializeSpec {
@ -129,3 +132,67 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
}
}
}
object VerifySerializabilitySpec {
val conf = ConfigFactory.parseString("""
akka {
actor {
serialize-messages = on
serialize-creators = on
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.testing.ProtobufSerializer"
sjson = "akka.testing.SJSONSerializer"
default = "akka.serialization.JavaSerializer"
}
serialization-bindings {
java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
sjson = ["akka.serialization.SerializeSpec$Person"]
proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
}
}
}
""")
class FooActor extends Actor {
def receive = {
case s: String sender ! s
}
}
class NonSerializableActor(system: ActorSystem) extends Actor {
def receive = {
case s: String sender ! s
}
}
}
class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf) {
import VerifySerializabilitySpec._
implicit val timeout = Timeout(5 seconds)
"verify config" in {
system.settings.SerializeAllCreators must be(true)
system.settings.SerializeAllMessages must be(true)
}
"verify creators" in {
val a = system.actorOf(Props[FooActor])
intercept[NotSerializableException] {
Await.result(a ? new AnyRef, timeout.duration)
}
system stop a
}
"verify messages" in {
val a = system.actorOf(Props[FooActor])
Await.result(a ? "pigdog", timeout.duration) must be("pigdog")
intercept[java.io.NotSerializableException] {
val b = system.actorOf(Props(new NonSerializableActor(system)))
}
system stop a
}
}

View file

@ -50,10 +50,14 @@ akka {
# - TypedActor: methods with non-void return type
timeout = 5s
# Does a deep clone of (non-primitive) messages to ensure immutability
# FIXME: not used, make use of it or remove the option
# Serializes and deserializes (non-primitive) messages to ensure immutability,
# this is only intended for testing.
serialize-messages = off
# Serializes and deserializes creators (in Props) to ensure that they can be sent over the network,
# this is only intended for testing.
serialize-creators = off
deployment {
# deployment id pattern - on the format: /parent/child etc.

View file

@ -25,8 +25,17 @@ import java.util.regex.Pattern
*/
trait AutoReceivedMessage extends Serializable
/**
* Marker trait to indicate that a message might be potentially harmful,
* this is used to block messages coming in over remoting.
*/
trait PossiblyHarmful
/**
* Marker trait to signal that this class should not be verified for serializability.
*/
trait NoSerializationVerificationNeeded
case class Failed(cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful
case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful

View file

@ -13,6 +13,7 @@ import akka.event.Logging.{ Debug, Warning, Error }
import akka.util.{ Duration, Helpers }
import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
//TODO: everything here for current compatibility - could be limited more
@ -214,6 +215,16 @@ private[akka] class ActorCell(
final var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
private def _actorOf(props: Props, name: String): ActorRef = {
if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(system)
ser.serialize(props.creator) match {
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, props.creator.getClass, None) match {
case Left(t) throw t
case _ //All good
}
}
}
val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None)
childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor))
actor
@ -308,7 +319,7 @@ private[akka] class ActorCell(
}
final def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender))
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)(system))
final def sender: ActorRef = currentMessage match {
case null system.deadLetters
@ -566,7 +577,7 @@ private[akka] class ActorCell(
final def checkReceiveTimeout() {
val recvtimeout = receiveTimeoutData
if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) {
if (recvtimeout._1 > 0 && !mailbox.hasMessages) {
recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout))

View file

@ -110,6 +110,7 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act
}
final class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath {
if (name.indexOf('/') != -1) throw new IllegalArgumentException("/ is a path separator and is not legal in ActorPath names: [%s]" format name)
def address: Address = root.address

View file

@ -13,7 +13,7 @@ import java.util.concurrent.TimeUnit
import akka.event.EventStream
import akka.event.DeathWatch
import scala.annotation.tailrec
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ ConcurrentHashMap, TimeoutException }
import akka.event.LoggingAdapter
import java.util.concurrent.atomic.AtomicBoolean
@ -106,6 +106,8 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
* Akka Java API.
*
* Sends a message asynchronously returns a future holding the eventual reply message.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'tell' together with the sender
@ -177,6 +179,9 @@ trait ScalaActorRef { ref: ActorRef ⇒
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with implicit or explicit
* sender parameter to implement non-blocking request/response message exchanges.
@ -489,6 +494,14 @@ class VirtualPathContainer(val path: ActorPath, override val getParent: Internal
}
}
/**
* This is what is used to complete a Future that is returned from an ask/? call,
* when it times out.
*/
class AskTimeoutException(message: String, cause: Throwable) extends TimeoutException {
def this(message: String) = this(message, null: Throwable)
}
class AskActorRef(
val path: ActorPath,
override val getParent: InternalActorRef,

View file

@ -6,14 +6,12 @@ package akka.actor
import java.util.concurrent.atomic.AtomicLong
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
import akka.util.Timeout
import akka.util.Timeout.intToTimeout
import akka.config.ConfigurationException
import akka.dispatch._
import akka.routing._
import akka.util.Timeout
import akka.AkkaException
import akka.util.{ Duration, Switch, Helpers }
import akka.util.{ Duration, Switch, Helpers, Timeout }
import akka.event._
import java.io.Closeable
@ -485,15 +483,15 @@ class LocalActorRefProvider(
def ask(within: Timeout): Option[AskActorRef] = {
(if (within == null) settings.ActorTimeout else within) match {
case t if t.duration.length <= 0
None
case t if t.duration.length <= 0 None
case t
val path = tempPath()
val name = path.name
val a = new AskActorRef(path, tempContainer, dispatcher, deathWatch)
tempContainer.addChild(name, a)
val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { tempContainer.removeChild(name); a.stop() }
a.result onComplete { _
val result = a.result
val f = dispatcher.prerequisites.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) }
result onComplete { _
try { a.stop(); f.cancel() }
finally { tempContainer.removeChild(name) }
}

View file

@ -58,9 +58,9 @@ object ActorSystem {
def create(): ActorSystem = apply()
def apply(): ActorSystem = apply("default")
class Settings(cfg: Config, val name: String) {
class Settings(cfg: Config, final val name: String) {
val config: Config = {
final val config: Config = {
val config = cfg.withFallback(ConfigFactory.defaultReference)
config.checkValid(ConfigFactory.defaultReference, "akka")
config
@ -69,81 +69,39 @@ object ActorSystem {
import scala.collection.JavaConverters._
import config._
val ConfigVersion = getString("akka.version")
final val ConfigVersion = getString("akka.version")
val ProviderClass = getString("akka.actor.provider")
final val ProviderClass = getString("akka.actor.provider")
val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS)
val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS))
val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS)
final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS))
final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
final val SerializeAllCreators = getBoolean("akka.actor.serialize-creators")
val LogLevel = getString("akka.loglevel")
val StdoutLogLevel = getString("akka.stdout-loglevel")
val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala
val LogConfigOnStart = config.getBoolean("akka.logConfigOnStart")
val AddLoggingReceive = getBoolean("akka.actor.debug.receive")
val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive")
val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle")
val FsmDebugEvent = getBoolean("akka.actor.debug.fsm")
val DebugEventStream = getBoolean("akka.actor.debug.event-stream")
final val LogLevel = getString("akka.loglevel")
final val StdoutLogLevel = getString("akka.stdout-loglevel")
final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala
final val LogConfigOnStart = config.getBoolean("akka.logConfigOnStart")
final val AddLoggingReceive = getBoolean("akka.actor.debug.receive")
final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive")
final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle")
final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm")
final val DebugEventStream = getBoolean("akka.actor.debug.event-stream")
val Home = config.getString("akka.home") match {
final val Home = config.getString("akka.home") match {
case "" None
case x Some(x)
}
val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
if (ConfigVersion != Version)
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
override def toString: String = config.root.render
}
// TODO move to migration kit
object OldConfigurationLoader {
val defaultConfig: Config = {
val cfg = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig
cfg.withFallback(ConfigFactory.defaultReference).resolve(ConfigResolveOptions.defaults)
}
// file extensions (.conf, .json, .properties), are handled by parseFileAnySyntax
val defaultLocation: String = (systemMode orElse envMode).map("akka." + _).getOrElse("akka")
private def envMode = System.getenv("AKKA_MODE") match {
case null | "" None
case value Some(value)
}
private def systemMode = System.getProperty("akka.mode") match {
case null | "" None
case value Some(value)
}
private def configParseOptions = ConfigParseOptions.defaults.setAllowMissing(false)
private def fromProperties = try {
val property = Option(System.getProperty("akka.config"))
property.map(p
ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseFileAnySyntax(new File(p), configParseOptions)))
} catch { case _ None }
private def fromClasspath = try {
Option(ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseResourcesAnySyntax(ActorSystem.getClass, "/" + defaultLocation, configParseOptions)))
} catch { case _ None }
private def fromHome = try {
Option(ConfigFactory.systemProperties.withFallback(
ConfigFactory.parseFileAnySyntax(new File(GlobalHome.get + "/config/" + defaultLocation), configParseOptions)))
} catch { case _ None }
private def emptyConfig = ConfigFactory.systemProperties
}
}
/**
@ -323,7 +281,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
import ActorSystem._
val settings = new Settings(applicationConfig, name)
final val settings = new Settings(applicationConfig, name)
def logConfiguration(): Unit = log.info(settings.toString)

View file

@ -46,15 +46,53 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int =
object FaultHandlingStrategy {
sealed trait Action
/**
* Resumes message processing for the failed Actor
*/
case object Resume extends Action
/**
* Discards the old Actor instance and replaces it with a new,
* then resumes message processing.
*/
case object Restart extends Action
/**
* Stops the Actor
*/
case object Stop extends Action
/**
* Escalates the failure to the supervisor of the supervisor,
* by rethrowing the cause of the failure.
*/
case object Escalate extends Action
// Java API
/**
* Resumes message processing for the failed Actor
* Java API
*/
def resume = Resume
/**
* Discards the old Actor instance and replaces it with a new,
* then resumes message processing.
* Java API
*/
def restart = Restart
/**
* Stops the Actor
* Java API
*/
def stop = Stop
/**
* Escalates the failure to the supervisor of the supervisor,
* by rethrowing the cause of the failure.
* Java API
*/
def escalate = Escalate
type Decider = PartialFunction[Throwable, Action]

View file

@ -6,21 +6,30 @@ package akka.dispatch
import java.util.concurrent._
import akka.event.Logging.Error
import akka.util.{ Duration, Switch, ReentrantGuard }
import atomic.{ AtomicInteger, AtomicLong }
import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
import akka.util.Duration
import akka.actor._
import akka.actor.ActorSystem
import locks.ReentrantLock
import scala.annotation.tailrec
import akka.event.EventStream
import akka.actor.ActorSystem.Settings
import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicReference
import akka.util.ReflectiveAccess
import akka.serialization.SerializationExtension
final case class Envelope(val message: Any, val sender: ActorRef) {
if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null")
final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
if (message.isInstanceOf[AnyRef]) {
val msg = message.asInstanceOf[AnyRef]
if (msg eq null) throw new InvalidMessageException("Message is null")
if (system.settings.SerializeAllMessages && !msg.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(system)
ser.serialize(msg) match { //Verify serializability
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, msg.getClass, None) match { //Verify deserializability
case Left(t) throw t
case _ //All good
}
}
}
}
}
object SystemMessage {
@ -103,7 +112,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
def name: String
/**
* Identfier of this dispatcher, corresponds to the full key
* Identifier of this dispatcher, corresponds to the full key
* of the dispatcher configuration.
*/
def id: String
@ -257,16 +266,6 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
* Must be idempotent
*/
protected[akka] def shutdown(): Unit
/**
* Returns the size of the mailbox for the specified actor
*/
def mailboxSize(actor: ActorCell): Int = actor.mailbox.numberOfMessages
/**
* Returns the "current" emptiness status of the mailbox for the specified actor
*/
def mailboxIsEmpty(actor: ActorCell): Boolean = !actor.mailbox.hasMessages
}
/**

View file

@ -89,7 +89,7 @@ object Futures {
/**
* Java API
* A non-blocking fold over the specified futures.
* A non-blocking fold over the specified futures, with the start value of the given zero.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
@ -201,13 +201,13 @@ object Future {
}
/**
* A non-blocking fold over the specified futures.
* A non-blocking fold over the specified futures, with the start value of the given zero.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
* Example:
* <pre>
* val result = Await.result(Futures.fold(0)(futures)(_ + _), 5 seconds)
* val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
* </pre>
*/
def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] = {
@ -231,7 +231,7 @@ object Future {
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel:
* <pre>
* val myFutureList = Futures.traverse(myList)(x Future(myFunc(x)))
* val myFutureList = Future.traverse(myList)(x Future(myFunc(x)))
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
@ -337,7 +337,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) Right(t.value.asInstanceOf[X])
case Left(t: TimeoutException) Left(new RuntimeException("Boxed TimeoutException", t))
case Left(t: InterruptedException) Left(new RuntimeException("Boxed InterruptedException", t))
case _ source
}

View file

@ -111,10 +111,16 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))
def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(corePoolSize = size))
if (config.maxPoolSize < size)
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
else
this.copy(config = config.copy(corePoolSize = size))
def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(maxPoolSize = size))
if (config.corePoolSize > size)
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
else
this.copy(config = config.copy(maxPoolSize = size))
def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder =
setCorePoolSize(scaledPoolSize(min, multiplier, max))
@ -206,6 +212,11 @@ trait ExecutorServiceDelegate extends ExecutorService {
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}
/**
* The RejectedExecutionHandler used by Akka, it improves on CallerRunsPolicy
* by throwing a RejectedExecutionException if the executor isShutdown.
* (CallerRunsPolicy silently discards the runnable in this case, which is arguably broken)
*/
class SaneRejectedExecutionHandler extends RejectedExecutionHandler {
def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor): Unit = {
if (threadPoolExecutor.isShutdown) throw new RejectedExecutionException("Shutdown")

View file

@ -363,7 +363,7 @@ object Logging {
* message. This is necessary to ensure that additional subscriptions are in
* effect when the logging system finished starting.
*/
case class InitializeLogger(bus: LoggingBus)
case class InitializeLogger(bus: LoggingBus) extends NoSerializationVerificationNeeded
/**
* Response message each logger must send within 1 second after receiving the

View file

@ -165,7 +165,7 @@ trait SmallestMailboxSelector {
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
def mailboxSize(a: ActorRef): Int = a match {
case l: LocalActorRef l.underlying.dispatcher.mailboxSize(l.underlying)
case l: LocalActorRef l.underlying.mailbox.numberOfMessages
case _ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority
}
@ -282,7 +282,7 @@ trait MailboxPressureCapacitor {
def pressureThreshold: Int
def pressure(delegates: Seq[ActorRef]): Int =
delegates count {
case a: LocalActorRef a.underlying.dispatcher.mailboxSize(a.underlying) > pressureThreshold
case a: LocalActorRef a.underlying.mailbox.numberOfMessages > pressureThreshold
case _ false
}
}

View file

@ -34,7 +34,7 @@ public class SchedulerDocTestBase {
@Before
public void setUp() {
system = ActorSystem.create("MySystem", AkkaSpec.testConf());
testActor = system.actorOf(new Props().withCreator(MyUntypedActor.class));
testActor = system.actorOf(new Props(MyUntypedActor.class));
}
@After

View file

@ -100,7 +100,7 @@ public class UntypedActorDocTestBase {
public void propsActorOf() {
ActorSystem system = ActorSystem.create("MySystem");
//#creating-props
ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"),
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor");
//#creating-props
myActor.tell("test");

View file

@ -57,9 +57,9 @@ public class DispatcherDocTestBase {
@Test
public void defineDispatcher() {
//#defining-dispatcher
ActorRef myActor1 = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"),
ActorRef myActor1 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor1");
ActorRef myActor2 = system.actorOf(new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher"),
ActorRef myActor2 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor2");
//#defining-dispatcher
}
@ -68,7 +68,7 @@ public class DispatcherDocTestBase {
public void definePinnedDispatcher() {
//#defining-pinned-dispatcher
String name = "myactor";
ActorRef myActor = system.actorOf(new Props().withCreator(MyUntypedActor.class)
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)
.withDispatcher("myactor-dispatcher"), name);
//#defining-pinned-dispatcher
}

View file

@ -317,8 +317,9 @@ If invoked without the sender parameter the sender will be
Ask: Send-And-Receive-Future
----------------------------
Using ``ask`` will send a message to the receiving Actor asynchronously and
will immediately return a :class:`Future`:
Using ``?`` will send a message to the receiving Actor asynchronously and
will immediately return a :class:`Future` which will be completed with
an ``akka.actor.AskTimeoutException`` after the specified timeout:
.. code-block:: java

View file

@ -353,7 +353,8 @@ Ask: Send-And-Receive-Future
----------------------------
Using ``?`` will send a message to the receiving Actor asynchronously and
will immediately return a :class:`Future`:
will immediately return a :class:`Future` which will be completed with
an ``akka.actor.AskTimeoutException`` after the specified timeout:
.. code-block:: scala

View file

@ -17,69 +17,97 @@ In you SBT project you should add the following as a dependency::
"com.typesafe.akka" % "akka-remote" % "2.0-SNAPSHOT"
First of all you have to change the actor provider from ``LocalActorRefProvider`` to ``RemoteActorRefProvider``::
To enable remote capabilities in your Akka project you should, at a minimum, add the following changes
to your ``application.conf`` file::
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
provider = "akka.remote.RemoteActorRefProvider"
}
}
After that you must also add the following settings::
akka {
remote {
transport = "akka.remote.netty.NettyRemoteSupport"
server {
# The hostname or ip to bind the remoting to,
# InetAddress.getLocalHost.getHostAddress is used if empty
hostname = ""
# The default remote server port clients should connect to.
# Default is 2552 (AKKA)
hostname = "127.0.0.1"
port = 2552
}
}
}
}
These are the bare minimal settings that must exist in order to get started with remoting.
There are, of course, more properties that can be tweaked. We refer to the following
As you can see in the example above there are four things you need to add to get started:
* Change provider from ``akka.actor.LocalActorRefProvider`` to ``akka.remote.RemoteActorRefProvider``
* Add host name - the machine you want to run the actor system on
* Add port number - the port the actor system should listen on
The example above only illustrates the bare minimum of properties you have to add to enable remoting.
There are lots of more properties that are related to remoting in Akka. We refer to the following
reference file for more information:
* `reference.conf of akka-remote <https://github.com/jboner/akka/blob/master/akka-remote/src/main/resources/reference.conf#L39>`_
Types of Remote Interaction
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Akka has two ways of using remoting:
* Lookup : used to look up an actor on a remote node with ``actorFor(path)``
* Creation : used to create an actor on a remote node with ``actorOf(Props(...), actorName)``
In the next sections the two alternatives are described in detail.
Looking up Remote Actors
^^^^^^^^^^^^^^^^^^^^^^^^
``actorFor(path)`` will obtain an ``ActorRef`` to an Actor on a remote node::
``actorFor(path)`` will obtain an ``ActorRef`` to an Actor on a remote node, e.g.::
val actor = context.actorFor("akka://app@10.0.0.1:2552/user/serviceA/retrieval")
val actor = context.actorFor("akka://actorSystemName@10.0.0.1:2552/user/actorName")
As you can see from the example above the following pattern is used to find an ``ActorRef`` on a remote node::
akka://<actorsystemname>@<hostname>:<port>/<actor path>
akka://<actor system>@<hostname>:<port>/<actor path>
Once you a reference to the actor you can interact with it they same way you would with a local actor, e.g.::
actor ! "Pretty awesome feature"
For more details on how actor addresses and paths are formed and used, please refer to :ref:`addressing`.
Creating Actors Remotely
^^^^^^^^^^^^^^^^^^^^^^^^
The configuration below instructs the system to deploy the actor "retrieval” on the specific host "app@10.0.0.1".
The "app" in this case refers to the name of the ``ActorSystem``::
If you want to use the creation functionality in Akka remoting you have to further amend the
``application.conf`` file in the following way::
akka {
actor {
deployment {
/serviceA/retrieval {
remote = “akka://app@10.0.0.1:2552”
}
}
provider = "akka.remote.RemoteActorRefProvider"
deployment { /sampleActor {
remote = "akka://sampleActorSystem@127.0.0.1:2553"
}}
}
}
...
Logical path lookup is supported on the node you are on, i.e. to use the
actor created above you would do the following::
The configuration above instructs Akka to react when an actor with path /sampleActor is created, i.e.
using ``system.actorOf(Props(...)`, sampleActor)``. This specific actor will not be directly instantiated,
but instead the remote daemon of the remote system will be asked to create the actor,
which in this sample corresponds to ``sampleActorSystem@127.0.0.1:2553``.
val actor = context.actorFor("/serviceA/retrieval")
Once you have configured the properties above you would do the following in code::
class SampleActor extends Actor { def receive = { case _ => println("Got something") } }
val actor = context.actorOf(Props[SampleActor], "sampleActor")
actor ! "Pretty slick"
``SampleActor`` has to be available to the runtimes using it, i.e. the classloader of the
actor systems has to have a JAR containing the class.
Remote Sample Code
^^^^^^^^^^^^^^^^^^
There is a more extensive remote example that comes with the Akka distribution.
Please have a look here for more information:
`Remote Sample <https://github.com/jboner/akka/tree/master/akka-samples/akka-sample-remote>`_
Serialization
^^^^^^^^^^^^^

View file

@ -3,35 +3,19 @@
*/
package akka.actor.mailbox
import akka.util.ReflectiveAccess
import akka.AkkaException
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.SerializedActorRef
import akka.dispatch.Envelope
import akka.dispatch.DefaultSystemMessageQueue
import akka.dispatch.Dispatcher
import akka.dispatch.CustomMailbox
import akka.dispatch.MailboxType
import akka.dispatch.MessageDispatcher
import akka.dispatch.MessageQueue
import akka.remote.MessageSerializer
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.remote.RemoteProtocol.MessageProtocol
import akka.remote.RemoteProtocol.RemoteMessageProtocol
import akka.remote.RemoteActorRefProvider
import akka.remote.netty.NettyRemoteServer
import akka.serialization.Serialization
import com.typesafe.config.Config
private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
}
class DurableMailboxException private[akka] (message: String, cause: Throwable) extends AkkaException(message, cause) {
def this(message: String) = this(message, null)
}
abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue {
import DurableExecutableMailboxConfig._
@ -67,7 +51,7 @@ trait DurableMessageSerialization {
val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage)
val sender = deserializeActorRef(durableMessage.getSender)
new Envelope(message, sender)
new Envelope(message, sender)(owner.system)
}
}

View file

@ -72,7 +72,7 @@ class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) {
doc match {
case Some(msg) {
log.debug("DEQUEUING message in mongo-based mailbox [{}]", msg)
envelopePromise.success(msg.envelope())
envelopePromise.success(msg.envelope(system))
log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise)
}
case None

View file

@ -10,9 +10,8 @@ import org.bson.io.OutputBuffer
import org.bson.types.ObjectId
import java.io.InputStream
import org.bson.collection._
import akka.actor.LocalActorRef
import akka.actor.ActorRef
import akka.dispatch.Envelope
import akka.actor.{ ActorSystem, LocalActorRef, ActorRef }
/**
* A container message for durable mailbox messages, which can be easily stuffed into
@ -32,7 +31,7 @@ case class MongoDurableMessage(
val sender: ActorRef,
val _id: ObjectId = new ObjectId) {
def envelope() = Envelope(message, sender)
def envelope(system: ActorSystem) = Envelope(message, sender)(system)
}
// vim: set ts=2 sw=2 sts=2 et:

View file

@ -128,7 +128,7 @@ akka {
cluster {
name = "default-cluster"
nodename = ""
nodename = "default"
seed-nodes = []
}
}

View file

@ -111,7 +111,7 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti
}
}
log.info("Starting remote server on [{}]", remoteAddress)
log.info("Starting remote server on [{}] with node name [{}]", remoteAddress, provider.nodename)
}
}
@ -142,9 +142,9 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa
val full = Vector() ++ names
rec(full.mkString("/"), 0) match {
case (Nobody, _) Nobody
case (ref, n) if n == 0 ref
case (ref, n) ref.getChild(full.takeRight(n).iterator)
case (Nobody, _) Nobody
case (ref, 0) ref
case (ref, n) ref.getChild(full.takeRight(n).iterator)
}
}

View file

@ -31,7 +31,7 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi
}
val NodeName: String = config.getString("akka.cluster.nodename") match {
case "" throw new ConfigurationException("akka.cluster.nodename configuration property must be defined")
case "" throw new ConfigurationException("Configuration option 'akka.cluster.nodename' must be non-empty.")
case value value
}

View file

@ -669,7 +669,7 @@ class RemoteServerHandler(
private def getClientAddress(c: Channel): Option[RemoteNettyAddress] =
c.getRemoteAddress match {
case inet: InetSocketAddress Some(RemoteNettyAddress(inet.getHostName, inet.getPort))
case inet: InetSocketAddress Some(RemoteNettyAddress(inet.getHostName, Some(inet.getAddress), inet.getPort))
case _ None
}
}

View file

@ -11,7 +11,12 @@ The Sample Explained
In order to showcase the remote capabilities of Akka 2.0 we thought a remote calculator could do the trick.
There are two implementations of the sample; one in Scala and one in Java.
The explanation below is for Scala, but everything is similar in Java except that the class names begin with a ``J``,
e.g. ``JCalcApp`` instead of ``CalcApp``, and that the Java classes reside in another package structure.
There are three actor systems used in the sample:
* CalculatorApplication : the actor system performing the number crunching
* LookupApplication : illustrates how to look up an actor on a remote node and and how communicate with that actor
* CreationApplication : illustrates how to create an actor on a remote node and how to communicate with that actor
@ -70,7 +75,7 @@ Open up a new terminal window and run SBT once more:
> run
Select to run "sample.remote.calculator.LookupApp" which in the case below is number 1:
Select to run "sample.remote.calculator.LookupApp" which in the case below is number 1::
Multiple main classes detected, select one to run:
@ -80,7 +85,7 @@ Select to run "sample.remote.calculator.LookupApp" which in the case below is nu
Enter number: 1
Now you should see something like this:
Now you should see something like this::
[info] Running sample.remote.calculator.LookupApp
[INFO] [12/22/2011 14:54:38.630] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://LookupApplication@127.0.0.1:2553
@ -102,7 +107,7 @@ Once more you should open a new terminal window and run SBT:
> run
Select to run "sample.remote.calculator.CreationApp" which in the case below is number 2:
Select to run "sample.remote.calculator.CreationApp" which in the case below is number 2::
Multiple main classes detected, select one to run:
@ -112,7 +117,7 @@ Select to run "sample.remote.calculator.CreationApp" which in the case below is
Enter number: 2
Now you should see something like this:
Now you should see something like this::
[info] Running sample.remote.calculator.CreationApp
[INFO] [12/22/2011 14:57:02.150] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://RemoteCreation@127.0.0.1:2554
@ -134,5 +139,5 @@ The sample application is just that, i.e. a sample. Parts of it are not the way
Some improvements are to remove all hard coded addresses from the code as they reduce the flexibility of how and
where the application can be run. We leave this to the astute reader to refine the sample into a real-world app.
[akka]: http://akka.io
[sbt]: http://code.google.com/p/simple-build-tool/
* `Akka <http://akka.io/>`_
* `SBT <http://https://github.com/harrah/xsbt/wiki/>`_

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import akka.actor.ActorRef;
public class InternalMsg {
static class MathOpMsg {
private final ActorRef actor;
private final Op.MathOp mathOp;
MathOpMsg(ActorRef actor, Op.MathOp mathOp) {
this.actor = actor;
this.mathOp = mathOp;
}
public ActorRef getActor() {
return actor;
}
public Op.MathOp getMathOp() {
return mathOp;
}
}
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import akka.actor.UntypedActor;
public class JAdvancedCalculatorActor extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Op.Multiply) {
Op.Multiply multiply = (Op.Multiply) message;
System.out.println("Calculating " + multiply.getN1() + " * " + multiply.getN2());
getSender().tell(new Op.MultiplicationResult(multiply.getN1(), multiply.getN2(), multiply.getN1() * multiply.getN2()));
} else if (message instanceof Op.Divide) {
Op.Divide divide = (Op.Divide) message;
System.out.println("Calculating " + divide.getN1() + " / " + divide.getN2());
getSender().tell(new Op.DivisionResult(divide.getN1(), divide.getN2(), divide.getN1() / divide.getN2()));
}
}
}

View file

@ -0,0 +1,13 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
public class JCalcApp {
public static void main(String[] args) {
JCalculatorApplication app = new JCalculatorApplication();
System.out.println("Started Calculator Application - waiting for messages");
}
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory;
public class JCalculatorApplication implements Bootable {
private ActorSystem system;
public JCalculatorApplication() {
system = ActorSystem.create("CalculatorApplication", ConfigFactory.load().getConfig("calculator"));
ActorRef actor = system.actorOf(new Props(JSimpleCalculatorActor.class), "simpleCalculator");
}
@Override
public void startup() {
}
@Override
public void shutdown() {
system.shutdown();
}
}

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import akka.actor.UntypedActor;
import java.text.DecimalFormat;
import java.text.NumberFormat;
public class JCreationActor extends UntypedActor {
private static final NumberFormat formatter = new DecimalFormat("#0.00");
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof InternalMsg.MathOpMsg) {
InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message;
msg.getActor().tell(msg.getMathOp(), getSelf());
} else if (message instanceof Op.MathResult) {
if (message instanceof Op.MultiplicationResult) {
Op.MultiplicationResult result = (Op.MultiplicationResult) message;
System.out.println("Mul result: " + result.getN1() + " * " +
result.getN2() + " = " + result.getResult());
} else if (message instanceof Op.DivisionResult) {
Op.DivisionResult result = (Op.DivisionResult) message;
System.out.println("Div result: " + result.getN1() + " / " +
result.getN2() + " = " + formatter.format(result.getResult()));
}
}
}
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import java.util.Random;
public class JCreationApp {
public static void main(String[] args) {
JCreationApplication app = new JCreationApplication();
System.out.println("Started Creation Application");
Random r = new Random();
while (true) {
if (r.nextInt(100) % 2 == 0) {
app.doSomething(new Op.Multiply(r.nextInt(100), r.nextInt(100)));
} else {
app.doSomething(new Op.Divide(r.nextInt(10000), r.nextInt(99) + 1));
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
}
}

View file

@ -0,0 +1,35 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory;
public class JCreationApplication implements Bootable {
private ActorSystem system;
private ActorRef actor;
private ActorRef remoteActor;
public JCreationApplication() {
system = ActorSystem.create("CreationApplication", ConfigFactory.load().getConfig("remotecreation"));
actor = system.actorOf(new Props(JCreationActor.class));
remoteActor = system.actorOf(new Props(JAdvancedCalculatorActor.class), "advancedCalculator");
}
public void doSomething(Op.MathOp mathOp) {
actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp));
}
@Override
public void startup() {
}
@Override
public void shutdown() {
system.shutdown();
}
}

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import akka.actor.UntypedActor;
public class JLookupActor extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof InternalMsg.MathOpMsg) {
InternalMsg.MathOpMsg msg = (InternalMsg.MathOpMsg) message;
msg.getActor().tell(msg.getMathOp(), getSelf());
} else if (message instanceof Op.MathResult) {
if (message instanceof Op.AddResult) {
Op.AddResult result = (Op.AddResult) message;
System.out.println("Add result: " + result.getN1() + " + " +
result.getN2() + " = " + result.getResult());
} else if (message instanceof Op.SubtractResult) {
Op.SubtractResult result = (Op.SubtractResult) message;
System.out.println("Sub result: " + result.getN1() + " - " +
result.getN2() + " = " + result.getResult());
}
}
}
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import java.util.Random;
public class JLookupApp {
public static void main(String[] args) {
JLookupApplication app = new JLookupApplication();
System.out.println("Started Lookup Application");
Random r = new Random();
while (true) {
if (r.nextInt(100) % 2 == 0) {
app.doSomething(new Op.Add(r.nextInt(100), r.nextInt(100)));
} else {
app.doSomething(new Op.Subtract(r.nextInt(100), r.nextInt(100)));
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
}
}

View file

@ -0,0 +1,35 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.kernel.Bootable;
import com.typesafe.config.ConfigFactory;
public class JLookupApplication implements Bootable {
private ActorSystem system;
private ActorRef actor;
private ActorRef remoteActor;
public JLookupApplication() {
system = ActorSystem.create("LookupApplication", ConfigFactory.load().getConfig("remotelookup"));
actor = system.actorOf(new Props(JLookupActor.class));
remoteActor = system.actorFor("akka://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator");
}
public void doSomething(Op.MathOp mathOp) {
actor.tell(new InternalMsg.MathOpMsg(remoteActor, mathOp));
}
@Override
public void startup() {
}
@Override
public void shutdown() {
system.shutdown();
}
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import akka.actor.UntypedActor;
public class JSimpleCalculatorActor extends UntypedActor {
@Override
public void onReceive(Object message) {
if (message instanceof Op.Add) {
Op.Add add = (Op.Add) message;
System.out.println("Calculating " + add.getN1() + " + " + add.getN2());
getSender().tell(new Op.AddResult(add.getN1(), add.getN2(), add.getN1() + add.getN2()));
} else if (message instanceof Op.Subtract) {
Op.Subtract subtract = (Op.Subtract) message;
System.out.println("Calculating " + subtract.getN1() + " - " + subtract.getN2());
getSender().tell(new Op.SubtractResult(subtract.getN1(), subtract.getN2(), subtract.getN1() - subtract.getN2()));
}
}
}

View file

@ -0,0 +1,181 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.remote.calculator.java;
import java.io.Serializable;
public class Op {
public interface MathOp extends Serializable {}
public interface MathResult extends Serializable {}
static class Add implements MathOp {
private final int n1;
private final int n2;
public Add(int n1, int n2) {
this.n1 = n1;
this.n2 = n2;
}
public int getN1() {
return n1;
}
public int getN2() {
return n2;
}
}
static class AddResult implements MathResult {
private final int n1;
private final int n2;
private final int result;
public AddResult(int n1, int n2, int result) {
this.n1 = n1;
this.n2 = n2;
this.result = result;
}
public int getN1() {
return n1;
}
public int getN2() {
return n2;
}
public int getResult() {
return result;
}
}
static class Subtract implements MathOp {
private final int n1;
private final int n2;
public Subtract(int n1, int n2) {
this.n1 = n1;
this.n2 = n2;
}
public int getN1() {
return n1;
}
public int getN2() {
return n2;
}
}
static class SubtractResult implements MathResult {
private final int n1;
private final int n2;
private final int result;
public SubtractResult(int n1, int n2, int result) {
this.n1 = n1;
this.n2 = n2;
this.result = result;
}
public int getN1() {
return n1;
}
public int getN2() {
return n2;
}
public int getResult() {
return result;
}
}
static class Multiply implements MathOp {
private final int n1;
private final int n2;
public Multiply(int n1, int n2) {
this.n1 = n1;
this.n2 = n2;
}
public int getN1() {
return n1;
}
public int getN2() {
return n2;
}
}
static class MultiplicationResult implements MathResult {
private final int n1;
private final int n2;
private final int result;
public MultiplicationResult(int n1, int n2, int result) {
this.n1 = n1;
this.n2 = n2;
this.result = result;
}
public int getN1() {
return n1;
}
public int getN2() {
return n2;
}
public int getResult() {
return result;
}
}
static class Divide implements MathOp {
private final double n1;
private final int n2;
public Divide(double n1, int n2) {
this.n1 = n1;
this.n2 = n2;
}
public double getN1() {
return n1;
}
public int getN2() {
return n2;
}
}
static class DivisionResult implements MathResult {
private final double n1;
private final int n2;
private final double result;
public DivisionResult(double n1, int n2, double result) {
this.n1 = n1;
this.n2 = n2;
this.result = result;
}
public double getN1() {
return n1;
}
public int getN2() {
return n2;
}
public double getResult() {
return result;
}
}
}

View file

@ -13,7 +13,7 @@ case class Subtract(nbr1: Int, nbr2: Int) extends MathOp
case class Multiply(nbr1: Int, nbr2: Int) extends MathOp
case class Divide(nbr1: Int, nbr2: Int) extends MathOp
case class Divide(nbr1: Double, nbr2: Int) extends MathOp
trait MathResult
@ -31,7 +31,7 @@ class AdvancedCalculatorActor extends Actor {
println("Calculating %d * %d".format(n1, n2))
sender ! MultiplicationResult(n1, n2, n1 * n2)
case Divide(n1, n2)
println("Calculating %d / %d".format(n1, n2))
println("Calculating %.0f / %d".format(n1, n2))
sender ! DivisionResult(n1, n2, n1 / n2)
}
}

View file

@ -165,10 +165,6 @@ class CallingThreadDispatcher(
}
}
override def mailboxSize(actor: ActorCell) = getMailbox(actor) map (_.queue.size) getOrElse 0
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor) map (_.queue.isEmpty) getOrElse true
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
receiver.mailbox match {
case mbox: CallingThreadMailbox
@ -304,6 +300,6 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
override def enqueue(receiver: ActorRef, msg: Envelope) {}
override def dequeue() = null
override def hasMessages = true
override def numberOfMessages = 0
override def hasMessages = queue.isEmpty
override def numberOfMessages = queue.size
}

View file

@ -56,9 +56,9 @@ object TestActorRefSpec {
class WorkerActor() extends TActor {
def receiveT = {
case "work" sender ! "workDone"; context.stop(self)
case replyTo: Promise[Any] replyTo.success("complexReply")
case replyTo: ActorRef replyTo ! "complexReply"
case "work" sender ! "workDone"; context.stop(self)
case replyTo: Promise[_] replyTo.asInstanceOf[Promise[Any]].success("complexReply")
case replyTo: ActorRef replyTo ! "complexReply"
}
}