=act #13981 Encode name of Balancing dispatcher config path
This commit is contained in:
parent
dd71de5f93
commit
1ffd05db49
3 changed files with 36 additions and 7 deletions
|
|
@ -11,6 +11,7 @@ import akka.actor.{ Props, Actor }
|
||||||
import akka.testkit.{ TestLatch, ImplicitSender, AkkaSpec }
|
import akka.testkit.{ TestLatch, ImplicitSender, AkkaSpec }
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import org.scalatest.BeforeAndAfterEach
|
import org.scalatest.BeforeAndAfterEach
|
||||||
|
import java.net.URLEncoder
|
||||||
|
|
||||||
object BalancingSpec {
|
object BalancingSpec {
|
||||||
val counter = new AtomicInteger(1)
|
val counter = new AtomicInteger(1)
|
||||||
|
|
@ -24,6 +25,15 @@ object BalancingSpec {
|
||||||
sender() ! id
|
sender() ! id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class Parent extends Actor {
|
||||||
|
val pool = context.actorOf(BalancingPool(2).props(routeeProps =
|
||||||
|
Props(classOf[Worker], TestLatch(0)(context.system))))
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case msg ⇒ pool.forward(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
|
@ -98,5 +108,18 @@ class BalancingSpec extends AkkaSpec(
|
||||||
test(pool, latch)
|
test(pool, latch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"work with anonymous actor names" in {
|
||||||
|
// the dispatcher-id must not contain invalid config key characters (e.g. $a)
|
||||||
|
system.actorOf(Props[Parent]) ! "hello"
|
||||||
|
expectMsgType[Int]
|
||||||
|
}
|
||||||
|
|
||||||
|
"work with encoded actor names" in {
|
||||||
|
val encName = URLEncoder.encode("abcå6#$€xyz", "utf-8")
|
||||||
|
// % is a valid config key character (e.g. %C3%A5)
|
||||||
|
system.actorOf(Props[Parent], encName) ! "hello"
|
||||||
|
expectMsgType[Int]
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,10 @@ final case class BalancingPool(
|
||||||
*/
|
*/
|
||||||
override private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee = {
|
override private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee = {
|
||||||
|
|
||||||
val deployPath = context.self.path.elements.drop(1).mkString("/", "/", "")
|
val rawDeployPath = context.self.path.elements.drop(1).mkString("/", "/", "")
|
||||||
|
val deployPath = BalancingPool.invalidConfigKeyChars.foldLeft(rawDeployPath) { (replaced, c) ⇒
|
||||||
|
replaced.replace(c, '_')
|
||||||
|
}
|
||||||
val dispatcherId = s"BalancingPool-$deployPath"
|
val dispatcherId = s"BalancingPool-$deployPath"
|
||||||
def dispatchers = context.system.dispatchers
|
def dispatchers = context.system.dispatchers
|
||||||
|
|
||||||
|
|
@ -147,3 +150,6 @@ final case class BalancingPool(
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object BalancingPool {
|
||||||
|
private val invalidConfigKeyChars = List('$', '@', ':')
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
package docs.persistence
|
package docs.persistence
|
||||||
|
|
||||||
import akka.actor.{Actor, ActorSystem, Props}
|
import akka.actor.{ Actor, ActorSystem, Props }
|
||||||
import akka.persistence._
|
import akka.persistence._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
|
@ -29,7 +29,7 @@ trait PersistenceDocSpec {
|
||||||
|
|
||||||
new AnyRef {
|
new AnyRef {
|
||||||
//#definition
|
//#definition
|
||||||
import akka.persistence.{PersistenceFailure, Persistent, Processor}
|
import akka.persistence.{ PersistenceFailure, Persistent, Processor }
|
||||||
|
|
||||||
class MyProcessor extends Processor {
|
class MyProcessor extends Processor {
|
||||||
def receive = {
|
def receive = {
|
||||||
|
|
@ -135,7 +135,7 @@ trait PersistenceDocSpec {
|
||||||
|
|
||||||
new AnyRef {
|
new AnyRef {
|
||||||
//#at-least-once-example
|
//#at-least-once-example
|
||||||
import akka.actor.{Actor, ActorPath}
|
import akka.actor.{ Actor, ActorPath }
|
||||||
import akka.persistence.AtLeastOnceDelivery
|
import akka.persistence.AtLeastOnceDelivery
|
||||||
|
|
||||||
case class Msg(deliveryId: Long, s: String)
|
case class Msg(deliveryId: Long, s: String)
|
||||||
|
|
@ -177,8 +177,8 @@ trait PersistenceDocSpec {
|
||||||
|
|
||||||
new AnyRef {
|
new AnyRef {
|
||||||
//#channel-example
|
//#channel-example
|
||||||
import akka.actor.{Actor, Props}
|
import akka.actor.{ Actor, Props }
|
||||||
import akka.persistence.{Channel, Deliver, Persistent, Processor}
|
import akka.persistence.{ Channel, Deliver, Persistent, Processor }
|
||||||
|
|
||||||
class MyProcessor extends Processor {
|
class MyProcessor extends Processor {
|
||||||
val destination = context.actorOf(Props[MyDestination])
|
val destination = context.actorOf(Props[MyDestination])
|
||||||
|
|
@ -252,7 +252,7 @@ trait PersistenceDocSpec {
|
||||||
new AnyRef {
|
new AnyRef {
|
||||||
//#fsm-example
|
//#fsm-example
|
||||||
import akka.actor.FSM
|
import akka.actor.FSM
|
||||||
import akka.persistence.{Persistent, Processor}
|
import akka.persistence.{ Persistent, Processor }
|
||||||
|
|
||||||
class PersistentDoor extends Processor with FSM[String, Int] {
|
class PersistentDoor extends Processor with FSM[String, Int] {
|
||||||
startWith("closed", 0)
|
startWith("closed", 0)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue