Removing DaemonMsgWatch, oh yeah baby. However, still no cigar

This commit is contained in:
Viktor Klang 2012-05-29 14:09:22 +02:00
parent 4bb1a40581
commit e3e391e5aa
11 changed files with 102 additions and 811 deletions

View file

@ -36,7 +36,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"notify with one Terminated message when an Actor is stopped" in {
val terminal = system.actorOf(Props.empty)
startWatching(terminal) ! "hallo"
expectMsg("hallo") // this ensures that the DaemonMsgWatch has been received before we send the PoisonPill
expectMsg("hallo")
terminal ! PoisonPill

View file

@ -460,28 +460,22 @@ private[akka] class ActorCell(
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
override final def watch(subject: ActorRef): ActorRef = {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
subject match {
case a: InternalActorRef
if (!watching.contains(a)) {
watching += a
a.sendSystemMessage(Watch(a, self))
}
}
subject
override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (!watching.contains(a)) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
}
a
}
override final def unwatch(subject: ActorRef): ActorRef = {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
subject match {
case a: InternalActorRef
if (watching.contains(a)) {
watching -= a
a.sendSystemMessage(Unwatch(a, self))
}
}
subject
override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (watching.contains(a)) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
}
a
}
final def children: Iterable[ActorRef] = childrenRefs.children
@ -579,18 +573,26 @@ private[akka] class ActorCell(
def resume(): Unit = if (isNormal) dispatcher resume this
def addWatcher(watcher: ActorRef): Unit = if (!isTerminating) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " watched by " + watcher))
}
def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
if (watchee == self) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " watched by " + watcher))
}
} else if (watcher == self) {
watch(watchee)
} else println("addNOOOOOOOOO: " + watchee + " => " + watcher)
}
def remWatcher(watcher: ActorRef): Unit = if (!isTerminating) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " unwatched by " + watcher))
}
def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
if (watchee == self) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " unwatched by " + watcher))
}
} else if (watcher == self) {
unwatch(watchee)
} else println("remNOOOOOOOOO: " + watchee + " => " + watcher)
}
def terminate() {
@ -617,17 +619,15 @@ private[akka] class ActorCell(
try {
message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Watch(`self`, watcher) addWatcher(watcher)
case Watch(watchee, `self`) watch(watchee)
case Unwatch(`self`, watcher) remWatcher(watcher)
case Unwatch(watchee, `self`) unwatch(watchee)
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case Create() create()
case Recreate(cause) recreate(cause)
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
@ -714,27 +714,33 @@ private[akka] class ActorCell(
} finally {
try {
parent.sendSystemMessage(ChildTerminated(self))
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(stopped = true)
watchedBy foreach {
watcher
try watcher.tell(terminated) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watchedBy = emptyActorRefSet
}
if (!watching.isEmpty) {
watching foreach {
watchee
try watchee.tell(Unwatch(watchee, self)) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
try {
watching foreach {
case watchee: InternalActorRef
try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watching = emptyActorRefSet
}
if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped"))
system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped"))
} finally {
behaviorStack = ActorCell.behaviorStackPlaceHolder
behaviorStack = behaviorStackPlaceHolder
clearActorFields(a)
actor = null
}

View file

@ -409,13 +409,17 @@ private[akka] object DeadLetterActorRef {
*
* INTERNAL API
*/
private[akka] class EmptyLocalActorRef(
override val provider: ActorRefProvider,
override val path: ActorPath,
val eventStream: EventStream) extends MinimalActorRef {
private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
override val path: ActorPath,
val eventStream: EventStream) extends MinimalActorRef {
override def isTerminated(): Boolean = true
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case Watch(maybeThis, watcher) if maybeThis == this watcher ! Terminated(this)(stopped = false)
case _
}
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case d: DeadLetter // do NOT form endless loops, since deadLetters will resend!
case _ eventStream.publish(DeadLetter(message, sender, this))
@ -428,8 +432,15 @@ private[akka] class EmptyLocalActorRef(
*
* INTERNAL API
*/
private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, _path: ActorPath, _eventStream: EventStream)
extends EmptyLocalActorRef(_provider, _path, _eventStream) {
private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
_path: ActorPath,
_eventStream: EventStream) extends EmptyLocalActorRef(_provider, _path, _eventStream) {
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case Watch(maybeThis, watcher) if maybeThis == this
case Watch(other, watcher) watcher ! Terminated(other)(stopped = false)
case _
}
override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match {
case d: DeadLetter eventStream.publish(d)

View file

@ -228,14 +228,11 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
if (!completedJustNow) provider.deadLetters ! message
}
override def sendSystemMessage(message: SystemMessage): Unit = {
val self = this
message match {
case _: Terminate stop()
case Watch(`self`, watcher) //FIXME IMPLEMENT
case Unwatch(`self`, watcher) //FIXME IMPLEMENT
case _
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate stop()
case Watch(watchee, watcher) //FIXME IMPLEMENT
case Unwatch(watchee, watcher) //FIXME IMPLEMENT
case _
}
override def isTerminated: Boolean = state match {
@ -254,8 +251,8 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
try {
ensureCompleted()
val termination = Terminated(this)(stopped = true)
// watchedBy foreach { w => w.tell(termination) }
// watching foreach { w.sendSystemMessage(Unwatch(w, self)) }
// FIXME watchedBy foreach { w => w.tell(termination) }
// FIXME watching foreach { w.sendSystemMessage(Unwatch(w, self)) }
} finally {
provider.unregisterTempActor(p)
}

View file

@ -309,7 +309,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -981,7 +981,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -1977,7 +1977,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -2527,7 +2527,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -2936,7 +2936,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -3410,7 +3410,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -3909,7 +3909,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -4487,7 +4487,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -5367,7 +5367,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -6067,7 +6067,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -6351,605 +6351,6 @@ public final class RemoteProtocol {
// @@protoc_insertion_point(class_scope:DeployProtocol)
}
public interface DaemonMsgWatchProtocolOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// required .ActorRefProtocol watcher = 1;
boolean hasWatcher();
akka.remote.RemoteProtocol.ActorRefProtocol getWatcher();
akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder();
// required .ActorRefProtocol watched = 2;
boolean hasWatched();
akka.remote.RemoteProtocol.ActorRefProtocol getWatched();
akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder();
}
public static final class DaemonMsgWatchProtocol extends
com.google.protobuf.GeneratedMessage
implements DaemonMsgWatchProtocolOrBuilder {
// Use DaemonMsgWatchProtocol.newBuilder() to construct.
private DaemonMsgWatchProtocol(Builder builder) {
super(builder);
}
private DaemonMsgWatchProtocol(boolean noInit) {}
private static final DaemonMsgWatchProtocol defaultInstance;
public static DaemonMsgWatchProtocol getDefaultInstance() {
return defaultInstance;
}
public DaemonMsgWatchProtocol getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_fieldAccessorTable;
}
private int bitField0_;
// required .ActorRefProtocol watcher = 1;
public static final int WATCHER_FIELD_NUMBER = 1;
private akka.remote.RemoteProtocol.ActorRefProtocol watcher_;
public boolean hasWatcher() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public akka.remote.RemoteProtocol.ActorRefProtocol getWatcher() {
return watcher_;
}
public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder() {
return watcher_;
}
// required .ActorRefProtocol watched = 2;
public static final int WATCHED_FIELD_NUMBER = 2;
private akka.remote.RemoteProtocol.ActorRefProtocol watched_;
public boolean hasWatched() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public akka.remote.RemoteProtocol.ActorRefProtocol getWatched() {
return watched_;
}
public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder() {
return watched_;
}
private void initFields() {
watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasWatcher()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasWatched()) {
memoizedIsInitialized = 0;
return false;
}
if (!getWatcher().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
if (!getWatched().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeMessage(1, watcher_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(2, watched_);
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, watcher_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, watched_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.remote.RemoteProtocol.DaemonMsgWatchProtocol prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements akka.remote.RemoteProtocol.DaemonMsgWatchProtocolOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_fieldAccessorTable;
}
// Construct using akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getWatcherFieldBuilder();
getWatchedFieldBuilder();
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
if (watcherBuilder_ == null) {
watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
} else {
watcherBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
if (watchedBuilder_ == null) {
watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
} else {
watchedBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDescriptor();
}
public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol getDefaultInstanceForType() {
return akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDefaultInstance();
}
public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol build() {
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private akka.remote.RemoteProtocol.DaemonMsgWatchProtocol buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol buildPartial() {
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = new akka.remote.RemoteProtocol.DaemonMsgWatchProtocol(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
if (watcherBuilder_ == null) {
result.watcher_ = watcher_;
} else {
result.watcher_ = watcherBuilder_.build();
}
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
if (watchedBuilder_ == null) {
result.watched_ = watched_;
} else {
result.watched_ = watchedBuilder_.build();
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof akka.remote.RemoteProtocol.DaemonMsgWatchProtocol) {
return mergeFrom((akka.remote.RemoteProtocol.DaemonMsgWatchProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.remote.RemoteProtocol.DaemonMsgWatchProtocol other) {
if (other == akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDefaultInstance()) return this;
if (other.hasWatcher()) {
mergeWatcher(other.getWatcher());
}
if (other.hasWatched()) {
mergeWatched(other.getWatched());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasWatcher()) {
return false;
}
if (!hasWatched()) {
return false;
}
if (!getWatcher().isInitialized()) {
return false;
}
if (!getWatched().isInitialized()) {
return false;
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
case 10: {
akka.remote.RemoteProtocol.ActorRefProtocol.Builder subBuilder = akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder();
if (hasWatcher()) {
subBuilder.mergeFrom(getWatcher());
}
input.readMessage(subBuilder, extensionRegistry);
setWatcher(subBuilder.buildPartial());
break;
}
case 18: {
akka.remote.RemoteProtocol.ActorRefProtocol.Builder subBuilder = akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder();
if (hasWatched()) {
subBuilder.mergeFrom(getWatched());
}
input.readMessage(subBuilder, extensionRegistry);
setWatched(subBuilder.buildPartial());
break;
}
}
}
}
private int bitField0_;
// required .ActorRefProtocol watcher = 1;
private akka.remote.RemoteProtocol.ActorRefProtocol watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> watcherBuilder_;
public boolean hasWatcher() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public akka.remote.RemoteProtocol.ActorRefProtocol getWatcher() {
if (watcherBuilder_ == null) {
return watcher_;
} else {
return watcherBuilder_.getMessage();
}
}
public Builder setWatcher(akka.remote.RemoteProtocol.ActorRefProtocol value) {
if (watcherBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
watcher_ = value;
onChanged();
} else {
watcherBuilder_.setMessage(value);
}
bitField0_ |= 0x00000001;
return this;
}
public Builder setWatcher(
akka.remote.RemoteProtocol.ActorRefProtocol.Builder builderForValue) {
if (watcherBuilder_ == null) {
watcher_ = builderForValue.build();
onChanged();
} else {
watcherBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000001;
return this;
}
public Builder mergeWatcher(akka.remote.RemoteProtocol.ActorRefProtocol value) {
if (watcherBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001) &&
watcher_ != akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) {
watcher_ =
akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(watcher_).mergeFrom(value).buildPartial();
} else {
watcher_ = value;
}
onChanged();
} else {
watcherBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000001;
return this;
}
public Builder clearWatcher() {
if (watcherBuilder_ == null) {
watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
onChanged();
} else {
watcherBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
public akka.remote.RemoteProtocol.ActorRefProtocol.Builder getWatcherBuilder() {
bitField0_ |= 0x00000001;
onChanged();
return getWatcherFieldBuilder().getBuilder();
}
public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder() {
if (watcherBuilder_ != null) {
return watcherBuilder_.getMessageOrBuilder();
} else {
return watcher_;
}
}
private com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>
getWatcherFieldBuilder() {
if (watcherBuilder_ == null) {
watcherBuilder_ = new com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>(
watcher_,
getParentForChildren(),
isClean());
watcher_ = null;
}
return watcherBuilder_;
}
// required .ActorRefProtocol watched = 2;
private akka.remote.RemoteProtocol.ActorRefProtocol watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> watchedBuilder_;
public boolean hasWatched() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public akka.remote.RemoteProtocol.ActorRefProtocol getWatched() {
if (watchedBuilder_ == null) {
return watched_;
} else {
return watchedBuilder_.getMessage();
}
}
public Builder setWatched(akka.remote.RemoteProtocol.ActorRefProtocol value) {
if (watchedBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
watched_ = value;
onChanged();
} else {
watchedBuilder_.setMessage(value);
}
bitField0_ |= 0x00000002;
return this;
}
public Builder setWatched(
akka.remote.RemoteProtocol.ActorRefProtocol.Builder builderForValue) {
if (watchedBuilder_ == null) {
watched_ = builderForValue.build();
onChanged();
} else {
watchedBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000002;
return this;
}
public Builder mergeWatched(akka.remote.RemoteProtocol.ActorRefProtocol value) {
if (watchedBuilder_ == null) {
if (((bitField0_ & 0x00000002) == 0x00000002) &&
watched_ != akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) {
watched_ =
akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(watched_).mergeFrom(value).buildPartial();
} else {
watched_ = value;
}
onChanged();
} else {
watchedBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000002;
return this;
}
public Builder clearWatched() {
if (watchedBuilder_ == null) {
watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance();
onChanged();
} else {
watchedBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public akka.remote.RemoteProtocol.ActorRefProtocol.Builder getWatchedBuilder() {
bitField0_ |= 0x00000002;
onChanged();
return getWatchedFieldBuilder().getBuilder();
}
public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder() {
if (watchedBuilder_ != null) {
return watchedBuilder_.getMessageOrBuilder();
} else {
return watched_;
}
}
private com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>
getWatchedFieldBuilder() {
if (watchedBuilder_ == null) {
watchedBuilder_ = new com.google.protobuf.SingleFieldBuilder<
akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>(
watched_,
getParentForChildren(),
isClean());
watched_ = null;
}
return watchedBuilder_;
}
// @@protoc_insertion_point(builder_scope:DaemonMsgWatchProtocol)
}
static {
defaultInstance = new DaemonMsgWatchProtocol(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:DaemonMsgWatchProtocol)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_AkkaRemoteProtocol_descriptor;
private static
@ -7000,11 +6401,6 @@ public final class RemoteProtocol {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_DeployProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_DaemonMsgWatchProtocol_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_DaemonMsgWatchProtocol_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@ -7039,11 +6435,9 @@ public final class RemoteProtocol {
"ssCreator\030\003 \001(\t\022\017\n\007creator\030\004 \001(\014\022\024\n\014rout" +
"erConfig\030\005 \001(\014\"S\n\016DeployProtocol\022\014\n\004path" +
"\030\001 \002(\t\022\016\n\006config\030\002 \001(\014\022\024\n\014routerConfig\030\003" +
" \001(\014\022\r\n\005scope\030\004 \001(\014\"`\n\026DaemonMsgWatchPro" +
"tocol\022\"\n\007watcher\030\001 \002(\0132\021.ActorRefProtoco" +
"l\022\"\n\007watched\030\002 \002(\0132\021.ActorRefProtocol*7\n" +
"\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002\022" +
"\r\n\tHEARTBEAT\020\003B\017\n\013akka.remoteH\001"
" \001(\014\022\r\n\005scope\030\004 \001(\014*7\n\013CommandType\022\013\n\007CO" +
"NNECT\020\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013" +
"akka.remoteH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -7130,14 +6524,6 @@ public final class RemoteProtocol {
new java.lang.String[] { "Path", "Config", "RouterConfig", "Scope", },
akka.remote.RemoteProtocol.DeployProtocol.class,
akka.remote.RemoteProtocol.DeployProtocol.Builder.class);
internal_static_DaemonMsgWatchProtocol_descriptor =
getDescriptor().getMessageTypes().get(10);
internal_static_DaemonMsgWatchProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_DaemonMsgWatchProtocol_descriptor,
new java.lang.String[] { "Watcher", "Watched", },
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.class,
akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.Builder.class);
return null;
}
};

View file

@ -107,12 +107,4 @@ message DeployProtocol {
optional bytes config = 2;
optional bytes routerConfig = 3;
optional bytes scope = 4;
}
/**
* Serialization of akka.remote.DaemonMsgWatch
*/
message DaemonMsgWatchProtocol {
required ActorRefProtocol watcher = 1;
required ActorRefProtocol watched = 2;
}
}

View file

@ -15,7 +15,6 @@ akka {
serializers {
proto = "akka.serialization.ProtobufSerializer"
daemon-create = "akka.serialization.DaemonMsgCreateSerializer"
daemon-watch = "akka.serialization.DaemonMsgWatchSerializer"
}
@ -24,7 +23,6 @@ akka {
# does, need to use the more specific one here in order to avoid ambiguity
"com.google.protobuf.GeneratedMessage" = proto
"akka.remote.DaemonMsgCreate" = daemon-create
"akka.remote.DaemonMsgWatch" = daemon-watch
}
deployment {

View file

@ -12,7 +12,6 @@ import akka.dispatch.Watch
private[akka] sealed trait DaemonMsg
private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg
private[akka] case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg
/**
* Internal system "daemon" actor for remote internal communication.
@ -67,15 +66,11 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
case _
log.error("remote path does not match path from message [{}]", message)
}
case DaemonMsgWatch(watcher, watched)
system.actorFor(watcher.path.root / "remote") match {
case a: InternalActorRef a.sendSystemMessage(Watch(watched, a))
}
}
case Terminated(child: LocalActorRef) removeChild(child.path.elements.drop(1).mkString("/"))
case t: Terminated //FIXME system.deathWatch.publish(t)
case t: Terminated
case unknown log.warning("Unknown message {} received by {}", unknown, this)
}

View file

@ -1,43 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.actor.ActorRef
import akka.remote.DaemonMsgWatch
import akka.remote.RemoteProtocol.ActorRefProtocol
import akka.remote.RemoteProtocol.DaemonMsgWatchProtocol
import akka.actor.ExtendedActorSystem
/**
* Serializes akka's internal DaemonMsgWatch using protobuf.
*
* INTERNAL API
*/
private[akka] class DaemonMsgWatchSerializer(val system: ExtendedActorSystem) extends Serializer {
import ProtobufSerializer.serializeActorRef
import ProtobufSerializer.deserializeActorRef
def includeManifest: Boolean = false
def identifier = 4
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case DaemonMsgWatch(watcher, watched)
DaemonMsgWatchProtocol.newBuilder.
setWatcher(serializeActorRef(watcher)).
setWatched(serializeActorRef(watched)).
build.toByteArray
case _
throw new IllegalArgumentException(
"Can't serialize a non-DaemonMsgWatch message using DaemonMsgWatchSerializer [%s]".format(obj))
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = {
val proto = DaemonMsgWatchProtocol.parseFrom(bytes)
DaemonMsgWatch(
watcher = deserializeActorRef(system, proto.getWatcher),
watched = deserializeActorRef(system, proto.getWatched))
}
}

View file

@ -1,49 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.testkit.AkkaSpec
import akka.remote.DaemonMsgWatch
import akka.actor.Actor
import akka.actor.Props
object DaemonMsgWatchSerializerSpec {
class MyActor extends Actor {
def receive = {
case _
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DaemonMsgWatchSerializerSpec extends AkkaSpec {
import DaemonMsgWatchSerializerSpec._
val ser = SerializationExtension(system)
"Serialization" must {
"resolve DaemonMsgWatchSerializer" in {
ser.serializerFor(classOf[DaemonMsgWatch]).getClass must be(classOf[DaemonMsgWatchSerializer])
}
"serialize and de-serialize DaemonMsgWatch" in {
val watcher = system.actorOf(Props[MyActor], "watcher")
val watched = system.actorOf(Props[MyActor], "watched")
val msg = DaemonMsgWatch(watcher, watched)
val bytes = ser.serialize(msg) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
}
ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgWatch]) match {
case Left(exception) fail(exception)
case Right(m) assert(m === msg)
}
}
}
}

View file

@ -111,9 +111,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
"akka.actor.debug.lifecycle" -> true, "akka.actor.debug.event-stream" -> true,
"akka.loglevel" -> "DEBUG", "akka.stdout-loglevel" -> "DEBUG")
val system = ActorSystem("AkkaSpec1", ConfigFactory.parseMap(conf.asJava).withFallback(AkkaSpec.testConf))
val spec = new AkkaSpec(system) {
val ref = Seq(testActor, system.actorOf(Props.empty, "name"))
}
val spec = new AkkaSpec(system) { val ref = Seq(testActor, system.actorOf(Props.empty, "name")) }
spec.ref foreach (_.isTerminated must not be true)
system.shutdown()
spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds)