Remove or replace master/slave terminology from tests and API docs (#29252)

* Remove or replace master/slave terminology from tests and API docs
* Replace master where it would potentially imply being master/slave
This commit is contained in:
Johan Andrén 2020-06-18 08:17:46 +02:00 committed by GitHub
parent 0a986bf13d
commit 25ea7b7f5e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 101 additions and 105 deletions

View file

@ -149,7 +149,7 @@ class ActorWithStashSpec extends AkkaSpec with DefaultTimeout with BeforeAndAfte
val restartLatch = new TestLatch val restartLatch = new TestLatch
val hasMsgLatch = new TestLatch val hasMsgLatch = new TestLatch
val slaveProps = Props(new Actor with Stash { val employeeProps = Props(new Actor with Stash {
def receive = { def receive = {
case "crash" => case "crash" =>
throw new Exception("Crashing...") throw new Exception("Crashing...")
@ -169,10 +169,10 @@ class ActorWithStashSpec extends AkkaSpec with DefaultTimeout with BeforeAndAfte
super.preRestart(reason, message) super.preRestart(reason, message)
} }
}) })
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) val employee = Await.result((boss ? employeeProps).mapTo[ActorRef], timeout.duration)
slave ! "hello" employee ! "hello"
slave ! "crash" employee ! "crash"
Await.ready(restartLatch, 10 seconds) Await.ready(restartLatch, 10 seconds)
Await.ready(hasMsgLatch, 10 seconds) Await.ready(hasMsgLatch, 10 seconds)

View file

@ -31,7 +31,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
"A RestartStrategy" must { "A RestartStrategy" must {
"ensure that slave stays dead after max restarts within time range" in { "ensure that employee stays dead after max restarts within time range" in {
val boss = system.actorOf( val boss = system.actorOf(
Props( Props(
new Supervisor(OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable]))))) new Supervisor(OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))
@ -41,7 +41,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
val countDownLatch = new TestLatch(3) val countDownLatch = new TestLatch(3)
val stopLatch = new TestLatch val stopLatch = new TestLatch
val slaveProps = Props(new Actor { val employeeProps = Props(new Actor {
def receive = { def receive = {
case Ping => countDownLatch.countDown() case Ping => countDownLatch.countDown()
@ -59,32 +59,32 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
stopLatch.open() stopLatch.open()
} }
}) })
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) val employee = Await.result((boss ? employeeProps).mapTo[ActorRef], timeout.duration)
slave ! Ping employee ! Ping
slave ! Crash employee ! Crash
slave ! Ping employee ! Ping
// test restart and post restart ping // test restart and post restart ping
Await.ready(restartLatch, 10 seconds) Await.ready(restartLatch, 10 seconds)
// now crash again... should not restart // now crash again... should not restart
slave ! Crash employee ! Crash
slave ! Ping employee ! Ping
Await.ready(secondRestartLatch, 10 seconds) Await.ready(secondRestartLatch, 10 seconds)
Await.ready(countDownLatch, 10 seconds) Await.ready(countDownLatch, 10 seconds)
slave ! Crash employee ! Crash
Await.ready(stopLatch, 10 seconds) Await.ready(stopLatch, 10 seconds)
} }
"ensure that slave is immortal without max restarts and time range" in { "ensure that employee is immortal without max restarts and time range" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable]))))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy()(List(classOf[Throwable])))))
val countDownLatch = new TestLatch(100) val countDownLatch = new TestLatch(100)
val slaveProps = Props(new Actor { val employeeProps = Props(new Actor {
def receive = { def receive = {
case Crash => throw new Exception("Crashing...") case Crash => throw new Exception("Crashing...")
@ -94,16 +94,16 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
countDownLatch.countDown() countDownLatch.countDown()
} }
}) })
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) val employee = Await.result((boss ? employeeProps).mapTo[ActorRef], timeout.duration)
(1 to 100).foreach { _ => (1 to 100).foreach { _ =>
slave ! Crash employee ! Crash
} }
Await.ready(countDownLatch, 2 minutes) Await.ready(countDownLatch, 2 minutes)
assert(!slave.isTerminated) assert(!employee.isTerminated)
} }
"ensure that slave restarts after number of crashes not within time range" in { "ensure that employee restarts after number of crashes not within time range" in {
val boss = system.actorOf(Props( val boss = system.actorOf(Props(
new Supervisor(OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable]))))) new Supervisor(OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 500 millis)(List(classOf[Throwable])))))
@ -113,7 +113,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
val pingLatch = new TestLatch val pingLatch = new TestLatch
val secondPingLatch = new TestLatch val secondPingLatch = new TestLatch
val slaveProps = Props(new Actor { val employeeProps = Props(new Actor {
def receive = { def receive = {
case Ping => case Ping =>
@ -135,16 +135,16 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
} }
} }
}) })
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) val employee = Await.result((boss ? employeeProps).mapTo[ActorRef], timeout.duration)
slave ! Ping employee ! Ping
slave ! Crash employee ! Crash
Await.ready(restartLatch, 10 seconds) Await.ready(restartLatch, 10 seconds)
Await.ready(pingLatch, 10 seconds) Await.ready(pingLatch, 10 seconds)
slave ! Ping employee ! Ping
slave ! Crash employee ! Crash
Await.ready(secondRestartLatch, 10 seconds) Await.ready(secondRestartLatch, 10 seconds)
Await.ready(secondPingLatch, 10 seconds) Await.ready(secondPingLatch, 10 seconds)
@ -153,15 +153,15 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
sleep(700L) sleep(700L)
// now crash again... should and post restart ping // now crash again... should and post restart ping
slave ! Crash employee ! Crash
slave ! Ping employee ! Ping
Await.ready(thirdRestartLatch, 1 second) Await.ready(thirdRestartLatch, 1 second)
assert(!slave.isTerminated) assert(!employee.isTerminated)
} }
"ensure that slave is not restarted after max retries" in { "ensure that employee is not restarted after max retries" in {
val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable]))))) val boss = system.actorOf(Props(new Supervisor(OneForOneStrategy(maxNrOfRetries = 2)(List(classOf[Throwable])))))
val restartLatch = new TestLatch val restartLatch = new TestLatch
@ -169,7 +169,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
val countDownLatch = new TestLatch(3) val countDownLatch = new TestLatch(3)
val stopLatch = new TestLatch val stopLatch = new TestLatch
val slaveProps = Props(new Actor { val employeeProps = Props(new Actor {
def receive = { def receive = {
case Ping => countDownLatch.countDown() case Ping => countDownLatch.countDown()
@ -186,33 +186,33 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
stopLatch.open() stopLatch.open()
} }
}) })
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) val employee = Await.result((boss ? employeeProps).mapTo[ActorRef], timeout.duration)
slave ! Ping employee ! Ping
slave ! Crash employee ! Crash
slave ! Ping employee ! Ping
// test restart and post restart ping // test restart and post restart ping
Await.ready(restartLatch, 10 seconds) Await.ready(restartLatch, 10 seconds)
assert(!slave.isTerminated) assert(!employee.isTerminated)
// now crash again... should not restart // now crash again... should not restart
slave ! Crash employee ! Crash
slave ! Ping employee ! Ping
Await.ready(secondRestartLatch, 10 seconds) Await.ready(secondRestartLatch, 10 seconds)
Await.ready(countDownLatch, 10 seconds) Await.ready(countDownLatch, 10 seconds)
sleep(700L) sleep(700L)
slave ! Crash employee ! Crash
Await.ready(stopLatch, 10 seconds) Await.ready(stopLatch, 10 seconds)
sleep(500L) sleep(500L)
assert(slave.isTerminated) assert(employee.isTerminated)
} }
"ensure that slave is not restarted within time range" in { "ensure that employee is not restarted within time range" in {
val restartLatch, stopLatch, maxNoOfRestartsLatch = new TestLatch val restartLatch, stopLatch, maxNoOfRestartsLatch = new TestLatch
val countDownLatch = new TestLatch(2) val countDownLatch = new TestLatch(2)
@ -224,7 +224,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
} }
})) }))
val slaveProps = Props(new Actor { val employeeProps = Props(new Actor {
def receive = { def receive = {
case Ping => countDownLatch.countDown() case Ping => countDownLatch.countDown()
@ -239,32 +239,32 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
stopLatch.open() stopLatch.open()
} }
}) })
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) val employee = Await.result((boss ? employeeProps).mapTo[ActorRef], timeout.duration)
slave ! Ping employee ! Ping
slave ! Crash employee ! Crash
slave ! Ping employee ! Ping
// test restart and post restart ping // test restart and post restart ping
Await.ready(restartLatch, 10 seconds) Await.ready(restartLatch, 10 seconds)
assert(!slave.isTerminated) assert(!employee.isTerminated)
// now crash again... should not restart // now crash again... should not restart
slave ! Crash employee ! Crash
// may not be running // may not be running
slave ! Ping employee ! Ping
Await.ready(countDownLatch, 10 seconds) Await.ready(countDownLatch, 10 seconds)
// may not be running // may not be running
slave ! Crash employee ! Crash
Await.ready(stopLatch, 10 seconds) Await.ready(stopLatch, 10 seconds)
Await.ready(maxNoOfRestartsLatch, 10 seconds) Await.ready(maxNoOfRestartsLatch, 10 seconds)
sleep(500L) sleep(500L)
assert(slave.isTerminated) assert(employee.isTerminated)
} }
} }
} }

