=str #16602 Suppress dead letters

* akka 2.3.9
* also added missing `final` to case classes
This commit is contained in:
Patrik Nordwall 2015-01-23 17:18:09 +01:00
parent 84a5505ca9
commit 7cf80ab3f3
28 changed files with 145 additions and 129 deletions

View file

@ -37,7 +37,7 @@ trait ScriptedTest extends Matchers {
} }
} }
case class Script[In, Out]( final case class Script[In, Out](
providedInputs: Vector[In], providedInputs: Vector[In],
expectedOutputs: Vector[Out], expectedOutputs: Vector[Out],
jumps: Vector[Int], jumps: Vector[Int],
@ -129,7 +129,7 @@ trait ScriptedTest extends Matchers {
case _ false // Ignore case _ false // Ignore
} }
val d = downstream.probe.receiveWhile(1.milliseconds) { val d = downstream.probe.receiveWhile(1.milliseconds) {
case OnNext(elem: Out) case OnNext(elem: Out @unchecked)
debugLog(s"operation produces [$elem]") debugLog(s"operation produces [$elem]")
if (outstandingDemand == 0) fail("operation produced while there was no demand") if (outstandingDemand == 0) fail("operation produced while there was no demand")
outstandingDemand -= 1 outstandingDemand -= 1

View file

@ -16,7 +16,7 @@ import akka.actor.Actor
* This mailbox is only used in tests to verify that stream actors are using * This mailbox is only used in tests to verify that stream actors are using
* the dispatcher defined in MaterializerSettings. * the dispatcher defined in MaterializerSettings.
*/ */
private[akka] case class StreamTestDefaultMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] { private[akka] final case class StreamTestDefaultMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this() def this(settings: ActorSystem.Settings, config: Config) = this()

View file

@ -7,8 +7,8 @@ import akka.actor.ActorSystem
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } import akka.stream.impl.{ EmptyPublisher, ErrorPublisher }
import akka.testkit.TestProbe import akka.testkit.TestProbe
import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
object StreamTestKit { object StreamTestKit {
@ -35,12 +35,12 @@ object StreamTestKit {
subscriber.onSubscribe(FailedSubscription(subscriber, cause)) subscriber.onSubscribe(FailedSubscription(subscriber, cause))
} }
private case class FailedSubscription[T](subscriber: Subscriber[T], cause: Throwable) extends Subscription { private final case class FailedSubscription[T](subscriber: Subscriber[T], cause: Throwable) extends Subscription {
override def request(elements: Long): Unit = subscriber.onError(cause) override def request(elements: Long): Unit = subscriber.onError(cause)
override def cancel(): Unit = () override def cancel(): Unit = ()
} }
private case class CompletedSubscription[T](subscriber: Subscriber[T]) extends Subscription { private final case class CompletedSubscription[T](subscriber: Subscriber[T]) extends Subscription {
override def request(elements: Long): Unit = subscriber.onComplete() override def request(elements: Long): Unit = subscriber.onComplete()
override def cancel(): Unit = () override def cancel(): Unit = ()
} }
@ -60,18 +60,18 @@ object StreamTestKit {
def sendError(cause: Exception): Unit = subscription.sendError(cause) def sendError(cause: Exception): Unit = subscription.sendError(cause)
} }
sealed trait SubscriberEvent sealed trait SubscriberEvent extends DeadLetterSuppression
case class OnSubscribe(subscription: Subscription) extends SubscriberEvent final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent
case class OnNext[I](element: I) extends SubscriberEvent final case class OnNext[I](element: I) extends SubscriberEvent
case object OnComplete extends SubscriberEvent final case object OnComplete extends SubscriberEvent
case class OnError(cause: Throwable) extends SubscriberEvent final case class OnError(cause: Throwable) extends SubscriberEvent
sealed trait PublisherEvent sealed trait PublisherEvent extends DeadLetterSuppression
case class Subscribe(subscription: Subscription) extends PublisherEvent final case class Subscribe(subscription: Subscription) extends PublisherEvent
case class CancelSubscription(subscription: Subscription) extends PublisherEvent final case class CancelSubscription(subscription: Subscription) extends PublisherEvent
case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent final case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent
case class PublisherProbeSubscription[I](subscriber: Subscriber[_ >: I], publisherProbe: TestProbe) extends Subscription { final case class PublisherProbeSubscription[I](subscriber: Subscriber[_ >: I], publisherProbe: TestProbe) extends Subscription {
def request(elements: Long): Unit = publisherProbe.ref ! RequestMore(this, elements) def request(elements: Long): Unit = publisherProbe.ref ! RequestMore(this, elements)
def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this) def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this)
@ -116,7 +116,7 @@ object StreamTestKit {
case OnNext(n) true case OnNext(n) true
case OnError(`cause`) true case OnError(`cause`) true
} match { } match {
case OnNext(n: I) Right(n) case OnNext(n: I @unchecked) Right(n)
case OnError(err) Left(err) case OnError(err) Left(err)
} }
} }

