Internal stable markers

A marker on a class means that its name shouldn't change, if any of its
methods should not change they will be marked as well
This commit is contained in:
Christopher Batey 2019-05-31 14:15:09 +01:00
parent eca459e461
commit e7e628c831
8 changed files with 30 additions and 4 deletions

View file

@ -18,7 +18,7 @@ import akka.dispatch.sysmsg._
import akka.event.Logging.{ Debug, Error, LogEvent } import akka.event.Logging.{ Debug, Error, LogEvent }
import akka.japi.Procedure import akka.japi.Procedure
import akka.util.{ unused, Reflect } import akka.util.{ unused, Reflect }
import akka.annotation.InternalApi import akka.annotation.{ InternalApi, InternalStableApi }
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
/** /**
@ -312,6 +312,7 @@ private[akka] trait Cell {
* schedule the actor to run, depending on which type of cell it is. * schedule the actor to run, depending on which type of cell it is.
* Is only allowed to throw Fatal Throwables. * Is only allowed to throw Fatal Throwables.
*/ */
@InternalStableApi
def sendMessage(msg: Envelope): Unit def sendMessage(msg: Envelope): Unit
/** /**

View file

@ -8,6 +8,7 @@ import java.util.concurrent._
import java.{ util => ju } import java.{ util => ju }
import akka.actor._ import akka.actor._
import akka.annotation.InternalStableApi
import akka.dispatch.affinity.AffinityPoolConfigurator import akka.dispatch.affinity.AffinityPoolConfigurator
import akka.dispatch.sysmsg._ import akka.dispatch.sysmsg._
import akka.event.EventStream import akka.event.EventStream
@ -315,6 +316,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
* *
* INTERNAL API * INTERNAL API
*/ */
@InternalStableApi
protected[akka] def shutdown(): Unit protected[akka] def shutdown(): Unit
} }

View file

@ -10,6 +10,7 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.{ Comparator, Deque, PriorityQueue, Queue } import java.util.{ Comparator, Deque, PriorityQueue, Queue }
import akka.actor.{ ActorCell, ActorRef, ActorSystem, DeadLetter, InternalActorRef } import akka.actor.{ ActorCell, ActorRef, ActorSystem, DeadLetter, InternalActorRef }
import akka.annotation.InternalStableApi
import akka.dispatch.sysmsg._ import akka.dispatch.sysmsg._
import akka.event.Logging.Error import akka.event.Logging.Error
import akka.util.Helpers.ConfigOps import akka.util.Helpers.ConfigOps
@ -448,6 +449,7 @@ private[akka] trait SystemMessageQueue {
/** /**
* Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list. * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list.
*/ */
@InternalStableApi
def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit
/** /**

View file

@ -7,6 +7,7 @@ package akka.dispatch.sysmsg
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.actor.{ ActorInitializationException, ActorRef, InternalActorRef, PossiblyHarmful } import akka.actor.{ ActorInitializationException, ActorRef, InternalActorRef, PossiblyHarmful }
import akka.actor.DeadLetterSuppression import akka.actor.DeadLetterSuppression
import akka.annotation.InternalStableApi
/** /**
* INTERNAL API * INTERNAL API
@ -190,6 +191,7 @@ private[akka] class EarliestFirstSystemMessageList(val head: SystemMessage) exte
* *
* <b>NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS</b> * <b>NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS</b>
*/ */
@InternalStableApi
private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializable { private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializable {
// Next fields are only modifiable via the SystemMessageList value class // Next fields are only modifiable via the SystemMessageList value class
@transient @transient

View file

@ -31,6 +31,7 @@ import akka.util.OptionVal
import scala.collection.immutable import scala.collection.immutable
import akka.actor.ActorInitializationException import akka.actor.ActorInitializationException
import akka.annotation.InternalStableApi
import akka.util.ccompat._ import akka.util.ccompat._
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
@ -291,6 +292,7 @@ private[remote] object EndpointManager {
final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand
case object StartupFinished extends RemotingCommand case object StartupFinished extends RemotingCommand
case object ShutdownAndFlush extends RemotingCommand case object ShutdownAndFlush extends RemotingCommand
@InternalStableApi
final case class Send( final case class Send(
message: Any, message: Any,
senderOption: OptionVal[ActorRef], senderOption: OptionVal[ActorRef],

View file

@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.NotUsed import akka.NotUsed
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem, PoisonPill } import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem, PoisonPill }
import akka.annotation.{ DoNotInherit, InternalApi } import akka.annotation.{ DoNotInherit, InternalApi, InternalStableApi }
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter } import akka.event.{ Logging, LoggingAdapter }
import akka.stream.Attributes.InputBuffer import akka.stream.Attributes.InputBuffer
@ -429,6 +429,7 @@ private final case class SavedIslandData(
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat =
materialize(_runnableGraph, defaultAttributes) materialize(_runnableGraph, defaultAttributes)
@InternalStableApi
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat = override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat =
materialize( materialize(
_runnableGraph, _runnableGraph,
@ -612,14 +613,19 @@ private final case class SavedIslandData(
def name: String def name: String
@InternalStableApi
def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (M, Any) def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (M, Any)
@InternalStableApi
def assignPort(in: InPort, slot: Int, logic: M): Unit def assignPort(in: InPort, slot: Int, logic: M): Unit
@InternalStableApi
def assignPort(out: OutPort, slot: Int, logic: M): Unit def assignPort(out: OutPort, slot: Int, logic: M): Unit
@InternalStableApi
def createPublisher(out: OutPort, logic: M): Publisher[Any] def createPublisher(out: OutPort, logic: M): Publisher[Any]
@InternalStableApi
def takePublisher(slot: Int, publisher: Publisher[Any]): Unit def takePublisher(slot: Int, publisher: Publisher[Any]): Unit
def onIslandReady(): Unit def onIslandReady(): Unit

View file

@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicReference
import akka.Done import akka.Done
import akka.actor._ import akka.actor._
import akka.annotation.InternalApi import akka.annotation.{ InternalApi, InternalStableApi }
import akka.event.Logging import akka.event.Logging
import akka.stream._ import akka.stream._
import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.ReactiveStreamsCompliance._
@ -55,6 +55,7 @@ import scala.util.control.NonFatal
def props(shell: GraphInterpreterShell): Props = def props(shell: GraphInterpreterShell): Props =
Props(new ActorGraphInterpreter(shell)).withDeploy(Deploy.local) Props(new ActorGraphInterpreter(shell)).withDeploy(Deploy.local)
@InternalStableApi
class BatchingActorInputBoundary( class BatchingActorInputBoundary(
size: Int, size: Int,
shell: GraphInterpreterShell, shell: GraphInterpreterShell,
@ -144,6 +145,7 @@ import scala.util.control.NonFatal
}) })
} }
@InternalStableApi
private def dequeue(): Any = { private def dequeue(): Any = {
val elem = inputBuffer(nextInputElementCursor) val elem = inputBuffer(nextInputElementCursor)
if (elem eq null) throw new IllegalArgumentException("Internal queue must never contain a null") if (elem eq null) throw new IllegalArgumentException("Internal queue must never contain a null")
@ -160,6 +162,7 @@ import scala.util.control.NonFatal
elem elem
} }
@InternalStableApi
private def clear(): Unit = { private def clear(): Unit = {
java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null) java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
inputBufferElements = 0 inputBufferElements = 0
@ -174,6 +177,7 @@ import scala.util.control.NonFatal
} }
} }
@InternalStableApi
def onNext(elem: Any): Unit = { def onNext(elem: Any): Unit = {
if (!upstreamCompleted) { if (!upstreamCompleted) {
if (inputBufferElements == size) throw new IllegalStateException("Input buffer overrun") if (inputBufferElements == size) throw new IllegalStateException("Input buffer overrun")

View file

@ -11,7 +11,7 @@ import akka.stream._
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import akka.Done import akka.Done
import akka.annotation.InternalApi import akka.annotation.{ InternalApi, InternalStableApi }
import scala.concurrent.Promise import scala.concurrent.Promise
import scala.util.control.NonFatal import scala.util.control.NonFatal
@ -78,6 +78,7 @@ import akka.stream.snapshot._
* @param inHandler The handler that contains the callback for input events. * @param inHandler The handler that contains the callback for input events.
* @param outHandler The handler that contains the callback for output events. * @param outHandler The handler that contains the callback for output events.
*/ */
@InternalStableApi
final class Connection( final class Connection(
var id: Int, var id: Int,
var inOwner: GraphStageLogic, var inOwner: GraphStageLogic,
@ -467,6 +468,7 @@ import akka.stream.snapshot._
} }
// Decodes and processes a single event for the given connection // Decodes and processes a single event for the given connection
@InternalStableApi
private def processEvent(connection: Connection): Unit = { private def processEvent(connection: Connection): Unit = {
// this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage
@ -514,6 +516,7 @@ import akka.stream.snapshot._
} }
} }
@InternalStableApi
private def processPush(connection: Connection): Unit = { private def processPush(connection: Connection): Unit = {
if (Debug) if (Debug)
println( println(
@ -523,6 +526,7 @@ import akka.stream.snapshot._
connection.inHandler.onPush() connection.inHandler.onPush()
} }
@InternalStableApi
private def processPull(connection: Connection): Unit = { private def processPull(connection: Connection): Unit = {
if (Debug) if (Debug)
println( println(
@ -574,6 +578,7 @@ import akka.stream.snapshot._
if (enabled) shutdownCounter(logic.stageId) |= KeepGoingFlag if (enabled) shutdownCounter(logic.stageId) |= KeepGoingFlag
else shutdownCounter(logic.stageId) &= KeepGoingMask else shutdownCounter(logic.stageId) &= KeepGoingMask
@InternalStableApi
private[stream] def finalizeStage(logic: GraphStageLogic): Unit = { private[stream] def finalizeStage(logic: GraphStageLogic): Unit = {
try { try {
logic.postStop() logic.postStop()
@ -612,6 +617,7 @@ import akka.stream.snapshot._
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId) if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId)
} }
@InternalStableApi
private[stream] def fail(connection: Connection, ex: Throwable): Unit = { private[stream] def fail(connection: Connection, ex: Throwable): Unit = {
val currentState = connection.portState val currentState = connection.portState
if (Debug) println(s"$Name fail($connection, $ex) [$currentState]") if (Debug) println(s"$Name fail($connection, $ex) [$currentState]")
@ -630,6 +636,7 @@ import akka.stream.snapshot._
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId) if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId)
} }
@InternalStableApi
private[stream] def cancel(connection: Connection): Unit = { private[stream] def cancel(connection: Connection): Unit = {
val currentState = connection.portState val currentState = connection.portState
if (Debug) println(s"$Name cancel($connection) [$currentState]") if (Debug) println(s"$Name cancel($connection) [$currentState]")