View file

@ -35,8 +35,7 @@ object Inet {
def beforeConnect(@unused s: Socket): Unit = () def beforeConnect(@unused s: Socket): Unit = ()
/** /**
* Action to be taken for this option after connect returned (i.e. on * Action to be taken for this option after connect returned.
* the slave socket for servers).
*/ */
def afterConnect(@unused s: Socket): Unit = () def afterConnect(@unused s: Socket): Unit = ()
} }
@ -50,20 +49,17 @@ object Inet {
trait SocketOptionV2 extends SocketOption { trait SocketOptionV2 extends SocketOption {
/** /**
* Action to be taken for this option after connect returned (i.e. on * Action to be taken for this option after connect returned.
* the slave socket for servers).
*/ */
def afterBind(@unused s: DatagramSocket): Unit = () def afterBind(@unused s: DatagramSocket): Unit = ()
/** /**
* Action to be taken for this option after connect returned (i.e. on * Action to be taken for this option after connect returned.
* the slave socket for servers).
*/ */
def afterBind(@unused s: ServerSocket): Unit = () def afterBind(@unused s: ServerSocket): Unit = ()
/** /**
* Action to be taken for this option after connect returned (i.e. on * Action to be taken for this option after connect returned.
* the slave socket for servers).
*/ */
def afterConnect(@unused s: DatagramSocket): Unit = () def afterConnect(@unused s: DatagramSocket): Unit = ()

View file

@ -54,9 +54,9 @@ Java
![stream-substream-groupBy2.png](../../images/stream-substream-groupBy2.png) ![stream-substream-groupBy2.png](../../images/stream-substream-groupBy2.png)
Also substreams, more precisely, `SubFlow` and `SubSource` have methods that allow you to Also substreams, more precisely, `SubFlow` and `SubSource` have methods that allow you to
merge or concat substreams into the master stream again. merge or concat substreams into the main stream again.
The `mergeSubstreams` method merges an unbounded number of substreams back to the master stream. The `mergeSubstreams` method merges an unbounded number of substreams back to the main stream.
Scala Scala
: @@snip [SubstreamDocSpec.scala](/akka-docs/src/test/scala/docs/stream/SubstreamDocSpec.scala) { #groupBy3 } : @@snip [SubstreamDocSpec.scala](/akka-docs/src/test/scala/docs/stream/SubstreamDocSpec.scala) { #groupBy3 }

View file

@ -21,8 +21,8 @@ class LookupRemoteActorMultiJvmSpec(artery: Boolean) extends MultiNodeConfig {
akka.remote.artery.enabled = $artery akka.remote.artery.enabled = $artery
""")).withFallback(RemotingMultiNodeSpec.commonConfig)) """)).withFallback(RemotingMultiNodeSpec.commonConfig))
val master = role("master") val leader = role("leader")
val slave = role("slave") val follower = role("follower")
} }
@ -49,19 +49,19 @@ abstract class LookupRemoteActorSpec(multiNodeConfig: LookupRemoteActorMultiJvmS
def initialParticipants = 2 def initialParticipants = 2
runOn(master) { runOn(leader) {
system.actorOf(Props[SomeActor](), "service-hello") system.actorOf(Props[SomeActor](), "service-hello")
} }
"Remoting" must { "Remoting" must {
"lookup remote actor" taggedAs LongRunningTest in { "lookup remote actor" taggedAs LongRunningTest in {
runOn(slave) { runOn(follower) {
val hello = { val hello = {
system.actorSelection(node(master) / "user" / "service-hello") ! Identify("id1") system.actorSelection(node(leader) / "user" / "service-hello") ! Identify("id1")
expectMsgType[ActorIdentity].ref.get expectMsgType[ActorIdentity].ref.get
} }
hello.isInstanceOf[RemoteActorRef] should ===(true) hello.isInstanceOf[RemoteActorRef] should ===(true)
val masterAddress = testConductor.getAddressFor(master).await val masterAddress = testConductor.getAddressFor(leader).await
(hello ? "identify").await.asInstanceOf[ActorRef].path.address should ===(masterAddress) (hello ? "identify").await.asInstanceOf[ActorRef].path.address should ===(masterAddress)
} }
enterBarrier("done") enterBarrier("done")

View file

@ -24,16 +24,16 @@ class NewRemoteActorMultiJvmSpec(artery: Boolean) extends MultiNodeConfig {
akka.remote.use-unsafe-remote-features-outside-cluster = on akka.remote.use-unsafe-remote-features-outside-cluster = on
""").withFallback(RemotingMultiNodeSpec.commonConfig))) """).withFallback(RemotingMultiNodeSpec.commonConfig)))
val master = role("master") val leader = role("leader")
val slave = role("slave") val follower = role("follower")
deployOn(master, """ deployOn(leader, """
/service-hello.remote = "@slave@" /service-hello.remote = "@follower@"
/service-hello-null.remote = "@slave@" /service-hello-null.remote = "@follower@"
/service-hello3.remote = "@slave@" /service-hello3.remote = "@follower@"
""") """)
deployOnAll("""/service-hello2.remote = "@slave@" """) deployOnAll("""/service-hello2.remote = "@follower@" """)
} }
class NewRemoteActorMultiJvmNode1 extends NewRemoteActorSpec(new NewRemoteActorMultiJvmSpec(artery = false)) class NewRemoteActorMultiJvmNode1 extends NewRemoteActorSpec(new NewRemoteActorMultiJvmSpec(artery = false))
@ -69,14 +69,14 @@ abstract class NewRemoteActorSpec(multiNodeConfig: NewRemoteActorMultiJvmSpec)
"A new remote actor" must { "A new remote actor" must {
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" in {
runOn(master) { runOn(leader) {
val actor = system.actorOf(Props[SomeActor](), "service-hello") val actor = system.actorOf(Props[SomeActor](), "service-hello")
actor.isInstanceOf[RemoteActorRef] should ===(true) actor.isInstanceOf[RemoteActorRef] should ===(true)
actor.path.address should ===(node(slave).address) actor.path.address should ===(node(follower).address)
val slaveAddress = testConductor.getAddressFor(slave).await val followerAddress = testConductor.getAddressFor(follower).await
actor ! "identify" actor ! "identify"
expectMsgType[ActorRef].path.address should ===(slaveAddress) expectMsgType[ActorRef].path.address should ===(followerAddress)
} }
enterBarrier("done") enterBarrier("done")
@ -84,14 +84,14 @@ abstract class NewRemoteActorSpec(multiNodeConfig: NewRemoteActorMultiJvmSpec)
"be locally instantiated on a remote node (with null parameter) and be able to communicate through its RemoteActorRef" in { "be locally instantiated on a remote node (with null parameter) and be able to communicate through its RemoteActorRef" in {
runOn(master) { runOn(leader) {
val actor = system.actorOf(Props(classOf[SomeActorWithParam], null), "service-hello-null") val actor = system.actorOf(Props(classOf[SomeActorWithParam], null), "service-hello-null")
actor.isInstanceOf[RemoteActorRef] should ===(true) actor.isInstanceOf[RemoteActorRef] should ===(true)
actor.path.address should ===(node(slave).address) actor.path.address should ===(node(follower).address)
val slaveAddress = testConductor.getAddressFor(slave).await val followerAddress = testConductor.getAddressFor(follower).await
actor ! "identify" actor ! "identify"
expectMsgType[ActorRef].path.address should ===(slaveAddress) expectMsgType[ActorRef].path.address should ===(followerAddress)
} }
enterBarrier("done") enterBarrier("done")
@ -99,24 +99,24 @@ abstract class NewRemoteActorSpec(multiNodeConfig: NewRemoteActorMultiJvmSpec)
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef (with deployOnAll)" in { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef (with deployOnAll)" in {
runOn(master) { runOn(leader) {
val actor = system.actorOf(Props[SomeActor](), "service-hello2") val actor = system.actorOf(Props[SomeActor](), "service-hello2")
actor.isInstanceOf[RemoteActorRef] should ===(true) actor.isInstanceOf[RemoteActorRef] should ===(true)
actor.path.address should ===(node(slave).address) actor.path.address should ===(node(follower).address)
val slaveAddress = testConductor.getAddressFor(slave).await val followerAddress = testConductor.getAddressFor(follower).await
actor ! "identify" actor ! "identify"
expectMsgType[ActorRef].path.address should ===(slaveAddress) expectMsgType[ActorRef].path.address should ===(followerAddress)
} }
enterBarrier("done") enterBarrier("done")
} }
"be able to shutdown system when using remote deployed actor" in within(20 seconds) { "be able to shutdown system when using remote deployed actor" in within(20 seconds) {
runOn(master) { runOn(leader) {
val actor = system.actorOf(Props[SomeActor](), "service-hello3") val actor = system.actorOf(Props[SomeActor](), "service-hello3")
actor.isInstanceOf[RemoteActorRef] should ===(true) actor.isInstanceOf[RemoteActorRef] should ===(true)
actor.path.address should ===(node(slave).address) actor.path.address should ===(node(follower).address)
// This watch is in race with the shutdown of the watched system. This race should remain, as the test should // This watch is in race with the shutdown of the watched system. This race should remain, as the test should
// handle both cases: // handle both cases:
// - remote system receives watch, replies with DeathWatchNotification // - remote system receives watch, replies with DeathWatchNotification
@ -126,12 +126,12 @@ abstract class NewRemoteActorSpec(multiNodeConfig: NewRemoteActorMultiJvmSpec)
enterBarrier("deployed") enterBarrier("deployed")
// master system is supposed to be shutdown after slave // master system is supposed to be shutdown after follower
// this should be triggered by slave system.terminate // this should be triggered by follower system.terminate
expectMsgPF() { case Terminated(`actor`) => true } expectMsgPF() { case Terminated(`actor`) => true }
} }
runOn(slave) { runOn(follower) {
enterBarrier("deployed") enterBarrier("deployed")
} }

View file

@ -20,8 +20,8 @@ object TestConductorMultiJvmSpec extends MultiNodeConfig {
akka.remote.artery.enabled = false akka.remote.artery.enabled = false
""")).withFallback(RemotingMultiNodeSpec.commonConfig)) """)).withFallback(RemotingMultiNodeSpec.commonConfig))
val master = role("master") val leader = role("leader")
val slave = role("slave") val follower = role("follower")
testTransport(on = true) testTransport(on = true)
} }
@ -36,14 +36,14 @@ class TestConductorSpec extends RemotingMultiNodeSpec(TestConductorMultiJvmSpec)
def initialParticipants = 2 def initialParticipants = 2
lazy val echo = { lazy val echo = {
system.actorSelection(node(master) / "user" / "echo") ! Identify(None) system.actorSelection(node(leader) / "user" / "echo") ! Identify(None)
expectMsgType[ActorIdentity].ref.get expectMsgType[ActorIdentity].ref.get
} }
"A TestConductor" must { "A TestConductor" must {
"enter a barrier" taggedAs LongRunningTest in { "enter a barrier" taggedAs LongRunningTest in {
runOn(master) { runOn(leader) {
system.actorOf(Props(new Actor { system.actorOf(Props(new Actor {
def receive = { def receive = {
case x => testActor ! x; sender() ! x case x => testActor ! x; sender() ! x
@ -56,20 +56,20 @@ class TestConductorSpec extends RemotingMultiNodeSpec(TestConductorMultiJvmSpec)
"support throttling of network connections" taggedAs LongRunningTest in { "support throttling of network connections" taggedAs LongRunningTest in {
runOn(slave) { runOn(follower) {
// start remote network connection so that it can be throttled // start remote network connection so that it can be throttled
echo ! "start" echo ! "start"
} }
expectMsg("start") expectMsg("start")
runOn(master) { runOn(leader) {
testConductor.throttle(slave, master, Direction.Send, rateMBit = 0.01).await testConductor.throttle(follower, leader, Direction.Send, rateMBit = 0.01).await
} }
enterBarrier("throttled_send") enterBarrier("throttled_send")
runOn(slave) { runOn(follower) {
for (i <- 0 to 9) echo ! i for (i <- 0 to 9) echo ! i
} }
@ -80,19 +80,19 @@ class TestConductorSpec extends RemotingMultiNodeSpec(TestConductorMultiJvmSpec)
enterBarrier("throttled_send2") enterBarrier("throttled_send2")
runOn(master) { runOn(leader) {
testConductor.throttle(slave, master, Direction.Send, -1).await testConductor.throttle(follower, leader, Direction.Send, -1).await
testConductor.throttle(slave, master, Direction.Receive, rateMBit = 0.01).await testConductor.throttle(follower, leader, Direction.Receive, rateMBit = 0.01).await
} }
enterBarrier("throttled_recv") enterBarrier("throttled_recv")
runOn(slave) { runOn(follower) {
for (i <- 10 to 19) echo ! i for (i <- 10 to 19) echo ! i
} }
val (min, max) = val (min, max) =
if (isNode(master)) (0 seconds, 500 millis) if (isNode(leader)) (0 seconds, 500 millis)
else (0.3 seconds, 3 seconds) else (0.3 seconds, 3 seconds)
within(min, max) { within(min, max) {
@ -102,8 +102,8 @@ class TestConductorSpec extends RemotingMultiNodeSpec(TestConductorMultiJvmSpec)
enterBarrier("throttled_recv2") enterBarrier("throttled_recv2")
runOn(master) { runOn(leader) {
testConductor.throttle(slave, master, Direction.Receive, -1).await testConductor.throttle(follower, leader, Direction.Receive, -1).await
} }
enterBarrier("after") enterBarrier("after")