diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 0d11d89817..347af6b5e7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -18,7 +18,7 @@ import akka.dispatch.sysmsg._ import akka.event.Logging.{ Debug, Error, LogEvent } import akka.japi.Procedure import akka.util.{ unused, Reflect } -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import com.github.ghik.silencer.silent /** @@ -312,6 +312,7 @@ private[akka] trait Cell { * schedule the actor to run, depending on which type of cell it is. * Is only allowed to throw Fatal Throwables. */ + @InternalStableApi def sendMessage(msg: Envelope): Unit /** diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 4ccfb24aad..6a2c34657b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -8,6 +8,7 @@ import java.util.concurrent._ import java.{ util => ju } import akka.actor._ +import akka.annotation.InternalStableApi import akka.dispatch.affinity.AffinityPoolConfigurator import akka.dispatch.sysmsg._ import akka.event.EventStream @@ -315,6 +316,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator * * INTERNAL API */ + @InternalStableApi protected[akka] def shutdown(): Unit } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 431f46e780..73682f1e55 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -10,6 +10,7 @@ import java.util.concurrent.locks.ReentrantLock import java.util.{ Comparator, Deque, PriorityQueue, Queue } import akka.actor.{ ActorCell, ActorRef, ActorSystem, DeadLetter, InternalActorRef } +import akka.annotation.InternalStableApi import akka.dispatch.sysmsg._ import akka.event.Logging.Error import akka.util.Helpers.ConfigOps @@ -448,6 +449,7 @@ private[akka] trait SystemMessageQueue { /** * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list. */ + @InternalStableApi def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit /** diff --git a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala index 991fd30511..05f50af8c7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala +++ b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala @@ -7,6 +7,7 @@ package akka.dispatch.sysmsg import scala.annotation.tailrec import akka.actor.{ ActorInitializationException, ActorRef, InternalActorRef, PossiblyHarmful } import akka.actor.DeadLetterSuppression +import akka.annotation.InternalStableApi /** * INTERNAL API @@ -190,6 +191,7 @@ private[akka] class EarliestFirstSystemMessageList(val head: SystemMessage) exte * * NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS */ +@InternalStableApi private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializable { // Next fields are only modifiable via the SystemMessageList value class @transient diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 15afcd2053..92c9706577 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -31,6 +31,7 @@ import akka.util.OptionVal import scala.collection.immutable import akka.actor.ActorInitializationException +import akka.annotation.InternalStableApi import akka.util.ccompat._ import com.github.ghik.silencer.silent @@ -291,6 +292,7 @@ private[remote] object EndpointManager { final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand case object StartupFinished extends RemotingCommand case object ShutdownAndFlush extends RemotingCommand + @InternalStableApi final case class Send( message: Any, senderOption: OptionVal[ActorRef], diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 5d5df9fcab..4bdf8c8bb0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.NotUsed import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem, PoisonPill } -import akka.annotation.{ DoNotInherit, InternalApi } +import akka.annotation.{ DoNotInherit, InternalApi, InternalStableApi } import akka.dispatch.Dispatchers import akka.event.{ Logging, LoggingAdapter } import akka.stream.Attributes.InputBuffer @@ -429,6 +429,7 @@ private final case class SavedIslandData( override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = materialize(_runnableGraph, defaultAttributes) + @InternalStableApi override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat = materialize( _runnableGraph, @@ -612,14 +613,19 @@ private final case class SavedIslandData( def name: String + @InternalStableApi def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (M, Any) + @InternalStableApi def assignPort(in: InPort, slot: Int, logic: M): Unit + @InternalStableApi def assignPort(out: OutPort, slot: Int, logic: M): Unit + @InternalStableApi def createPublisher(out: OutPort, logic: M): Publisher[Any] + @InternalStableApi def takePublisher(slot: Int, publisher: Publisher[Any]): Unit def onIslandReady(): Unit diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 1924b98930..393fb884b3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.AtomicReference import akka.Done import akka.actor._ -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.event.Logging import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance._ @@ -55,6 +55,7 @@ import scala.util.control.NonFatal def props(shell: GraphInterpreterShell): Props = Props(new ActorGraphInterpreter(shell)).withDeploy(Deploy.local) + @InternalStableApi class BatchingActorInputBoundary( size: Int, shell: GraphInterpreterShell, @@ -144,6 +145,7 @@ import scala.util.control.NonFatal }) } + @InternalStableApi private def dequeue(): Any = { val elem = inputBuffer(nextInputElementCursor) if (elem eq null) throw new IllegalArgumentException("Internal queue must never contain a null") @@ -160,6 +162,7 @@ import scala.util.control.NonFatal elem } + @InternalStableApi private def clear(): Unit = { java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null) inputBufferElements = 0 @@ -174,6 +177,7 @@ import scala.util.control.NonFatal } } + @InternalStableApi def onNext(elem: Any): Unit = { if (!upstreamCompleted) { if (inputBufferElements == size) throw new IllegalStateException("Input buffer overrun") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index e859aca9a1..75342a7a35 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -11,7 +11,7 @@ import akka.stream._ import java.util.concurrent.ThreadLocalRandom import akka.Done -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import scala.concurrent.Promise import scala.util.control.NonFatal @@ -78,6 +78,7 @@ import akka.stream.snapshot._ * @param inHandler The handler that contains the callback for input events. * @param outHandler The handler that contains the callback for output events. */ + @InternalStableApi final class Connection( var id: Int, var inOwner: GraphStageLogic, @@ -467,6 +468,7 @@ import akka.stream.snapshot._ } // Decodes and processes a single event for the given connection + @InternalStableApi private def processEvent(connection: Connection): Unit = { // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage @@ -514,6 +516,7 @@ import akka.stream.snapshot._ } } + @InternalStableApi private def processPush(connection: Connection): Unit = { if (Debug) println( @@ -523,6 +526,7 @@ import akka.stream.snapshot._ connection.inHandler.onPush() } + @InternalStableApi private def processPull(connection: Connection): Unit = { if (Debug) println( @@ -574,6 +578,7 @@ import akka.stream.snapshot._ if (enabled) shutdownCounter(logic.stageId) |= KeepGoingFlag else shutdownCounter(logic.stageId) &= KeepGoingMask + @InternalStableApi private[stream] def finalizeStage(logic: GraphStageLogic): Unit = { try { logic.postStop() @@ -612,6 +617,7 @@ import akka.stream.snapshot._ if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId) } + @InternalStableApi private[stream] def fail(connection: Connection, ex: Throwable): Unit = { val currentState = connection.portState if (Debug) println(s"$Name fail($connection, $ex) [$currentState]") @@ -630,6 +636,7 @@ import akka.stream.snapshot._ if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId) } + @InternalStableApi private[stream] def cancel(connection: Connection): Unit = { val currentState = connection.portState if (Debug) println(s"$Name cancel($connection) [$currentState]")