diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala index 470a685d1e..0dca57656d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolGateway.scala @@ -41,7 +41,7 @@ private[http] class PoolGateway(hcps: HostConnectionPoolSetup, private val state = { val shutdownCompletedPromise = Promise[Unit]() val props = Props(new PoolInterfaceActor(hcps, shutdownCompletedPromise, this)).withDeploy(Deploy.local) - val ref = system.actorOf(props, PoolInterfaceActor.name.next()) + val ref = system.actorOf(props, PoolInterfaceActor.names.next()) new AtomicReference[State](Running(ref, _shutdownStartedPromise, shutdownCompletedPromise)) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 5768324378..f4babf1915 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -27,7 +27,7 @@ private object PoolInterfaceActor { case object Shutdown extends DeadLetterSuppression - val name = new SeqActorName("PoolInterfaceActor") + val names = SeqActorName("PoolInterfaceActor") } /** diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index 01c3584c5e..ce5b7ab76c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -35,7 +35,7 @@ private object PoolSlot { final case class Disconnected(slotIx: Int, failedRequests: Int) extends SlotEvent } - private val slotProcessorActorName = new SeqActorName("SlotProcessor") + private val slotProcessorActorName = SeqActorName("SlotProcessor") /* Stream Setup diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index fcef4e3243..12664eb557 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -59,8 +59,7 @@ object ActorMaterializer { system.dispatchers, context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()), haveShutDown, - FlowNameCounter(system).counter, - namePrefix) + FlowNames(system).names.copy(namePrefix)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 037c488f31..7452eefa36 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -27,8 +27,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, dispatchers: Dispatchers, supervisor: ActorRef, haveShutDown: AtomicBoolean, - flowNameCounter: AtomicLong, - namePrefix: String) extends ActorMaterializer { + flowNames: SeqActorName) extends ActorMaterializer { import akka.stream.impl.Stages._ private val logger = Logging.getLogger(system, this) @@ -42,11 +41,9 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, override def isShutdown: Boolean = haveShutDown.get() - override def withNamePrefix(name: String): Materializer = this.copy(namePrefix = name) + override def withNamePrefix(name: String): Materializer = this.copy(flowNames = flowNames.copy(name)) - private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() // TODO use SeqActorName instead - - private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" + private[this] def createFlowName(): String = flowNames.next() private val initialAttributes = Attributes( Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) :: @@ -195,17 +192,17 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, /** * INTERNAL API */ -private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider { - override def get(system: ActorSystem): FlowNameCounter = super.get(system) - override def lookup() = FlowNameCounter - override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter +private[akka] object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider { + override def get(system: ActorSystem): FlowNames = super.get(system) + override def lookup() = FlowNames + override def createExtension(system: ExtendedActorSystem): FlowNames = new FlowNames } /** * INTERNAL API */ -private[akka] class FlowNameCounter extends Extension { - val counter = new AtomicLong(0) +private[akka] class FlowNames extends Extension { + val names = SeqActorName("Flow") } /** @@ -215,7 +212,7 @@ private[akka] object StreamSupervisor { def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) - private val actorName = new SeqActorName("StreamSupervisor") + private val actorName = SeqActorName("StreamSupervisor") def nextName(): String = actorName.next() final case class Materialize(props: Props, name: String) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala b/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala index f88949c1a2..b088e0b95b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala @@ -13,6 +13,16 @@ import java.util.concurrent.atomic.AtomicLong * Generator of sequentially numbered actor names. * Pulled out from HTTP internals, most often used used by streams which materialize actors directly */ -private[akka] final class SeqActorName(prefix: String) extends AtomicLong { - def next(): String = prefix + '-' + getAndIncrement() +abstract class SeqActorName { + def next(): String + def copy(name: String): SeqActorName +} +object SeqActorName { + def apply(prefix: String) = new SeqActorNameImpl(prefix, new AtomicLong(0)) +} + +private[akka] final class SeqActorNameImpl(val prefix: String, counter: AtomicLong) extends SeqActorName { + def next(): String = prefix + '-' + counter.getAndIncrement() + + def copy(newPrefix: String): SeqActorName = new SeqActorNameImpl(newPrefix, counter) }