diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala index 8346778ac8..e8f27fbabd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSelectionSpec.scala @@ -351,24 +351,45 @@ class ActorSelectionSpec extends AkkaSpec("akka.loglevel=DEBUG") with DefaultTim d.recipient.path.toStringWithoutAddress should be("/user/missing") } - "send ActorSelection wildcard targeted to missing actor to deadLetters" in { + "identify actors with wildcard selection correctly" in { val creator = TestProbe() implicit def self = creator.ref - val top = system.actorOf(p, "top") - top ! Create("child1") - top ! Create("child2") + val top = system.actorOf(p, "a") + val b1 = Await.result((top ? Create("b1")).mapTo[ActorRef], timeout.duration) + val b2 = Await.result((top ? Create("b2")).mapTo[ActorRef], timeout.duration) + val c = Await.result((b2 ? Create("c")).mapTo[ActorRef], timeout.duration) + val d = Await.result((c ? Create("d")).mapTo[ActorRef], timeout.duration) val probe = TestProbe() - system.eventStream.subscribe(probe.ref, classOf[DeadLetter]) - system.actorSelection("/user/top/*/a").tell("wild", testActor) - // wildcard matches both child1 and child2 - val d1 = probe.expectMsgType[DeadLetter] - val d2 = probe.expectMsgType[DeadLetter] - List(d1, d2) foreach { d ⇒ - d.message should be("wild") - d.sender should be(testActor) - d.recipient.path.toStringWithoutAddress should (equal("/user/top/child1/a") or equal("/user/top/child2/a")) - } + system.actorSelection("/user/a/*").tell(Identify(1), probe.ref) + probe.receiveN(2).map { case ActorIdentity(1, r) ⇒ r }.toSet should be(Set(Some(b1), Some(b2))) + probe.expectNoMsg(200.millis) + + system.actorSelection("/user/a/b1/*").tell(Identify(2), probe.ref) + probe.expectMsg(ActorIdentity(2, None)) + + system.actorSelection("/user/a/*/c").tell(Identify(3), probe.ref) + probe.expectMsg(ActorIdentity(3, Some(c))) + probe.expectNoMsg(200.millis) + + system.actorSelection("/user/a/b2/*/d").tell(Identify(4), probe.ref) + probe.expectMsg(ActorIdentity(4, Some(d))) + probe.expectNoMsg(200.millis) + + system.actorSelection("/user/a/*/*/d").tell(Identify(5), probe.ref) + probe.expectMsg(ActorIdentity(5, Some(d))) + probe.expectNoMsg(200.millis) + + system.actorSelection("/user/a/*/c/*").tell(Identify(6), probe.ref) + probe.expectMsg(ActorIdentity(6, Some(d))) + probe.expectNoMsg(200.millis) + + system.actorSelection("/user/a/b2/*/d/e").tell(Identify(7), probe.ref) + probe.expectMsg(ActorIdentity(7, None)) + probe.expectNoMsg(200.millis) + + system.actorSelection("/user/a/*/c/d/e").tell(Identify(8), probe.ref) + probe.expectNoMsg(500.millis) } "forward to selection" in { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 34b6f4fef6..99ddcd6396 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -517,7 +517,8 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, true case sel: ActorSelectionMessage ⇒ sel.identifyRequest match { - case Some(identify) ⇒ sender ! ActorIdentity(identify.messageId, None) + case Some(identify) ⇒ + if (!sel.wildcardFanOut) sender ! ActorIdentity(identify.messageId, None) case None ⇒ eventStream.publish(DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this)) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 35333e7087..c200c100b0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -38,7 +38,7 @@ abstract class ActorSelection extends Serializable { */ def tell(msg: Any, sender: ActorRef): Unit = ActorSelection.deliverSelection(anchor.asInstanceOf[InternalActorRef], sender, - ActorSelectionMessage(msg, path)) + ActorSelectionMessage(msg, path, wildcardFanOut = false)) /** * Forwards the message and passes the original sender actor as the sender. @@ -177,11 +177,16 @@ object ActorSelection { if (sel.elements.isEmpty) anchor.tell(sel.msg, sender) else { + val iter = sel.elements.iterator @tailrec def rec(ref: InternalActorRef): Unit = { ref match { case refWithCell: ActorRefWithCell ⇒ + + def emptyRef = new EmptyLocalActorRef(refWithCell.provider, anchor.path / sel.elements.map(_.toString), + refWithCell.underlying.system.eventStream) + iter.next() match { case SelectParent ⇒ val parent = ref.getParent @@ -192,9 +197,8 @@ object ActorSelection { case SelectChildName(name) ⇒ val child = refWithCell.getSingleChild(name) if (child == Nobody) { - val emptyRef = new EmptyLocalActorRef(refWithCell.provider, anchor.path / sel.elements.map(_.toString), - refWithCell.underlying.system.eventStream) - emptyRef.tell(sel, sender) + // don't send to emptyRef after wildcard fan-out + if (!sel.wildcardFanOut) emptyRef.tell(sel, sender) } else if (iter.isEmpty) child.tell(sel.msg, sender) else @@ -202,13 +206,23 @@ object ActorSelection { case p: SelectChildPattern ⇒ // fan-out when there is a wildcard val chldr = refWithCell.children - if (iter.isEmpty) - for (c ← chldr if p.pattern.matcher(c.path.name).matches) - c.tell(sel.msg, sender) - else { - val m = sel.copy(elements = iter.toVector) - for (c ← chldr if p.pattern.matcher(c.path.name).matches) - deliverSelection(c.asInstanceOf[InternalActorRef], sender, m) + if (iter.isEmpty) { + // leaf + val matchingChildren = chldr.filter(c ⇒ p.pattern.matcher(c.path.name).matches) + if (matchingChildren.isEmpty && !sel.wildcardFanOut) + emptyRef.tell(sel, sender) + else + matchingChildren.foreach(_.tell(sel.msg, sender)) + } else { + val matchingChildren = chldr.filter(c ⇒ p.pattern.matcher(c.path.name).matches) + // don't send to emptyRef after wildcard fan-out + if (matchingChildren.isEmpty && !sel.wildcardFanOut) + emptyRef.tell(sel, sender) + else { + val m = sel.copy(elements = iter.toVector, + wildcardFanOut = sel.wildcardFanOut || matchingChildren.size > 1) + matchingChildren.foreach(c ⇒ deliverSelection(c.asInstanceOf[InternalActorRef], sender, m)) + } } } @@ -238,8 +252,11 @@ trait ScalaActorSelection { * nested path descriptions whenever using ! on them, the idea being that the * message is delivered by traversing the various actor paths involved. */ -@SerialVersionUID(1L) -private[akka] final case class ActorSelectionMessage(msg: Any, elements: immutable.Iterable[SelectionPathElement]) +@SerialVersionUID(2L) // it has protobuf serialization in akka-remote +private[akka] final case class ActorSelectionMessage( + msg: Any, + elements: immutable.Iterable[SelectionPathElement], + wildcardFanOut: Boolean) extends AutoReceivedMessage with PossiblyHarmful { def identifyRequest: Option[Identify] = msg match { diff --git a/akka-remote/src/main/java/akka/remote/ContainerFormats.java b/akka-remote/src/main/java/akka/remote/ContainerFormats.java index c8c2febbb8..52e6d76813 100644 --- a/akka-remote/src/main/java/akka/remote/ContainerFormats.java +++ b/akka-remote/src/main/java/akka/remote/ContainerFormats.java @@ -156,6 +156,16 @@ public final class ContainerFormats { * optional bytes messageManifest = 4; */ com.google.protobuf.ByteString getMessageManifest(); + + // optional bool wildcardFanOut = 5; + /** + * optional bool wildcardFanOut = 5; + */ + boolean hasWildcardFanOut(); + /** + * optional bool wildcardFanOut = 5; + */ + boolean getWildcardFanOut(); } /** * Protobuf type {@code SelectionEnvelope} @@ -231,6 +241,11 @@ public final class ContainerFormats { messageManifest_ = input.readBytes(); break; } + case 40: { + bitField0_ |= 0x00000008; + wildcardFanOut_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -358,11 +373,28 @@ public final class ContainerFormats { return messageManifest_; } + // optional bool wildcardFanOut = 5; + public static final int WILDCARDFANOUT_FIELD_NUMBER = 5; + private boolean wildcardFanOut_; + /** + * optional bool wildcardFanOut = 5; + */ + public boolean hasWildcardFanOut() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional bool wildcardFanOut = 5; + */ + public boolean getWildcardFanOut() { + return wildcardFanOut_; + } + private void initFields() { enclosedMessage_ = com.google.protobuf.ByteString.EMPTY; serializerId_ = 0; pattern_ = java.util.Collections.emptyList(); messageManifest_ = com.google.protobuf.ByteString.EMPTY; + wildcardFanOut_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -402,6 +434,9 @@ public final class ContainerFormats { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(4, messageManifest_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBool(5, wildcardFanOut_); + } getUnknownFields().writeTo(output); } @@ -427,6 +462,10 @@ public final class ContainerFormats { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, messageManifest_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(5, wildcardFanOut_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -556,6 +595,8 @@ public final class ContainerFormats { } messageManifest_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000008); + wildcardFanOut_ = false; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -605,6 +646,10 @@ public final class ContainerFormats { to_bitField0_ |= 0x00000004; } result.messageManifest_ = messageManifest_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000008; + } + result.wildcardFanOut_ = wildcardFanOut_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -656,6 +701,9 @@ public final class ContainerFormats { if (other.hasMessageManifest()) { setMessageManifest(other.getMessageManifest()); } + if (other.hasWildcardFanOut()) { + setWildcardFanOut(other.getWildcardFanOut()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1042,6 +1090,39 @@ public final class ContainerFormats { return this; } + // optional bool wildcardFanOut = 5; + private boolean wildcardFanOut_ ; + /** + * optional bool wildcardFanOut = 5; + */ + public boolean hasWildcardFanOut() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bool wildcardFanOut = 5; + */ + public boolean getWildcardFanOut() { + return wildcardFanOut_; + } + /** + * optional bool wildcardFanOut = 5; + */ + public Builder setWildcardFanOut(boolean value) { + bitField0_ |= 0x00000010; + wildcardFanOut_ = value; + onChanged(); + return this; + } + /** + * optional bool wildcardFanOut = 5; + */ + public Builder clearWildcardFanOut() { + bitField0_ = (bitField0_ & ~0x00000010); + wildcardFanOut_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:SelectionEnvelope) } @@ -1641,14 +1722,14 @@ public final class ContainerFormats { descriptor; static { java.lang.String[] descriptorData = { - "\n\026ContainerFormats.proto\"x\n\021SelectionEnv" + - "elope\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serial" + - "izerId\030\002 \002(\005\022\033\n\007pattern\030\003 \003(\0132\n.Selectio" + - "n\022\027\n\017messageManifest\030\004 \001(\014\"8\n\tSelection\022" + - "\032\n\004type\030\001 \002(\0162\014.PatternType\022\017\n\007matcher\030\002" + - " \001(\t*<\n\013PatternType\022\n\n\006PARENT\020\000\022\016\n\nCHILD" + - "_NAME\020\001\022\021\n\rCHILD_PATTERN\020\002B\017\n\013akka.remot" + - "eH\001" + "\n\026ContainerFormats.proto\"\220\001\n\021SelectionEn" + + "velope\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014seria" + + "lizerId\030\002 \002(\005\022\033\n\007pattern\030\003 \003(\0132\n.Selecti" + + "on\022\027\n\017messageManifest\030\004 \001(\014\022\026\n\016wildcardF" + + "anOut\030\005 \001(\010\"8\n\tSelection\022\032\n\004type\030\001 \002(\0162\014" + + ".PatternType\022\017\n\007matcher\030\002 \001(\t*<\n\013Pattern" + + "Type\022\n\n\006PARENT\020\000\022\016\n\nCHILD_NAME\020\001\022\021\n\rCHIL" + + "D_PATTERN\020\002B\017\n\013akka.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1660,7 +1741,7 @@ public final class ContainerFormats { internal_static_SelectionEnvelope_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SelectionEnvelope_descriptor, - new java.lang.String[] { "EnclosedMessage", "SerializerId", "Pattern", "MessageManifest", }); + new java.lang.String[] { "EnclosedMessage", "SerializerId", "Pattern", "MessageManifest", "WildcardFanOut", }); internal_static_Selection_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_Selection_fieldAccessorTable = new diff --git a/akka-remote/src/main/protobuf/ContainerFormats.proto b/akka-remote/src/main/protobuf/ContainerFormats.proto index f1f449e82a..cbd9561ee7 100644 --- a/akka-remote/src/main/protobuf/ContainerFormats.proto +++ b/akka-remote/src/main/protobuf/ContainerFormats.proto @@ -14,6 +14,7 @@ message SelectionEnvelope { required int32 serializerId = 2; repeated Selection pattern = 3; optional bytes messageManifest = 4; + optional bool wildcardFanOut = 5; // optional for pre 2.3.4 compatibility } enum PatternType { diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index e96122900d..b3ba7bb44d 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -583,7 +583,7 @@ private[remote] class EndpointWriter( def enqueueInBuffer(msg: AnyRef): Unit = msg match { case s @ Send(_: PriorityMessage, _, _, _) ⇒ prioBuffer offer s - case s @ Send(ActorSelectionMessage(_: PriorityMessage, _), _, _, _) ⇒ prioBuffer offer s + case s @ Send(ActorSelectionMessage(_: PriorityMessage, _, _), _, _, _) ⇒ prioBuffer offer s case _ ⇒ buffer offer msg } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala index 09a66ef51b..0f6b82e085 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala @@ -34,7 +34,9 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serial val serializer = SerializationExtension(system).findSerializerFor(message) builder. setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message))). - setSerializerId(serializer.identifier) + setSerializerId(serializer.identifier). + setWildcardFanOut(sel.wildcardFanOut) + if (serializer.includeManifest) builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) @@ -73,6 +75,7 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends Serial } }(collection.breakOut) - ActorSelectionMessage(msg, elements) + val wildcardFanOut = if (selectionEnvelope.hasWildcardFanOut) selectionEnvelope.getWildcardFanOut else false + ActorSelectionMessage(msg, elements, wildcardFanOut) } } diff --git a/akka-remote/src/test/scala/akka/remote/serialization/MessageContainerSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/MessageContainerSerializerSpec.scala index a61d3fb622..f3a3e0b75f 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/MessageContainerSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/MessageContainerSerializerSpec.scala @@ -27,7 +27,7 @@ class MessageContainerSerializerSpec extends AkkaSpec { "serialize and de-serialize ActorSelectionMessage" in { verifySerialization(ActorSelectionMessage("hello", Vector( SelectChildName("user"), SelectChildName("a"), SelectChildName("b"), SelectParent, - SelectChildPattern("*"), SelectChildName("c")))) + SelectChildPattern("*"), SelectChildName("c")), wildcardFanOut = true)) } def verifySerialization(msg: AnyRef): Unit = {