=str Use SeqActorName in more places
This commit is contained in:
parent
5b3c04572f
commit
df93b2f883
6 changed files with 26 additions and 20 deletions
|
|
@ -41,7 +41,7 @@ private[http] class PoolGateway(hcps: HostConnectionPoolSetup,
|
||||||
private val state = {
|
private val state = {
|
||||||
val shutdownCompletedPromise = Promise[Unit]()
|
val shutdownCompletedPromise = Promise[Unit]()
|
||||||
val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)).withDeploy(Deploy.local)
|
val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)).withDeploy(Deploy.local)
|
||||||
val ref = system.actorOf(props, PoolInterfaceActor.name.next())
|
val ref = system.actorOf(props, PoolInterfaceActor.names.next())
|
||||||
new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise))
|
new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ private object PoolInterfaceActor {
|
||||||
|
|
||||||
case object Shutdown extends DeadLetterSuppression
|
case object Shutdown extends DeadLetterSuppression
|
||||||
|
|
||||||
val name = new SeqActorName("PoolInterfaceActor")
|
val names = SeqActorName("PoolInterfaceActor")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ private object PoolSlot {
|
||||||
final case class Disconnected(slotIx: Int, failedRequests: Int) extends SlotEvent
|
final case class Disconnected(slotIx: Int, failedRequests: Int) extends SlotEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
private val slotProcessorActorName = new SeqActorName("SlotProcessor")
|
private val slotProcessorActorName = SeqActorName("SlotProcessor")
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Stream Setup
|
Stream Setup
|
||||||
|
|
|
||||||
|
|
@ -59,8 +59,7 @@ object ActorMaterializer {
|
||||||
system.dispatchers,
|
system.dispatchers,
|
||||||
context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()),
|
context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()),
|
||||||
haveShutDown,
|
haveShutDown,
|
||||||
FlowNameCounter(system).counter,
|
FlowNames(system).names.copy(namePrefix))
|
||||||
namePrefix)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -27,8 +27,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
|
||||||
dispatchers: Dispatchers,
|
dispatchers: Dispatchers,
|
||||||
supervisor: ActorRef,
|
supervisor: ActorRef,
|
||||||
haveShutDown: AtomicBoolean,
|
haveShutDown: AtomicBoolean,
|
||||||
flowNameCounter: AtomicLong,
|
flowNames: SeqActorName) extends ActorMaterializer {
|
||||||
namePrefix: String) extends ActorMaterializer {
|
|
||||||
import akka.stream.impl.Stages._
|
import akka.stream.impl.Stages._
|
||||||
private val logger = Logging.getLogger(system, this)
|
private val logger = Logging.getLogger(system, this)
|
||||||
|
|
||||||
|
|
@ -42,11 +41,9 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
|
||||||
|
|
||||||
override def isShutdown: Boolean = haveShutDown.get()
|
override def isShutdown: Boolean = haveShutDown.get()
|
||||||
|
|
||||||
override def withNamePrefix(name: String): Materializer = this.copy(namePrefix = name)
|
override def withNamePrefix(name: String): Materializer = this.copy(flowNames = flowNames.copy(name))
|
||||||
|
|
||||||
private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() // TODO use SeqActorName instead
|
private[this] def createFlowName(): String = flowNames.next()
|
||||||
|
|
||||||
private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
|
|
||||||
|
|
||||||
private val initialAttributes = Attributes(
|
private val initialAttributes = Attributes(
|
||||||
Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) ::
|
Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) ::
|
||||||
|
|
@ -195,17 +192,17 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider {
|
private[akka] object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider {
|
||||||
override def get(system: ActorSystem): FlowNameCounter = super.get(system)
|
override def get(system: ActorSystem): FlowNames = super.get(system)
|
||||||
override def lookup() = FlowNameCounter
|
override def lookup() = FlowNames
|
||||||
override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter
|
override def createExtension(system: ExtendedActorSystem): FlowNames = new FlowNames
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class FlowNameCounter extends Extension {
|
private[akka] class FlowNames extends Extension {
|
||||||
val counter = new AtomicLong(0)
|
val names = SeqActorName("Flow")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -215,7 +212,7 @@ private[akka] object StreamSupervisor {
|
||||||
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
|
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
|
||||||
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
|
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
|
||||||
|
|
||||||
private val actorName = new SeqActorName("StreamSupervisor")
|
private val actorName = SeqActorName("StreamSupervisor")
|
||||||
def nextName(): String = actorName.next()
|
def nextName(): String = actorName.next()
|
||||||
|
|
||||||
final case class Materialize(props: Props, name: String)
|
final case class Materialize(props: Props, name: String)
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,16 @@ import java.util.concurrent.atomic.AtomicLong
|
||||||
* Generator of sequentially numbered actor names.
|
* Generator of sequentially numbered actor names.
|
||||||
* Pulled out from HTTP internals, most often used used by streams which materialize actors directly
|
* Pulled out from HTTP internals, most often used used by streams which materialize actors directly
|
||||||
*/
|
*/
|
||||||
private[akka] final class SeqActorName(prefix: String) extends AtomicLong {
|
abstract class SeqActorName {
|
||||||
def next(): String = prefix + '-' + getAndIncrement()
|
def next(): String
|
||||||
|
def copy(name: String): SeqActorName
|
||||||
|
}
|
||||||
|
object SeqActorName {
|
||||||
|
def apply(prefix: String) = new SeqActorNameImpl(prefix, new AtomicLong(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
private[akka] final class SeqActorNameImpl(val prefix: String, counter: AtomicLong) extends SeqActorName {
|
||||||
|
def next(): String = prefix + '-' + counter.getAndIncrement()
|
||||||
|
|
||||||
|
def copy(newPrefix: String): SeqActorName = new SeqActorNameImpl(newPrefix, counter)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue