diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index fcd17db043..4eaa33f088 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -796,32 +796,40 @@ class ShardRegion( regionByShard.get(shard) match { case Some(ref) if ref == self ⇒ val (id, m) = idExtractor(msg) - val name = URLEncoder.encode(id, "utf-8") - val entry = context.child(name).getOrElse { - if (entryProps.isEmpty) - throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion") - log.debug("Starting entry [{}] in shard [{}]", id, shard) - val a = context.watch(context.actorOf(entryProps.get, name)) - entries = entries.updated(a, shard) - entriesByShard = entriesByShard.updated(shard, entriesByShard.getOrElse(shard, Set.empty) + a) - a - } - passivatingBuffers.get(entry) match { - case None ⇒ - log.debug("Message [{}] for shard [{}] sent to entry", m.getClass.getName, shard) - entry.tell(m, snd) - case Some(buf) ⇒ - if (totalBufferSize >= bufferSize) { - log.debug("Buffer is full, dropping message for passivated entry in shard [{}]", shard) - context.system.deadLetters ! msg - } else { - log.debug("Message for shard [{}] buffered due to entry being passivated", shard) - passivatingBuffers = passivatingBuffers.updated(entry, buf :+ ((msg, snd))) - } + if (id == null || id == "") { + log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName) + context.system.deadLetters ! msg + } else { + val name = URLEncoder.encode(id, "utf-8") + val entry = context.child(name).getOrElse { + if (entryProps.isEmpty) + throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion") + log.debug("Starting entry [{}] in shard [{}]", id, shard) + val a = context.watch(context.actorOf(entryProps.get, name)) + entries = entries.updated(a, shard) + entriesByShard = entriesByShard.updated(shard, entriesByShard.getOrElse(shard, Set.empty) + a) + a + } + passivatingBuffers.get(entry) match { + case None ⇒ + log.debug("Message [{}] for shard [{}] sent to entry", m.getClass.getName, shard) + entry.tell(m, snd) + case Some(buf) ⇒ + if (totalBufferSize >= bufferSize) { + log.debug("Buffer is full, dropping message for passivated entry in shard [{}]", shard) + context.system.deadLetters ! msg + } else { + log.debug("Message for shard [{}] buffered due to entry being passivated", shard) + passivatingBuffers = passivatingBuffers.updated(entry, buf :+ ((msg, snd))) + } + } } case Some(ref) ⇒ log.debug("Forwarding request for shard [{}] to [{}]", shard, ref) ref.tell(msg, snd) + case None if (shard == null || shard == "") ⇒ + log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName) + context.system.deadLetters ! msg case None ⇒ if (!shardBuffers.contains(shard)) { log.debug("Request shard [{}] home", shard) @@ -1193,7 +1201,6 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite override def receiveRecover: Receive = { case evt: DomainEvent ⇒ evt match { case ShardRegionRegistered(region) ⇒ - context.watch(region) persistentState = persistentState.updated(evt) case ShardRegionProxyRegistered(proxy) ⇒ persistentState = persistentState.updated(evt) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index 4e28b75bde..7b8063caa2 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -65,8 +65,12 @@ object DistributedPubSubMediator { @SerialVersionUID(1L) final case class Put(ref: ActorRef) @SerialVersionUID(1L) final case class Remove(path: String) - @SerialVersionUID(1L) final case class Subscribe(topic: String, ref: ActorRef) - @SerialVersionUID(1L) final case class Unsubscribe(topic: String, ref: ActorRef) + @SerialVersionUID(1L) final case class Subscribe(topic: String, ref: ActorRef) { + require(topic != null && topic != "", "topic must be defined") + } + @SerialVersionUID(1L) final case class Unsubscribe(topic: String, ref: ActorRef) { + require(topic != null && topic != "", "topic must be defined") + } @SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe) @SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe) @SerialVersionUID(1L) final case class Publish(topic: String, msg: Any) extends DistributedPubSubMessage