View file

@ -14,9 +14,7 @@ abstract class TwoStreamsSetup extends AkkaSpec {
implicit val materializer = FlowMaterializer(settings) implicit val materializer = FlowMaterializer(settings)
case class TE(message: String) extends RuntimeException(message) with NoStackTrace val TestException = new RuntimeException("test") with NoStackTrace
val TestException = TE("test")
type Outputs type Outputs

View file

@ -17,5 +17,5 @@ object FlattenStrategy {
*/ */
def concat[T]: FlattenStrategy[scaladsl.Source[T], T] = Concat[T]() def concat[T]: FlattenStrategy[scaladsl.Source[T], T] = Concat[T]()
private[akka] case class Concat[T]() extends FlattenStrategy[scaladsl.Source[T], T] private[akka] final case class Concat[T]() extends FlattenStrategy[scaladsl.Source[T], T]
} }

View file

@ -4,15 +4,15 @@
package akka.stream package akka.stream
import akka.actor.{ ActorContext, Cancellable } import akka.actor.{ ActorContext, Cancellable }
import scala.collection.{ immutable, mutable } import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
/** /**
* Transformer with support for scheduling keyed (named) timer events. * Transformer with support for scheduling keyed (named) timer events.
*/ */
// TODO: TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410 // TODO: TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410
@deprecated("TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410") @deprecated("TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410", "1.0-M1")
private[akka] abstract class TimerTransformer[-T, +U] extends TransformerLike[T, U] { private[akka] abstract class TimerTransformer[-T, +U] extends TransformerLike[T, U] {
import TimerTransformer._ import TimerTransformer._
private val timers = mutable.Map[Any, Timer]() private val timers = mutable.Map[Any, Timer]()
@ -118,14 +118,14 @@ private[akka] abstract class TimerTransformer[-T, +U] extends TransformerLike[T,
* INTERNAL API * INTERNAL API
*/ */
private object TimerTransformer { private object TimerTransformer {
case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression
sealed trait Queued sealed trait Queued
case class QueuedSchedule(timerKey: Any, interval: FiniteDuration) extends Queued final case class QueuedSchedule(timerKey: Any, interval: FiniteDuration) extends Queued
case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued final case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued
case class QueuedCancelTimer(timerKey: Any) extends Queued final case class QueuedCancelTimer(timerKey: Any) extends Queued
case class Timer(id: Int, task: Cancellable) final case class Timer(id: Int, task: Cancellable)
} }

View file

@ -16,9 +16,9 @@ import akka.actor.Extension
import akka.actor.ExtensionId import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider import akka.actor.ExtensionIdProvider
import akka.actor.UntypedActor import akka.actor.UntypedActor
import concurrent.duration.Duration import concurrent.duration.Duration
import concurrent.duration.FiniteDuration import concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
object ActorPublisher { object ActorPublisher {
@ -33,18 +33,18 @@ object ActorPublisher {
* INTERNAL API * INTERNAL API
*/ */
private[akka] object Internal { private[akka] object Internal {
case class Subscribe(subscriber: Subscriber[Any]) final case class Subscribe(subscriber: Subscriber[Any]) extends DeadLetterSuppression
sealed trait LifecycleState sealed trait LifecycleState
case object PreSubscriber extends LifecycleState case object PreSubscriber extends LifecycleState
case object Active extends LifecycleState case object Active extends LifecycleState
case object Canceled extends LifecycleState case object Canceled extends LifecycleState
case object Completed extends LifecycleState case object Completed extends LifecycleState
case class ErrorEmitted(cause: Throwable) extends LifecycleState final case class ErrorEmitted(cause: Throwable) extends LifecycleState
} }
} }
sealed abstract class ActorPublisherMessage sealed abstract class ActorPublisherMessage extends DeadLetterSuppression
object ActorPublisherMessage { object ActorPublisherMessage {
/** /**
@ -52,19 +52,19 @@ object ActorPublisherMessage {
* more elements. * more elements.
* @param n number of requested elements * @param n number of requested elements
*/ */
@SerialVersionUID(1L) case class Request(n: Long) extends ActorPublisherMessage @SerialVersionUID(1L) final case class Request(n: Long) extends ActorPublisherMessage
/** /**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the
* subscription. * subscription.
*/ */
@SerialVersionUID(1L) case object Cancel extends ActorPublisherMessage @SerialVersionUID(1L) final case object Cancel extends ActorPublisherMessage
/** /**
* This message is delivered to the [[ActorPublisher]] actor in order to signal the exceeding of an subscription timeout. * This message is delivered to the [[ActorPublisher]] actor in order to signal the exceeding of an subscription timeout.
* Once the actor receives this message, this publisher will already be in cancelled state, thus the actor should clean-up and stop itself. * Once the actor receives this message, this publisher will already be in cancelled state, thus the actor should clean-up and stop itself.
*/ */
@SerialVersionUID(1L) case object SubscriptionTimeoutExceeded extends ActorPublisherMessage @SerialVersionUID(1L) final case object SubscriptionTimeoutExceeded extends ActorPublisherMessage
/** /**
* Java API: get the singleton instance of the `Cancel` message * Java API: get the singleton instance of the `Cancel` message
@ -330,7 +330,7 @@ trait ActorPublisher[T] extends Actor {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[T] { private[akka] final case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[T] {
import ActorPublisher._ import ActorPublisher._
import ActorPublisher.Internal._ import ActorPublisher.Internal._
@ -363,7 +363,7 @@ private[akka] object ActorPublisherState extends ExtensionId[ActorPublisherState
override def createExtension(system: ExtendedActorSystem): ActorPublisherState = override def createExtension(system: ExtendedActorSystem): ActorPublisherState =
new ActorPublisherState new ActorPublisherState
case class State(subscriber: Option[Subscriber[Any]], demand: Long, lifecycleState: LifecycleState) final case class State(subscriber: Option[Subscriber[Any]], demand: Long, lifecycleState: LifecycleState)
} }

View file

@ -14,6 +14,7 @@ import akka.actor.Extension
import akka.actor.ExtensionId import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider import akka.actor.ExtensionIdProvider
import akka.actor.UntypedActor import akka.actor.UntypedActor
import akka.actor.DeadLetterSuppression
object ActorSubscriber { object ActorSubscriber {
@ -26,15 +27,16 @@ object ActorSubscriber {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@SerialVersionUID(1L) private[akka] case class OnSubscribe(subscription: Subscription) @SerialVersionUID(1L) private[akka] final case class OnSubscribe(subscription: Subscription)
extends DeadLetterSuppression
} }
sealed abstract class ActorSubscriberMessage sealed abstract class ActorSubscriberMessage extends DeadLetterSuppression
object ActorSubscriberMessage { object ActorSubscriberMessage {
@SerialVersionUID(1L) case class OnNext(element: Any) extends ActorSubscriberMessage @SerialVersionUID(1L) final case class OnNext(element: Any) extends ActorSubscriberMessage
@SerialVersionUID(1L) case class OnError(cause: Throwable) extends ActorSubscriberMessage @SerialVersionUID(1L) final case class OnError(cause: Throwable) extends ActorSubscriberMessage
@SerialVersionUID(1L) case object OnComplete extends ActorSubscriberMessage @SerialVersionUID(1L) case object OnComplete extends ActorSubscriberMessage
/** /**
@ -98,7 +100,7 @@ object WatermarkRequestStrategy {
* Requests up to the `highWatermark` when the `remainingRequested` is * Requests up to the `highWatermark` when the `remainingRequested` is
* below the `lowWatermark`. This a good strategy when the actor performs work itself. * below the `lowWatermark`. This a good strategy when the actor performs work itself.
*/ */
case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy { final case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy {
/** /**
* Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of
@ -290,7 +292,7 @@ private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberSta
override def createExtension(system: ExtendedActorSystem): ActorSubscriberState = override def createExtension(system: ExtendedActorSystem): ActorSubscriberState =
new ActorSubscriberState new ActorSubscriberState
case class State(subscription: Option[Subscription], requested: Long, canceled: Boolean) final case class State(subscription: Option[Subscription], requested: Long, canceled: Boolean)
} }

View file

@ -159,11 +159,11 @@ private[akka] object Ast {
override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
} }
case class DirectProcessor(p: () Processor[Any, Any], attributes: OperationAttributes = processor) extends AstNode { final case class DirectProcessor(p: () Processor[Any, Any], attributes: OperationAttributes = processor) extends AstNode {
override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
} }
case class DirectProcessorWithKey(p: () (Processor[Any, Any], Any), key: Key[_], attributes: OperationAttributes = processorWithKey) extends AstNode { final case class DirectProcessorWithKey(p: () (Processor[Any, Any], Any), key: Key[_], attributes: OperationAttributes = processorWithKey) extends AstNode {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
} }
@ -206,7 +206,7 @@ private[akka] object Ast {
final case class Zip22With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22](f: Function22[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, Any], attributes: OperationAttributes) extends ZipWith final case class Zip22With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22](f: Function22[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, Any], attributes: OperationAttributes) extends ZipWith
// FIXME Why do we need this? // FIXME Why do we need this?
case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode final case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode
final case class Merge(attributes: OperationAttributes) extends FanInAstNode final case class Merge(attributes: OperationAttributes) extends FanInAstNode
final case class MergePreferred(attributes: OperationAttributes) extends FanInAstNode final case class MergePreferred(attributes: OperationAttributes) extends FanInAstNode
@ -544,7 +544,7 @@ private[akka] class FlowNameCounter extends Extension {
private[akka] object StreamSupervisor { private[akka] object StreamSupervisor {
def props(settings: MaterializerSettings): Props = Props(new StreamSupervisor(settings)) def props(settings: MaterializerSettings): Props = Props(new StreamSupervisor(settings))
case class Materialize(props: Props, name: String) final case class Materialize(props: Props, name: String) extends DeadLetterSuppression
} }
private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor { private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor {

View file

@ -17,7 +17,7 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] {
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
ReactiveStreamsCompliance.tryOnError(subscriber, t) // FIXME how to deal with spec violations here? ReactiveStreamsCompliance.tryOnError(subscriber, t) // FIXME how to deal with spec violations here?
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]

View file

@ -8,16 +8,17 @@ import akka.actor.Props
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber } import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber }
import org.reactivestreams.{ Subscription, Subscriber } import org.reactivestreams.{ Subscription, Subscriber }
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] object FanIn { private[akka] object FanIn {
case class OnError(id: Int, cause: Throwable) final case class OnError(id: Int, cause: Throwable) extends DeadLetterSuppression
case class OnComplete(id: Int) final case class OnComplete(id: Int) extends DeadLetterSuppression
case class OnNext(id: Int, e: Any) final case class OnNext(id: Int, e: Any) extends DeadLetterSuppression
case class OnSubscribe(id: Int, subscription: Subscription) final case class OnSubscribe(id: Int, subscription: Subscription) extends DeadLetterSuppression
private[akka] final case class SubInput[T](impl: ActorRef, id: Int) extends Subscriber[T] { private[akka] final case class SubInput[T](impl: ActorRef, id: Int) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! OnError(id, cause) override def onError(cause: Throwable): Unit = impl ! OnError(id, cause)

View file

@ -4,22 +4,22 @@
package akka.stream.impl package akka.stream.impl
import scala.collection.immutable import scala.collection.immutable
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorLogging import akka.actor.ActorLogging
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.Props import akka.actor.Props
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import org.reactivestreams.Subscription import org.reactivestreams.Subscription
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] object FanOut { private[akka] object FanOut {
case class SubstreamRequestMore(id: Int, demand: Long) final case class SubstreamRequestMore(id: Int, demand: Long) extends DeadLetterSuppression
case class SubstreamCancel(id: Int) final case class SubstreamCancel(id: Int) extends DeadLetterSuppression
case class SubstreamSubscribePending(id: Int) final case class SubstreamSubscribePending(id: Int) extends DeadLetterSuppression
class SubstreamSubscription(val parent: ActorRef, val id: Int) extends Subscription { class SubstreamSubscription(val parent: ActorRef, val id: Int) extends Subscription {
override def request(elements: Long): Unit = override def request(elements: Long): Unit =
@ -33,7 +33,7 @@ private[akka] object FanOut {
override def createSubscription(): Subscription = new SubstreamSubscription(actor, id) override def createSubscription(): Subscription = new SubstreamSubscription(actor, id)
} }
case class ExposedPublishers(publishers: immutable.Seq[ActorPublisher[Any]]) final case class ExposedPublishers(publishers: immutable.Seq[ActorPublisher[Any]]) extends DeadLetterSuppression
class OutputBunch(outputCount: Int, impl: ActorRef, pump: Pump) { class OutputBunch(outputCount: Int, impl: ActorRef, pump: Pump) {
private var bunchCancelled = false private var bunchCancelled = false

View file

@ -7,17 +7,16 @@ import scala.concurrent.Future
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
import scala.util.Try import scala.util.Try
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.Props import akka.actor.Props
import akka.actor.Status import akka.actor.Status
import akka.actor.SupervisorStrategy import akka.actor.SupervisorStrategy
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import akka.pattern.pipe import akka.pattern.pipe
import org.reactivestreams.Subscriber import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription import org.reactivestreams.Subscription
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
@ -27,8 +26,8 @@ private[akka] object FuturePublisher {
Props(new FuturePublisher(future, settings)).withDispatcher(settings.dispatcher) Props(new FuturePublisher(future, settings)).withDispatcher(settings.dispatcher)
object FutureSubscription { object FutureSubscription {
case class Cancel(subscription: FutureSubscription) final case class Cancel(subscription: FutureSubscription) extends DeadLetterSuppression
case class RequestMore(subscription: FutureSubscription, elements: Long) final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression
} }
class FutureSubscription(ref: ActorRef) extends Subscription { class FutureSubscription(ref: ActorRef) extends Subscription {

View file

@ -27,7 +27,7 @@ private[akka] object IteratorPublisher {
private case object Initialized extends State private case object Initialized extends State
private case object Cancelled extends StopState private case object Cancelled extends StopState
private case object Completed extends StopState private case object Completed extends StopState
private case class Errored(cause: Throwable) extends StopState private final case class Errored(cause: Throwable) extends StopState
} }
/** /**

View file

@ -12,6 +12,7 @@ import akka.stream.MaterializerSettings
import akka.pattern.pipe import akka.pattern.pipe
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.actor.Props import akka.actor.Props
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
@ -29,8 +30,8 @@ private[akka] object MapAsyncProcessorImpl {
} }
} }
case class FutureElement(seqNo: Long, element: Any) final case class FutureElement(seqNo: Long, element: Any) extends DeadLetterSuppression
case class FutureFailure(cause: Throwable) final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression
} }
/** /**

View file

@ -9,6 +9,7 @@ import akka.stream.MaterializerSettings
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import akka.pattern.pipe import akka.pattern.pipe
import akka.actor.Props import akka.actor.Props
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
@ -17,8 +18,8 @@ private[akka] object MapAsyncUnorderedProcessorImpl {
def props(settings: MaterializerSettings, f: Any Future[Any]): Props = def props(settings: MaterializerSettings, f: Any Future[Any]): Props =
Props(new MapAsyncUnorderedProcessorImpl(settings, f)) Props(new MapAsyncUnorderedProcessorImpl(settings, f))
case class FutureElement(element: Any) final case class FutureElement(element: Any) extends DeadLetterSuppression
case class FutureFailure(cause: Throwable) final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression
} }
/** /**

View file

@ -5,21 +5,28 @@ package akka.stream.impl
import language.existentials import language.existentials
import org.reactivestreams.Subscription import org.reactivestreams.Subscription
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case object SubscribePending private[akka] case object SubscribePending extends DeadLetterSuppression
/**
* INTERNAL API /**
*/ * INTERNAL API
private[akka] case class RequestMore(subscription: ActorSubscription[_], demand: Long) */
/** private[akka] final case class RequestMore(subscription: ActorSubscription[_], demand: Long)
* INTERNAL API extends DeadLetterSuppression
*/
private[akka] case class Cancel(subscription: ActorSubscription[_]) /**
/** * INTERNAL API
* INTERNAL API */
*/ private[akka] final case class Cancel(subscription: ActorSubscription[_])
private[akka] case class ExposedPublisher(publisher: ActorPublisher[Any]) extends DeadLetterSuppression
/**
* INTERNAL API
*/
private[akka] final case class ExposedPublisher(publisher: ActorPublisher[Any])
extends DeadLetterSuppression

View file

@ -6,22 +6,22 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorLogging import akka.actor.ActorLogging
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.actor.{ Actor, ActorRef } import akka.actor.{ Actor, ActorRef }
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] object MultiStreamOutputProcessor { private[akka] object MultiStreamOutputProcessor {
case class SubstreamKey(id: Long) final case class SubstreamKey(id: Long)
case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) final case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) extends DeadLetterSuppression
case class SubstreamCancel(substream: SubstreamKey) final case class SubstreamCancel(substream: SubstreamKey) extends DeadLetterSuppression
case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) final case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) extends DeadLetterSuppression
case class SubstreamSubscriptionTimeout(substream: SubstreamKey) final case class SubstreamSubscriptionTimeout(substream: SubstreamKey) extends DeadLetterSuppression
class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
override def request(elements: Long): Unit = parent ! SubstreamRequestMore(substreamKey, elements) override def request(elements: Long): Unit = parent ! SubstreamRequestMore(substreamKey, elements)
@ -191,7 +191,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
super.pumpFinished() super.pumpFinished()
} }
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement override def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement
} }
/** /**
@ -205,10 +205,10 @@ private[akka] object TwoStreamInputProcessor {
override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription) override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription)
} }
case object OtherStreamOnComplete case object OtherStreamOnComplete extends DeadLetterSuppression
case class OtherStreamOnNext(element: Any) final case class OtherStreamOnNext(element: Any) extends DeadLetterSuppression
case class OtherStreamOnSubscribe(subscription: Subscription) final case class OtherStreamOnSubscribe(subscription: Subscription) extends DeadLetterSuppression
case class OtherStreamOnError(ex: Throwable) final case class OtherStreamOnError(ex: Throwable) extends DeadLetterSuppression
} }
/** /**

View file

@ -26,7 +26,7 @@ private[akka] object SubscriberManagement {
def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onComplete() def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onComplete()
} }
case class ErrorCompleted(cause: Throwable) extends EndOfStream { final case class ErrorCompleted(cause: Throwable) extends EndOfStream {
def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onError(cause) def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onError(cause)
} }

View file

@ -4,14 +4,13 @@
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy } import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy }
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
@ -22,8 +21,8 @@ private[akka] object TickPublisher {
Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)).withDispatcher(settings.dispatcher) Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)).withDispatcher(settings.dispatcher)
object TickPublisherSubscription { object TickPublisherSubscription {
case object Cancel case object Cancel extends DeadLetterSuppression
case class RequestMore(elements: Long) final case class RequestMore(elements: Long) extends DeadLetterSuppression
} }
class TickPublisherSubscription(ref: ActorRef) extends Subscription { class TickPublisherSubscription(ref: ActorRef) extends Subscription {

View file

@ -135,7 +135,7 @@ private[akka] object Always extends TransferState {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class TransferPhase(precondition: TransferState)(val action: () Unit) private[akka] final case class TransferPhase(precondition: TransferState)(val action: () Unit)
/** /**
* INTERNAL API * INTERNAL API

View file

@ -386,6 +386,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
case Completing padding + "---|" case Completing padding + "---|"
case Cancelling padding + "|---" case Cancelling padding + "|---"
case Failing(e) padding + s"---X ${e.getMessage}" case Failing(e) padding + s"---X ${e.getMessage}"
case other padding + s"---? $state"
} }
println(icon) println(icon)
} }

View file

@ -36,7 +36,7 @@ private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) ex
override def onPush(elem: In, ctx: Context[Out]): Directive = override def onPush(elem: In, ctx: Context[Out]): Directive =
pf.applyOrElse(elem, NotApplied) match { pf.applyOrElse(elem, NotApplied) match {
case NotApplied ctx.pull() case NotApplied ctx.pull()
case result: Out ctx.push(result) case result: Out @unchecked ctx.push(result)
} }
} }

View file

@ -20,6 +20,7 @@ import akka.stream.scaladsl.StreamTcp
import akka.util.ByteString import akka.util.ByteString
import org.reactivestreams.Processor import org.reactivestreams.Processor
import org.reactivestreams.Subscriber import org.reactivestreams.Subscriber
import akka.actor.DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
@ -28,29 +29,34 @@ private[akka] object StreamTcpManager {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Connect(processorPromise: Promise[Processor[ByteString, ByteString]], private[akka] final case class Connect(
processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress], localAddressPromise: Promise[InetSocketAddress],
remoteAddress: InetSocketAddress, remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress], localAddress: Option[InetSocketAddress],
options: immutable.Traversable[SocketOption], options: immutable.Traversable[SocketOption],
connectTimeout: Duration, connectTimeout: Duration,
idleTimeout: Duration) idleTimeout: Duration)
extends DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Bind(localAddressPromise: Promise[InetSocketAddress], private[akka] final case class Bind(
localAddressPromise: Promise[InetSocketAddress],
unbindPromise: Promise[() Future[Unit]], unbindPromise: Promise[() Future[Unit]],
flowSubscriber: Subscriber[StreamTcp.IncomingConnection], flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
endpoint: InetSocketAddress, endpoint: InetSocketAddress,
backlog: Int, backlog: Int,
options: immutable.Traversable[SocketOption], options: immutable.Traversable[SocketOption],
idleTimeout: Duration) idleTimeout: Duration)
extends DeadLetterSuppression
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class ExposedProcessor(processor: Processor[ByteString, ByteString]) private[akka] final case class ExposedProcessor(processor: Processor[ByteString, ByteString])
extends DeadLetterSuppression
} }

View file

@ -1200,7 +1200,8 @@ class FlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[Flo
type E = Edge[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex] type E = Edge[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex]
case class Memo(visited: Set[E] = Set.empty, final case class Memo(
visited: Set[E] = Set.empty,
downstreamSubscriber: Map[E, Subscriber[Any]] = Map.empty, downstreamSubscriber: Map[E, Subscriber[Any]] = Map.empty,
upstreamPublishers: Map[E, Publisher[Any]] = Map.empty, upstreamPublishers: Map[E, Publisher[Any]] = Map.empty,
sources: Map[SourceVertex, SinkPipe[Any]] = Map.empty, sources: Map[SourceVertex, SinkPipe[Any]] = Map.empty,

View file

@ -40,7 +40,7 @@ private[scaladsl] object GraphFlow {
} }
} }
private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out]( private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out](
inPipe: Pipe[In, CIn], inPipe: Pipe[In, CIn],
in: UndefinedSource[CIn], in: UndefinedSource[CIn],
graph: PartialFlowGraph, graph: PartialFlowGraph,
@ -125,7 +125,7 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out](
def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr)) def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr))
} }
private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] { private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] {
override type Repr[+O] = GraphSource[COut, O] override type Repr[+O] = GraphSource[COut, O]
private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSink[COut] = { private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSink[COut] = {
@ -173,7 +173,7 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou
def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr)) def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr))
} }
private[scaladsl] case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] { private[scaladsl] final case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] {
private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSource[CIn] = { private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSource[CIn] = {
val nIn = UndefinedSource[CIn] val nIn = UndefinedSource[CIn]

View file

@ -66,7 +66,7 @@ trait Key[M] extends KeyedMaterializable[M] {
def materialize(map: MaterializedMap): MaterializedType def materialize(map: MaterializedMap): MaterializedType
} }
private[stream] case class MaterializedMapImpl(map: Map[AnyRef, Any]) extends MaterializedMap { private[stream] final case class MaterializedMapImpl(map: Map[AnyRef, Any]) extends MaterializedMap {
private def failure(key: KeyedMaterializable[_]) = { private def failure(key: KeyedMaterializable[_]) = {
val keyType = key match { val keyType = key match {
case _: KeyedSource[_, _] "Source" case _: KeyedSource[_, _] "Source"

View file

@ -10,7 +10,7 @@ import akka.stream.impl.Ast.AstNode
* Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]]
* materialization. * materialization.
*/ */
case class OperationAttributes private (private val attributes: List[OperationAttributes.Attribute] = Nil) { final case class OperationAttributes private (private val attributes: List[OperationAttributes.Attribute] = Nil) {
import OperationAttributes._ import OperationAttributes._
@ -59,9 +59,9 @@ case class OperationAttributes private (private val attributes: List[OperationAt
object OperationAttributes { object OperationAttributes {
private[OperationAttributes] trait Attribute private[OperationAttributes] trait Attribute
private[OperationAttributes] case class Name(n: String) extends Attribute private[OperationAttributes] final case class Name(n: String) extends Attribute
private[OperationAttributes] case class InputBuffer(initial: Int, max: Int) extends Attribute private[OperationAttributes] final case class InputBuffer(initial: Int, max: Int) extends Attribute
private[OperationAttributes] case class Dispatcher(dispatcher: String) extends Attribute private[OperationAttributes] final case class Dispatcher(dispatcher: String) extends Attribute
private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes = private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes =
apply(List(attribute)) apply(List(attribute))