diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 7a1aa35485..97eec5be01 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -36,7 +36,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "notify with one Terminated message when an Actor is stopped" in { val terminal = system.actorOf(Props.empty) startWatching(terminal) ! "hallo" - expectMsg("hallo") // this ensures that the DaemonMsgWatch has been received before we send the PoisonPill + expectMsg("hallo") terminal ! PoisonPill diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index cb804703ed..c09f40cebd 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -460,28 +460,22 @@ private[akka] class ActorCell( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) - override final def watch(subject: ActorRef): ActorRef = { - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - subject match { - case a: InternalActorRef ⇒ - if (!watching.contains(a)) { - watching += a - a.sendSystemMessage(Watch(a, self)) - } - } - subject + override final def watch(subject: ActorRef): ActorRef = subject match { + case a: InternalActorRef ⇒ + if (!watching.contains(a)) { + a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching += a + } + a } - override final def unwatch(subject: ActorRef): ActorRef = { - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - subject match { - case a: InternalActorRef ⇒ - if (watching.contains(a)) { - watching -= a - a.sendSystemMessage(Unwatch(a, self)) - } - } - subject + override final def unwatch(subject: ActorRef): ActorRef = subject match { + case a: InternalActorRef ⇒ + if (watching.contains(a)) { + a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching -= a + } + a } final def children: Iterable[ActorRef] = childrenRefs.children @@ -579,18 +573,26 @@ private[akka] class ActorCell( def resume(): Unit = if (isNormal) dispatcher resume this - def addWatcher(watcher: ActorRef): Unit = if (!isTerminating) { - if (!watchedBy.contains(watcher)) { - watchedBy += watcher - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " watched by " + watcher)) - } + def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { + if (watchee == self) { + if (!watchedBy.contains(watcher)) { + watchedBy += watcher + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " watched by " + watcher)) + } + } else if (watcher == self) { + watch(watchee) + } else println("addNOOOOOOOOO: " + watchee + " => " + watcher) } - def remWatcher(watcher: ActorRef): Unit = if (!isTerminating) { - if (watchedBy.contains(watcher)) { - watchedBy -= watcher - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " unwatched by " + watcher)) - } + def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { + if (watchee == self) { + if (watchedBy.contains(watcher)) { + watchedBy -= watcher + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " unwatched by " + watcher)) + } + } else if (watcher == self) { + unwatch(watchee) + } else println("remNOOOOOOOOO: " + watchee + " => " + watcher) } def terminate() { @@ -617,17 +619,15 @@ private[akka] class ActorCell( try { message match { - case Create() ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Watch(`self`, watcher) ⇒ addWatcher(watcher) - case Watch(watchee, `self`) ⇒ watch(watchee) - case Unwatch(`self`, watcher) ⇒ remWatcher(watcher) - case Unwatch(watchee, `self`) ⇒ unwatch(watchee) - case Suspend() ⇒ suspend() - case Resume() ⇒ resume() - case Terminate() ⇒ terminate() - case Supervise(child) ⇒ supervise(child) - case ChildTerminated(child) ⇒ handleChildTerminated(child) + case Create() ⇒ create() + case Recreate(cause) ⇒ recreate(cause) + case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) + case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) + case Suspend() ⇒ suspend() + case Resume() ⇒ resume() + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) + case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) @@ -714,27 +714,33 @@ private[akka] class ActorCell( } finally { try { parent.sendSystemMessage(ChildTerminated(self)) + if (!watchedBy.isEmpty) { val terminated = Terminated(self)(stopped = true) - watchedBy foreach { - watcher ⇒ - try watcher.tell(terminated) catch { - case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) - } - } + try { + watchedBy foreach { + watcher ⇒ + try watcher.tell(terminated, self) catch { + case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) + } + } + } finally watchedBy = emptyActorRefSet } + if (!watching.isEmpty) { - watching foreach { - watchee ⇒ - try watchee.tell(Unwatch(watchee, self)) catch { - case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) - } - } + try { + watching foreach { + case watchee: InternalActorRef ⇒ + try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { + case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) + } + } + } finally watching = emptyActorRefSet } if (system.settings.DebugLifecycle) - system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) + system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped")) } finally { - behaviorStack = ActorCell.behaviorStackPlaceHolder + behaviorStack = behaviorStackPlaceHolder clearActorFields(a) actor = null } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 460bd02076..ad45f6ad09 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -409,13 +409,17 @@ private[akka] object DeadLetterActorRef { * * INTERNAL API */ -private[akka] class EmptyLocalActorRef( - override val provider: ActorRefProvider, - override val path: ActorPath, - val eventStream: EventStream) extends MinimalActorRef { +private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, + override val path: ActorPath, + val eventStream: EventStream) extends MinimalActorRef { override def isTerminated(): Boolean = true + override def sendSystemMessage(message: SystemMessage): Unit = message match { + case Watch(maybeThis, watcher) if maybeThis == this ⇒ watcher ! Terminated(this)(stopped = false) + case _ ⇒ + } + override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { case d: DeadLetter ⇒ // do NOT form endless loops, since deadLetters will resend! case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) @@ -428,8 +432,15 @@ private[akka] class EmptyLocalActorRef( * * INTERNAL API */ -private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, _path: ActorPath, _eventStream: EventStream) - extends EmptyLocalActorRef(_provider, _path, _eventStream) { +private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, + _path: ActorPath, + _eventStream: EventStream) extends EmptyLocalActorRef(_provider, _path, _eventStream) { + + override def sendSystemMessage(message: SystemMessage): Unit = message match { + case Watch(maybeThis, watcher) if maybeThis == this ⇒ + case Watch(other, watcher) ⇒ watcher ! Terminated(other)(stopped = false) + case _ ⇒ + } override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match { case d: DeadLetter ⇒ eventStream.publish(d) diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 634299248d..2837bd6546 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -228,14 +228,11 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide if (!completedJustNow) provider.deadLetters ! message } - override def sendSystemMessage(message: SystemMessage): Unit = { - val self = this - message match { - case _: Terminate ⇒ stop() - case Watch(`self`, watcher) ⇒ //FIXME IMPLEMENT - case Unwatch(`self`, watcher) ⇒ //FIXME IMPLEMENT - case _ ⇒ - } + override def sendSystemMessage(message: SystemMessage): Unit = message match { + case _: Terminate ⇒ stop() + case Watch(watchee, watcher) ⇒ //FIXME IMPLEMENT + case Unwatch(watchee, watcher) ⇒ //FIXME IMPLEMENT + case _ ⇒ } override def isTerminated: Boolean = state match { @@ -254,8 +251,8 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide try { ensureCompleted() val termination = Terminated(this)(stopped = true) - // watchedBy foreach { w => w.tell(termination) } - // watching foreach { w.sendSystemMessage(Unwatch(w, self)) } + // FIXME watchedBy foreach { w => w.tell(termination) } + // FIXME watching foreach { w.sendSystemMessage(Unwatch(w, self)) } } finally { provider.unregisterTempActor(p) } diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index 0794e54364..8f3ab4e1fb 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -309,7 +309,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -981,7 +981,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -1977,7 +1977,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -2527,7 +2527,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -2936,7 +2936,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -3410,7 +3410,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -3909,7 +3909,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -4487,7 +4487,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -5367,7 +5367,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -6067,7 +6067,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -6351,605 +6351,6 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:DeployProtocol) } - public interface DaemonMsgWatchProtocolOrBuilder - extends com.google.protobuf.MessageOrBuilder { - - // required .ActorRefProtocol watcher = 1; - boolean hasWatcher(); - akka.remote.RemoteProtocol.ActorRefProtocol getWatcher(); - akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder(); - - // required .ActorRefProtocol watched = 2; - boolean hasWatched(); - akka.remote.RemoteProtocol.ActorRefProtocol getWatched(); - akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder(); - } - public static final class DaemonMsgWatchProtocol extends - com.google.protobuf.GeneratedMessage - implements DaemonMsgWatchProtocolOrBuilder { - // Use DaemonMsgWatchProtocol.newBuilder() to construct. - private DaemonMsgWatchProtocol(Builder builder) { - super(builder); - } - private DaemonMsgWatchProtocol(boolean noInit) {} - - private static final DaemonMsgWatchProtocol defaultInstance; - public static DaemonMsgWatchProtocol getDefaultInstance() { - return defaultInstance; - } - - public DaemonMsgWatchProtocol getDefaultInstanceForType() { - return defaultInstance; - } - - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_fieldAccessorTable; - } - - private int bitField0_; - // required .ActorRefProtocol watcher = 1; - public static final int WATCHER_FIELD_NUMBER = 1; - private akka.remote.RemoteProtocol.ActorRefProtocol watcher_; - public boolean hasWatcher() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public akka.remote.RemoteProtocol.ActorRefProtocol getWatcher() { - return watcher_; - } - public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder() { - return watcher_; - } - - // required .ActorRefProtocol watched = 2; - public static final int WATCHED_FIELD_NUMBER = 2; - private akka.remote.RemoteProtocol.ActorRefProtocol watched_; - public boolean hasWatched() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public akka.remote.RemoteProtocol.ActorRefProtocol getWatched() { - return watched_; - } - public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder() { - return watched_; - } - - private void initFields() { - watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); - watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); - } - private byte memoizedIsInitialized = -1; - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized != -1) return isInitialized == 1; - - if (!hasWatcher()) { - memoizedIsInitialized = 0; - return false; - } - if (!hasWatched()) { - memoizedIsInitialized = 0; - return false; - } - if (!getWatcher().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } - if (!getWatched().isInitialized()) { - memoizedIsInitialized = 0; - return false; - } - memoizedIsInitialized = 1; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - getSerializedSize(); - if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeMessage(1, watcher_); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeMessage(2, watched_); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) return size; - - size = 0; - if (((bitField0_ & 0x00000001) == 0x00000001)) { - size += com.google.protobuf.CodedOutputStream - .computeMessageSize(1, watcher_); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeMessageSize(2, watched_); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - private static final long serialVersionUID = 0L; - @java.lang.Override - protected java.lang.Object writeReplace() - throws java.io.ObjectStreamException { - return super.writeReplace(); - } - - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom(java.io.InputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input, extensionRegistry)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.DaemonMsgWatchProtocol parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - - public static Builder newBuilder() { return Builder.create(); } - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(akka.remote.RemoteProtocol.DaemonMsgWatchProtocol prototype) { - return newBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { return newBuilder(this); } - - @java.lang.Override - protected Builder newBuilderForType( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder - implements akka.remote.RemoteProtocol.DaemonMsgWatchProtocolOrBuilder { - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_DaemonMsgWatchProtocol_fieldAccessorTable; - } - - // Construct using akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.newBuilder() - private Builder() { - maybeForceBuilderInitialization(); - } - - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { - super(parent); - maybeForceBuilderInitialization(); - } - private void maybeForceBuilderInitialization() { - if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { - getWatcherFieldBuilder(); - getWatchedFieldBuilder(); - } - } - private static Builder create() { - return new Builder(); - } - - public Builder clear() { - super.clear(); - if (watcherBuilder_ == null) { - watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); - } else { - watcherBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000001); - if (watchedBuilder_ == null) { - watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); - } else { - watchedBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000002); - return this; - } - - public Builder clone() { - return create().mergeFrom(buildPartial()); - } - - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDescriptor(); - } - - public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol getDefaultInstanceForType() { - return akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDefaultInstance(); - } - - public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol build() { - akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - private akka.remote.RemoteProtocol.DaemonMsgWatchProtocol buildParsed() - throws com.google.protobuf.InvalidProtocolBufferException { - akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException( - result).asInvalidProtocolBufferException(); - } - return result; - } - - public akka.remote.RemoteProtocol.DaemonMsgWatchProtocol buildPartial() { - akka.remote.RemoteProtocol.DaemonMsgWatchProtocol result = new akka.remote.RemoteProtocol.DaemonMsgWatchProtocol(this); - int from_bitField0_ = bitField0_; - int to_bitField0_ = 0; - if (((from_bitField0_ & 0x00000001) == 0x00000001)) { - to_bitField0_ |= 0x00000001; - } - if (watcherBuilder_ == null) { - result.watcher_ = watcher_; - } else { - result.watcher_ = watcherBuilder_.build(); - } - if (((from_bitField0_ & 0x00000002) == 0x00000002)) { - to_bitField0_ |= 0x00000002; - } - if (watchedBuilder_ == null) { - result.watched_ = watched_; - } else { - result.watched_ = watchedBuilder_.build(); - } - result.bitField0_ = to_bitField0_; - onBuilt(); - return result; - } - - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof akka.remote.RemoteProtocol.DaemonMsgWatchProtocol) { - return mergeFrom((akka.remote.RemoteProtocol.DaemonMsgWatchProtocol)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(akka.remote.RemoteProtocol.DaemonMsgWatchProtocol other) { - if (other == akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.getDefaultInstance()) return this; - if (other.hasWatcher()) { - mergeWatcher(other.getWatcher()); - } - if (other.hasWatched()) { - mergeWatched(other.getWatched()); - } - this.mergeUnknownFields(other.getUnknownFields()); - return this; - } - - public final boolean isInitialized() { - if (!hasWatcher()) { - - return false; - } - if (!hasWatched()) { - - return false; - } - if (!getWatcher().isInitialized()) { - - return false; - } - if (!getWatched().isInitialized()) { - - return false; - } - return true; - } - - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - com.google.protobuf.UnknownFieldSet.Builder unknownFields = - com.google.protobuf.UnknownFieldSet.newBuilder( - this.getUnknownFields()); - while (true) { - int tag = input.readTag(); - switch (tag) { - case 0: - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - default: { - if (!parseUnknownField(input, unknownFields, - extensionRegistry, tag)) { - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - } - break; - } - case 10: { - akka.remote.RemoteProtocol.ActorRefProtocol.Builder subBuilder = akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(); - if (hasWatcher()) { - subBuilder.mergeFrom(getWatcher()); - } - input.readMessage(subBuilder, extensionRegistry); - setWatcher(subBuilder.buildPartial()); - break; - } - case 18: { - akka.remote.RemoteProtocol.ActorRefProtocol.Builder subBuilder = akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(); - if (hasWatched()) { - subBuilder.mergeFrom(getWatched()); - } - input.readMessage(subBuilder, extensionRegistry); - setWatched(subBuilder.buildPartial()); - break; - } - } - } - } - - private int bitField0_; - - // required .ActorRefProtocol watcher = 1; - private akka.remote.RemoteProtocol.ActorRefProtocol watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> watcherBuilder_; - public boolean hasWatcher() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public akka.remote.RemoteProtocol.ActorRefProtocol getWatcher() { - if (watcherBuilder_ == null) { - return watcher_; - } else { - return watcherBuilder_.getMessage(); - } - } - public Builder setWatcher(akka.remote.RemoteProtocol.ActorRefProtocol value) { - if (watcherBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - watcher_ = value; - onChanged(); - } else { - watcherBuilder_.setMessage(value); - } - bitField0_ |= 0x00000001; - return this; - } - public Builder setWatcher( - akka.remote.RemoteProtocol.ActorRefProtocol.Builder builderForValue) { - if (watcherBuilder_ == null) { - watcher_ = builderForValue.build(); - onChanged(); - } else { - watcherBuilder_.setMessage(builderForValue.build()); - } - bitField0_ |= 0x00000001; - return this; - } - public Builder mergeWatcher(akka.remote.RemoteProtocol.ActorRefProtocol value) { - if (watcherBuilder_ == null) { - if (((bitField0_ & 0x00000001) == 0x00000001) && - watcher_ != akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) { - watcher_ = - akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(watcher_).mergeFrom(value).buildPartial(); - } else { - watcher_ = value; - } - onChanged(); - } else { - watcherBuilder_.mergeFrom(value); - } - bitField0_ |= 0x00000001; - return this; - } - public Builder clearWatcher() { - if (watcherBuilder_ == null) { - watcher_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); - onChanged(); - } else { - watcherBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000001); - return this; - } - public akka.remote.RemoteProtocol.ActorRefProtocol.Builder getWatcherBuilder() { - bitField0_ |= 0x00000001; - onChanged(); - return getWatcherFieldBuilder().getBuilder(); - } - public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatcherOrBuilder() { - if (watcherBuilder_ != null) { - return watcherBuilder_.getMessageOrBuilder(); - } else { - return watcher_; - } - } - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> - getWatcherFieldBuilder() { - if (watcherBuilder_ == null) { - watcherBuilder_ = new com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>( - watcher_, - getParentForChildren(), - isClean()); - watcher_ = null; - } - return watcherBuilder_; - } - - // required .ActorRefProtocol watched = 2; - private akka.remote.RemoteProtocol.ActorRefProtocol watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> watchedBuilder_; - public boolean hasWatched() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public akka.remote.RemoteProtocol.ActorRefProtocol getWatched() { - if (watchedBuilder_ == null) { - return watched_; - } else { - return watchedBuilder_.getMessage(); - } - } - public Builder setWatched(akka.remote.RemoteProtocol.ActorRefProtocol value) { - if (watchedBuilder_ == null) { - if (value == null) { - throw new NullPointerException(); - } - watched_ = value; - onChanged(); - } else { - watchedBuilder_.setMessage(value); - } - bitField0_ |= 0x00000002; - return this; - } - public Builder setWatched( - akka.remote.RemoteProtocol.ActorRefProtocol.Builder builderForValue) { - if (watchedBuilder_ == null) { - watched_ = builderForValue.build(); - onChanged(); - } else { - watchedBuilder_.setMessage(builderForValue.build()); - } - bitField0_ |= 0x00000002; - return this; - } - public Builder mergeWatched(akka.remote.RemoteProtocol.ActorRefProtocol value) { - if (watchedBuilder_ == null) { - if (((bitField0_ & 0x00000002) == 0x00000002) && - watched_ != akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) { - watched_ = - akka.remote.RemoteProtocol.ActorRefProtocol.newBuilder(watched_).mergeFrom(value).buildPartial(); - } else { - watched_ = value; - } - onChanged(); - } else { - watchedBuilder_.mergeFrom(value); - } - bitField0_ |= 0x00000002; - return this; - } - public Builder clearWatched() { - if (watchedBuilder_ == null) { - watched_ = akka.remote.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); - onChanged(); - } else { - watchedBuilder_.clear(); - } - bitField0_ = (bitField0_ & ~0x00000002); - return this; - } - public akka.remote.RemoteProtocol.ActorRefProtocol.Builder getWatchedBuilder() { - bitField0_ |= 0x00000002; - onChanged(); - return getWatchedFieldBuilder().getBuilder(); - } - public akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder getWatchedOrBuilder() { - if (watchedBuilder_ != null) { - return watchedBuilder_.getMessageOrBuilder(); - } else { - return watched_; - } - } - private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder> - getWatchedFieldBuilder() { - if (watchedBuilder_ == null) { - watchedBuilder_ = new com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.ActorRefProtocol, akka.remote.RemoteProtocol.ActorRefProtocol.Builder, akka.remote.RemoteProtocol.ActorRefProtocolOrBuilder>( - watched_, - getParentForChildren(), - isClean()); - watched_ = null; - } - return watchedBuilder_; - } - - // @@protoc_insertion_point(builder_scope:DaemonMsgWatchProtocol) - } - - static { - defaultInstance = new DaemonMsgWatchProtocol(true); - defaultInstance.initFields(); - } - - // @@protoc_insertion_point(class_scope:DaemonMsgWatchProtocol) - } - private static com.google.protobuf.Descriptors.Descriptor internal_static_AkkaRemoteProtocol_descriptor; private static @@ -7000,11 +6401,6 @@ public final class RemoteProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_DeployProtocol_fieldAccessorTable; - private static com.google.protobuf.Descriptors.Descriptor - internal_static_DaemonMsgWatchProtocol_descriptor; - private static - com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_DaemonMsgWatchProtocol_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -7039,11 +6435,9 @@ public final class RemoteProtocol { "ssCreator\030\003 \001(\t\022\017\n\007creator\030\004 \001(\014\022\024\n\014rout" + "erConfig\030\005 \001(\014\"S\n\016DeployProtocol\022\014\n\004path" + "\030\001 \002(\t\022\016\n\006config\030\002 \001(\014\022\024\n\014routerConfig\030\003" + - " \001(\014\022\r\n\005scope\030\004 \001(\014\"`\n\026DaemonMsgWatchPro" + - "tocol\022\"\n\007watcher\030\001 \002(\0132\021.ActorRefProtoco" + - "l\022\"\n\007watched\030\002 \002(\0132\021.ActorRefProtocol*7\n" + - "\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002\022" + - "\r\n\tHEARTBEAT\020\003B\017\n\013akka.remoteH\001" + " \001(\014\022\r\n\005scope\030\004 \001(\014*7\n\013CommandType\022\013\n\007CO" + + "NNECT\020\001\022\014\n\010SHUTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013" + + "akka.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7130,14 +6524,6 @@ public final class RemoteProtocol { new java.lang.String[] { "Path", "Config", "RouterConfig", "Scope", }, akka.remote.RemoteProtocol.DeployProtocol.class, akka.remote.RemoteProtocol.DeployProtocol.Builder.class); - internal_static_DaemonMsgWatchProtocol_descriptor = - getDescriptor().getMessageTypes().get(10); - internal_static_DaemonMsgWatchProtocol_fieldAccessorTable = new - com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_DaemonMsgWatchProtocol_descriptor, - new java.lang.String[] { "Watcher", "Watched", }, - akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.class, - akka.remote.RemoteProtocol.DaemonMsgWatchProtocol.Builder.class); return null; } }; diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 72b04caa57..7d86d8a82b 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -107,12 +107,4 @@ message DeployProtocol { optional bytes config = 2; optional bytes routerConfig = 3; optional bytes scope = 4; -} - -/** - * Serialization of akka.remote.DaemonMsgWatch - */ -message DaemonMsgWatchProtocol { - required ActorRefProtocol watcher = 1; - required ActorRefProtocol watched = 2; -} +} \ No newline at end of file diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 97b85895ed..a56ea16c9a 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -15,7 +15,6 @@ akka { serializers { proto = "akka.serialization.ProtobufSerializer" daemon-create = "akka.serialization.DaemonMsgCreateSerializer" - daemon-watch = "akka.serialization.DaemonMsgWatchSerializer" } @@ -24,7 +23,6 @@ akka { # does, need to use the more specific one here in order to avoid ambiguity "com.google.protobuf.GeneratedMessage" = proto "akka.remote.DaemonMsgCreate" = daemon-create - "akka.remote.DaemonMsgWatch" = daemon-watch } deployment { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 1e81cfaac6..ddab54b2ad 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -12,7 +12,6 @@ import akka.dispatch.Watch private[akka] sealed trait DaemonMsg private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg -private[akka] case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg /** * Internal system "daemon" actor for remote internal communication. @@ -67,15 +66,11 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath case _ ⇒ log.error("remote path does not match path from message [{}]", message) } - case DaemonMsgWatch(watcher, watched) ⇒ - system.actorFor(watcher.path.root / "remote") match { - case a: InternalActorRef ⇒ a.sendSystemMessage(Watch(watched, a)) - } } case Terminated(child: LocalActorRef) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) - case t: Terminated ⇒ //FIXME system.deathWatch.publish(t) + case t: Terminated ⇒ case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } diff --git a/akka-remote/src/main/scala/akka/serialization/DaemonMsgWatchSerializer.scala b/akka-remote/src/main/scala/akka/serialization/DaemonMsgWatchSerializer.scala deleted file mode 100644 index 016d7f14cb..0000000000 --- a/akka-remote/src/main/scala/akka/serialization/DaemonMsgWatchSerializer.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.serialization - -import akka.actor.ActorRef -import akka.remote.DaemonMsgWatch -import akka.remote.RemoteProtocol.ActorRefProtocol -import akka.remote.RemoteProtocol.DaemonMsgWatchProtocol -import akka.actor.ExtendedActorSystem - -/** - * Serializes akka's internal DaemonMsgWatch using protobuf. - * - * INTERNAL API - */ -private[akka] class DaemonMsgWatchSerializer(val system: ExtendedActorSystem) extends Serializer { - import ProtobufSerializer.serializeActorRef - import ProtobufSerializer.deserializeActorRef - - def includeManifest: Boolean = false - def identifier = 4 - - def toBinary(obj: AnyRef): Array[Byte] = obj match { - case DaemonMsgWatch(watcher, watched) ⇒ - DaemonMsgWatchProtocol.newBuilder. - setWatcher(serializeActorRef(watcher)). - setWatched(serializeActorRef(watched)). - build.toByteArray - case _ ⇒ - throw new IllegalArgumentException( - "Can't serialize a non-DaemonMsgWatch message using DaemonMsgWatchSerializer [%s]".format(obj)) - } - - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - val proto = DaemonMsgWatchProtocol.parseFrom(bytes) - DaemonMsgWatch( - watcher = deserializeActorRef(system, proto.getWatcher), - watched = deserializeActorRef(system, proto.getWatched)) - } - -} \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/serialization/DaemonMsgWatchSerializerSpec.scala b/akka-remote/src/test/scala/akka/serialization/DaemonMsgWatchSerializerSpec.scala deleted file mode 100644 index a6069beac1..0000000000 --- a/akka-remote/src/test/scala/akka/serialization/DaemonMsgWatchSerializerSpec.scala +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.serialization - -import akka.testkit.AkkaSpec -import akka.remote.DaemonMsgWatch -import akka.actor.Actor -import akka.actor.Props - -object DaemonMsgWatchSerializerSpec { - class MyActor extends Actor { - def receive = { - case _ ⇒ - } - } -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class DaemonMsgWatchSerializerSpec extends AkkaSpec { - - import DaemonMsgWatchSerializerSpec._ - - val ser = SerializationExtension(system) - - "Serialization" must { - - "resolve DaemonMsgWatchSerializer" in { - ser.serializerFor(classOf[DaemonMsgWatch]).getClass must be(classOf[DaemonMsgWatchSerializer]) - } - - "serialize and de-serialize DaemonMsgWatch" in { - val watcher = system.actorOf(Props[MyActor], "watcher") - val watched = system.actorOf(Props[MyActor], "watched") - val msg = DaemonMsgWatch(watcher, watched) - val bytes = ser.serialize(msg) match { - case Left(exception) ⇒ fail(exception) - case Right(bytes) ⇒ bytes - } - ser.deserialize(bytes.asInstanceOf[Array[Byte]], classOf[DaemonMsgWatch]) match { - case Left(exception) ⇒ fail(exception) - case Right(m) ⇒ assert(m === msg) - } - } - - } -} - diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index f24ea49b8c..d2eeeee776 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -111,9 +111,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { "akka.actor.debug.lifecycle" -> true, "akka.actor.debug.event-stream" -> true, "akka.loglevel" -> "DEBUG", "akka.stdout-loglevel" -> "DEBUG") val system = ActorSystem("AkkaSpec1", ConfigFactory.parseMap(conf.asJava).withFallback(AkkaSpec.testConf)) - val spec = new AkkaSpec(system) { - val ref = Seq(testActor, system.actorOf(Props.empty, "name")) - } + val spec = new AkkaSpec(system) { val ref = Seq(testActor, system.actorOf(Props.empty, "name")) } spec.ref foreach (_.isTerminated must not be true) system.shutdown() spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds)