diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala index 7ca842da18..f6c5ca182c 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -37,7 +37,7 @@ trait ScriptedTest extends Matchers { } } - case class Script[In, Out]( + final case class Script[In, Out]( providedInputs: Vector[In], expectedOutputs: Vector[Out], jumps: Vector[Int], @@ -129,7 +129,7 @@ trait ScriptedTest extends Matchers { case _ ⇒ false // Ignore } val d = downstream.probe.receiveWhile(1.milliseconds) { - case OnNext(elem: Out) ⇒ + case OnNext(elem: Out @unchecked) ⇒ debugLog(s"operation produces [$elem]") if (outstandingDemand == 0) fail("operation produced while there was no demand") outstandingDemand -= 1 diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala index fb66ea5e4c..2589a57830 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala @@ -16,7 +16,7 @@ import akka.actor.Actor * This mailbox is only used in tests to verify that stream actors are using * the dispatcher defined in MaterializerSettings. */ -private[akka] case class StreamTestDefaultMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] { +private[akka] final case class StreamTestDefaultMailbox() extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] { def this(settings: ActorSystem.Settings, config: Config) = this() diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 9ad1daca25..3209c2d6ec 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -7,8 +7,8 @@ import akka.actor.ActorSystem import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } import akka.testkit.TestProbe import org.reactivestreams.{ Publisher, Subscriber, Subscription } - import scala.concurrent.duration.FiniteDuration +import akka.actor.DeadLetterSuppression object StreamTestKit { @@ -35,12 +35,12 @@ object StreamTestKit { subscriber.onSubscribe(FailedSubscription(subscriber, cause)) } - private case class FailedSubscription[T](subscriber: Subscriber[T], cause: Throwable) extends Subscription { + private final case class FailedSubscription[T](subscriber: Subscriber[T], cause: Throwable) extends Subscription { override def request(elements: Long): Unit = subscriber.onError(cause) override def cancel(): Unit = () } - private case class CompletedSubscription[T](subscriber: Subscriber[T]) extends Subscription { + private final case class CompletedSubscription[T](subscriber: Subscriber[T]) extends Subscription { override def request(elements: Long): Unit = subscriber.onComplete() override def cancel(): Unit = () } @@ -60,18 +60,18 @@ object StreamTestKit { def sendError(cause: Exception): Unit = subscription.sendError(cause) } - sealed trait SubscriberEvent - case class OnSubscribe(subscription: Subscription) extends SubscriberEvent - case class OnNext[I](element: I) extends SubscriberEvent - case object OnComplete extends SubscriberEvent - case class OnError(cause: Throwable) extends SubscriberEvent + sealed trait SubscriberEvent extends DeadLetterSuppression + final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent + final case class OnNext[I](element: I) extends SubscriberEvent + final case object OnComplete extends SubscriberEvent + final case class OnError(cause: Throwable) extends SubscriberEvent - sealed trait PublisherEvent - case class Subscribe(subscription: Subscription) extends PublisherEvent - case class CancelSubscription(subscription: Subscription) extends PublisherEvent - case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent + sealed trait PublisherEvent extends DeadLetterSuppression + final case class Subscribe(subscription: Subscription) extends PublisherEvent + final case class CancelSubscription(subscription: Subscription) extends PublisherEvent + final case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent - case class PublisherProbeSubscription[I](subscriber: Subscriber[_ >: I], publisherProbe: TestProbe) extends Subscription { + final case class PublisherProbeSubscription[I](subscriber: Subscriber[_ >: I], publisherProbe: TestProbe) extends Subscription { def request(elements: Long): Unit = publisherProbe.ref ! RequestMore(this, elements) def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this) @@ -116,8 +116,8 @@ object StreamTestKit { case OnNext(n) ⇒ true case OnError(`cause`) ⇒ true } match { - case OnNext(n: I) ⇒ Right(n) - case OnError(err) ⇒ Left(err) + case OnNext(n: I @unchecked) ⇒ Right(n) + case OnError(err) ⇒ Left(err) } } diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala index f2d4619f26..5bfda96307 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala @@ -14,9 +14,7 @@ abstract class TwoStreamsSetup extends AkkaSpec { implicit val materializer = FlowMaterializer(settings) - case class TE(message: String) extends RuntimeException(message) with NoStackTrace - - val TestException = TE("test") + val TestException = new RuntimeException("test") with NoStackTrace type Outputs diff --git a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala index 41862c6339..356286a243 100644 --- a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala @@ -17,5 +17,5 @@ object FlattenStrategy { */ def concat[T]: FlattenStrategy[scaladsl.Source[T], T] = Concat[T]() - private[akka] case class Concat[T]() extends FlattenStrategy[scaladsl.Source[T], T] + private[akka] final case class Concat[T]() extends FlattenStrategy[scaladsl.Source[T], T] } diff --git a/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala b/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala index 0614a330ab..c87e8addde 100644 --- a/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala +++ b/akka-stream/src/main/scala/akka/stream/TimerTransformer.scala @@ -4,15 +4,15 @@ package akka.stream import akka.actor.{ ActorContext, Cancellable } - import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration +import akka.actor.DeadLetterSuppression /** * Transformer with support for scheduling keyed (named) timer events. */ // TODO: TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410 -@deprecated("TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410") +@deprecated("TimerTransformer is meant to be replaced; See https://github.com/akka/akka/issues/16410", "1.0-M1") private[akka] abstract class TimerTransformer[-T, +U] extends TransformerLike[T, U] { import TimerTransformer._ private val timers = mutable.Map[Any, Timer]() @@ -118,14 +118,14 @@ private[akka] abstract class TimerTransformer[-T, +U] extends TransformerLike[T, * INTERNAL API */ private object TimerTransformer { - case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) + final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression sealed trait Queued - case class QueuedSchedule(timerKey: Any, interval: FiniteDuration) extends Queued - case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued - case class QueuedCancelTimer(timerKey: Any) extends Queued + final case class QueuedSchedule(timerKey: Any, interval: FiniteDuration) extends Queued + final case class QueuedScheduleOnce(timerKey: Any, delay: FiniteDuration) extends Queued + final case class QueuedCancelTimer(timerKey: Any) extends Queued - case class Timer(id: Int, task: Cancellable) + final case class Timer(id: Int, task: Cancellable) } diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index b9fda1f651..f0aa11b09a 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -16,9 +16,9 @@ import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider import akka.actor.UntypedActor - import concurrent.duration.Duration import concurrent.duration.FiniteDuration +import akka.actor.DeadLetterSuppression object ActorPublisher { @@ -33,18 +33,18 @@ object ActorPublisher { * INTERNAL API */ private[akka] object Internal { - case class Subscribe(subscriber: Subscriber[Any]) + final case class Subscribe(subscriber: Subscriber[Any]) extends DeadLetterSuppression sealed trait LifecycleState case object PreSubscriber extends LifecycleState case object Active extends LifecycleState case object Canceled extends LifecycleState case object Completed extends LifecycleState - case class ErrorEmitted(cause: Throwable) extends LifecycleState + final case class ErrorEmitted(cause: Throwable) extends LifecycleState } } -sealed abstract class ActorPublisherMessage +sealed abstract class ActorPublisherMessage extends DeadLetterSuppression object ActorPublisherMessage { /** @@ -52,19 +52,19 @@ object ActorPublisherMessage { * more elements. * @param n number of requested elements */ - @SerialVersionUID(1L) case class Request(n: Long) extends ActorPublisherMessage + @SerialVersionUID(1L) final case class Request(n: Long) extends ActorPublisherMessage /** * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the * subscription. */ - @SerialVersionUID(1L) case object Cancel extends ActorPublisherMessage + @SerialVersionUID(1L) final case object Cancel extends ActorPublisherMessage /** * This message is delivered to the [[ActorPublisher]] actor in order to signal the exceeding of an subscription timeout. * Once the actor receives this message, this publisher will already be in cancelled state, thus the actor should clean-up and stop itself. */ - @SerialVersionUID(1L) case object SubscriptionTimeoutExceeded extends ActorPublisherMessage + @SerialVersionUID(1L) final case object SubscriptionTimeoutExceeded extends ActorPublisherMessage /** * Java API: get the singleton instance of the `Cancel` message @@ -330,7 +330,7 @@ trait ActorPublisher[T] extends Actor { /** * INTERNAL API */ -private[akka] case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[T] { +private[akka] final case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[T] { import ActorPublisher._ import ActorPublisher.Internal._ @@ -363,7 +363,7 @@ private[akka] object ActorPublisherState extends ExtensionId[ActorPublisherState override def createExtension(system: ExtendedActorSystem): ActorPublisherState = new ActorPublisherState - case class State(subscriber: Option[Subscriber[Any]], demand: Long, lifecycleState: LifecycleState) + final case class State(subscriber: Option[Subscriber[Any]], demand: Long, lifecycleState: LifecycleState) } diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index a9d25d6007..2094ef226f 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -14,6 +14,7 @@ import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider import akka.actor.UntypedActor +import akka.actor.DeadLetterSuppression object ActorSubscriber { @@ -26,15 +27,16 @@ object ActorSubscriber { /** * INTERNAL API */ - @SerialVersionUID(1L) private[akka] case class OnSubscribe(subscription: Subscription) + @SerialVersionUID(1L) private[akka] final case class OnSubscribe(subscription: Subscription) + extends DeadLetterSuppression } -sealed abstract class ActorSubscriberMessage +sealed abstract class ActorSubscriberMessage extends DeadLetterSuppression object ActorSubscriberMessage { - @SerialVersionUID(1L) case class OnNext(element: Any) extends ActorSubscriberMessage - @SerialVersionUID(1L) case class OnError(cause: Throwable) extends ActorSubscriberMessage + @SerialVersionUID(1L) final case class OnNext(element: Any) extends ActorSubscriberMessage + @SerialVersionUID(1L) final case class OnError(cause: Throwable) extends ActorSubscriberMessage @SerialVersionUID(1L) case object OnComplete extends ActorSubscriberMessage /** @@ -98,7 +100,7 @@ object WatermarkRequestStrategy { * Requests up to the `highWatermark` when the `remainingRequested` is * below the `lowWatermark`. This a good strategy when the actor performs work itself. */ -case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy { +final case class WatermarkRequestStrategy(highWatermark: Int, lowWatermark: Int) extends RequestStrategy { /** * Create [[WatermarkRequestStrategy]] with `lowWatermark` as half of @@ -213,7 +215,7 @@ trait ActorSubscriber extends Actor { */ protected[akka] override def aroundPostRestart(reason: Throwable): Unit = { state.get(self) foreach { s ⇒ - // restore previous state + // restore previous state subscription = s.subscription requested = s.requested canceled = s.canceled @@ -290,7 +292,7 @@ private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberSta override def createExtension(system: ExtendedActorSystem): ActorSubscriberState = new ActorSubscriberState - case class State(subscription: Option[Subscription], requested: Long, canceled: Boolean) + final case class State(subscription: Option[Subscription], requested: Long, canceled: Boolean) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index b1dd6acae2..158a7e4df1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -159,11 +159,11 @@ private[akka] object Ast { override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } - case class DirectProcessor(p: () ⇒ Processor[Any, Any], attributes: OperationAttributes = processor) extends AstNode { + final case class DirectProcessor(p: () ⇒ Processor[Any, Any], attributes: OperationAttributes = processor) extends AstNode { override def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } - case class DirectProcessorWithKey(p: () ⇒ (Processor[Any, Any], Any), key: Key[_], attributes: OperationAttributes = processorWithKey) extends AstNode { + final case class DirectProcessorWithKey(p: () ⇒ (Processor[Any, Any], Any), key: Key[_], attributes: OperationAttributes = processorWithKey) extends AstNode { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) } @@ -206,7 +206,7 @@ private[akka] object Ast { final case class Zip22With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22](f: Function22[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, Any], attributes: OperationAttributes) extends ZipWith // FIXME Why do we need this? - case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode + final case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode final case class Merge(attributes: OperationAttributes) extends FanInAstNode final case class MergePreferred(attributes: OperationAttributes) extends FanInAstNode @@ -544,7 +544,7 @@ private[akka] class FlowNameCounter extends Extension { private[akka] object StreamSupervisor { def props(settings: MaterializerSettings): Props = Props(new StreamSupervisor(settings)) - case class Materialize(props: Props, name: String) + final case class Materialize(props: Props, name: String) extends DeadLetterSuppression } private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor { diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index a672d2548a..f8431217af 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -17,7 +17,7 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { /** * INTERNAL API */ -private[akka] case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { +private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = ReactiveStreamsCompliance.tryOnError(subscriber, t) // FIXME how to deal with spec violations here? def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 33dad6bfc7..a99c515ec3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -8,16 +8,17 @@ import akka.actor.Props import akka.stream.MaterializerSettings import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber } import org.reactivestreams.{ Subscription, Subscriber } +import akka.actor.DeadLetterSuppression /** * INTERNAL API */ private[akka] object FanIn { - case class OnError(id: Int, cause: Throwable) - case class OnComplete(id: Int) - case class OnNext(id: Int, e: Any) - case class OnSubscribe(id: Int, subscription: Subscription) + final case class OnError(id: Int, cause: Throwable) extends DeadLetterSuppression + final case class OnComplete(id: Int) extends DeadLetterSuppression + final case class OnNext(id: Int, e: Any) extends DeadLetterSuppression + final case class OnSubscribe(id: Int, subscription: Subscription) extends DeadLetterSuppression private[akka] final case class SubInput[T](impl: ActorRef, id: Int) extends Subscriber[T] { override def onError(cause: Throwable): Unit = impl ! OnError(id, cause) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index a8dd5cb71b..29e85c9fce 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -4,22 +4,22 @@ package akka.stream.impl import scala.collection.immutable - import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.Props import akka.stream.MaterializerSettings import org.reactivestreams.Subscription +import akka.actor.DeadLetterSuppression /** * INTERNAL API */ private[akka] object FanOut { - case class SubstreamRequestMore(id: Int, demand: Long) - case class SubstreamCancel(id: Int) - case class SubstreamSubscribePending(id: Int) + final case class SubstreamRequestMore(id: Int, demand: Long) extends DeadLetterSuppression + final case class SubstreamCancel(id: Int) extends DeadLetterSuppression + final case class SubstreamSubscribePending(id: Int) extends DeadLetterSuppression class SubstreamSubscription(val parent: ActorRef, val id: Int) extends Subscription { override def request(elements: Long): Unit = @@ -33,7 +33,7 @@ private[akka] object FanOut { override def createSubscription(): Subscription = new SubstreamSubscription(actor, id) } - case class ExposedPublishers(publishers: immutable.Seq[ActorPublisher[Any]]) + final case class ExposedPublishers(publishers: immutable.Seq[ActorPublisher[Any]]) extends DeadLetterSuppression class OutputBunch(outputCount: Int, impl: ActorRef, pump: Pump) { private var bunchCancelled = false diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index b84234e2d7..6a7f6b21b8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -7,17 +7,16 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props import akka.actor.Status import akka.actor.SupervisorStrategy import akka.stream.MaterializerSettings - import akka.pattern.pipe import org.reactivestreams.Subscriber import org.reactivestreams.Subscription +import akka.actor.DeadLetterSuppression /** * INTERNAL API @@ -27,8 +26,8 @@ private[akka] object FuturePublisher { Props(new FuturePublisher(future, settings)).withDispatcher(settings.dispatcher) object FutureSubscription { - case class Cancel(subscription: FutureSubscription) - case class RequestMore(subscription: FutureSubscription, elements: Long) + final case class Cancel(subscription: FutureSubscription) extends DeadLetterSuppression + final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression } class FutureSubscription(ref: ActorRef) extends Subscription { diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala index 2ea5fc030b..0a2b69298f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala @@ -27,7 +27,7 @@ private[akka] object IteratorPublisher { private case object Initialized extends State private case object Cancelled extends StopState private case object Completed extends StopState - private case class Errored(cause: Throwable) extends StopState + private final case class Errored(cause: Throwable) extends StopState } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala index f55e1fed77..65d5150739 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala @@ -12,6 +12,7 @@ import akka.stream.MaterializerSettings import akka.pattern.pipe import scala.annotation.tailrec import akka.actor.Props +import akka.actor.DeadLetterSuppression /** * INTERNAL API @@ -29,8 +30,8 @@ private[akka] object MapAsyncProcessorImpl { } } - case class FutureElement(seqNo: Long, element: Any) - case class FutureFailure(cause: Throwable) + final case class FutureElement(seqNo: Long, element: Any) extends DeadLetterSuppression + final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala index e6a35505f9..fa51c8e588 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncUnorderedProcessorImpl.scala @@ -9,6 +9,7 @@ import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings import akka.pattern.pipe import akka.actor.Props +import akka.actor.DeadLetterSuppression /** * INTERNAL API @@ -17,8 +18,8 @@ private[akka] object MapAsyncUnorderedProcessorImpl { def props(settings: MaterializerSettings, f: Any ⇒ Future[Any]): Props = Props(new MapAsyncUnorderedProcessorImpl(settings, f)) - case class FutureElement(element: Any) - case class FutureFailure(cause: Throwable) + final case class FutureElement(element: Any) extends DeadLetterSuppression + final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index 882e410047..39734637cc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -5,21 +5,28 @@ package akka.stream.impl import language.existentials import org.reactivestreams.Subscription +import akka.actor.DeadLetterSuppression /** * INTERNAL API */ -private[akka] case object SubscribePending -/** - * INTERNAL API - */ -private[akka] case class RequestMore(subscription: ActorSubscription[_], demand: Long) -/** - * INTERNAL API - */ -private[akka] case class Cancel(subscription: ActorSubscription[_]) -/** - * INTERNAL API - */ -private[akka] case class ExposedPublisher(publisher: ActorPublisher[Any]) +private[akka] case object SubscribePending extends DeadLetterSuppression + +/** + * INTERNAL API + */ +private[akka] final case class RequestMore(subscription: ActorSubscription[_], demand: Long) + extends DeadLetterSuppression + +/** + * INTERNAL API + */ +private[akka] final case class Cancel(subscription: ActorSubscription[_]) + extends DeadLetterSuppression + +/** + * INTERNAL API + */ +private[akka] final case class ExposedPublisher(publisher: ActorPublisher[Any]) + extends DeadLetterSuppression diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 7260e973fa..ed2fe7f66c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -6,22 +6,22 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference import akka.actor.ActorLogging import akka.actor.Cancellable - import akka.actor.{ Actor, ActorRef } import akka.stream.MaterializerSettings import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.collection.mutable import scala.concurrent.duration.FiniteDuration +import akka.actor.DeadLetterSuppression /** * INTERNAL API */ private[akka] object MultiStreamOutputProcessor { - case class SubstreamKey(id: Long) - case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) - case class SubstreamCancel(substream: SubstreamKey) - case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) - case class SubstreamSubscriptionTimeout(substream: SubstreamKey) + final case class SubstreamKey(id: Long) + final case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) extends DeadLetterSuppression + final case class SubstreamCancel(substream: SubstreamKey) extends DeadLetterSuppression + final case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) extends DeadLetterSuppression + final case class SubstreamSubscriptionTimeout(substream: SubstreamKey) extends DeadLetterSuppression class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { override def request(elements: Long): Unit = parent ! SubstreamRequestMore(substreamKey, elements) @@ -191,7 +191,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS super.pumpFinished() } - override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement + override def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement } /** @@ -205,10 +205,10 @@ private[akka] object TwoStreamInputProcessor { override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription) } - case object OtherStreamOnComplete - case class OtherStreamOnNext(element: Any) - case class OtherStreamOnSubscribe(subscription: Subscription) - case class OtherStreamOnError(ex: Throwable) + case object OtherStreamOnComplete extends DeadLetterSuppression + final case class OtherStreamOnNext(element: Any) extends DeadLetterSuppression + final case class OtherStreamOnSubscribe(subscription: Subscription) extends DeadLetterSuppression + final case class OtherStreamOnError(ex: Throwable) extends DeadLetterSuppression } /** @@ -358,4 +358,4 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe } override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse inputSubstreamManagement -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 0c19f98ae0..6285dc2153 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -26,7 +26,7 @@ private[akka] object SubscriberManagement { def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onComplete() } - case class ErrorCompleted(cause: Throwable) extends EndOfStream { + final case class ErrorCompleted(cause: Throwable) extends EndOfStream { def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onError(cause) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 48ebdfeba5..00d29b3784 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -4,14 +4,13 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicBoolean - import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy } import akka.stream.MaterializerSettings import org.reactivestreams.{ Subscriber, Subscription } - import scala.collection.mutable import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal +import akka.actor.DeadLetterSuppression /** * INTERNAL API @@ -22,8 +21,8 @@ private[akka] object TickPublisher { Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)).withDispatcher(settings.dispatcher) object TickPublisherSubscription { - case object Cancel - case class RequestMore(elements: Long) + case object Cancel extends DeadLetterSuppression + final case class RequestMore(elements: Long) extends DeadLetterSuppression } class TickPublisherSubscription(ref: ActorRef) extends Subscription { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index 6a1a948514..be68e3f2c0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -135,7 +135,7 @@ private[akka] object Always extends TransferState { /** * INTERNAL API */ -private[akka] case class TransferPhase(precondition: TransferState)(val action: () ⇒ Unit) +private[akka] final case class TransferPhase(precondition: TransferState)(val action: () ⇒ Unit) /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index 4bbb7482ee..cb267d4f53 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -386,6 +386,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: case Completing ⇒ padding + "---|" case Cancelling ⇒ padding + "|---" case Failing(e) ⇒ padding + s"---X ${e.getMessage}" + case other ⇒ padding + s"---? $state" } println(icon) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 83e2a4ecf1..5cf745685f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -35,8 +35,8 @@ private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) ex import Collect.NotApplied override def onPush(elem: In, ctx: Context[Out]): Directive = pf.applyOrElse(elem, NotApplied) match { - case NotApplied ⇒ ctx.pull() - case result: Out ⇒ ctx.push(result) + case NotApplied ⇒ ctx.pull() + case result: Out @unchecked ⇒ ctx.push(result) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala index 1b8ce3936d..af0e663c75 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala @@ -20,6 +20,7 @@ import akka.stream.scaladsl.StreamTcp import akka.util.ByteString import org.reactivestreams.Processor import org.reactivestreams.Subscriber +import akka.actor.DeadLetterSuppression /** * INTERNAL API @@ -28,29 +29,34 @@ private[akka] object StreamTcpManager { /** * INTERNAL API */ - private[akka] case class Connect(processorPromise: Promise[Processor[ByteString, ByteString]], - localAddressPromise: Promise[InetSocketAddress], - remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress], - options: immutable.Traversable[SocketOption], - connectTimeout: Duration, - idleTimeout: Duration) + private[akka] final case class Connect( + processorPromise: Promise[Processor[ByteString, ByteString]], + localAddressPromise: Promise[InetSocketAddress], + remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress], + options: immutable.Traversable[SocketOption], + connectTimeout: Duration, + idleTimeout: Duration) + extends DeadLetterSuppression /** * INTERNAL API */ - private[akka] case class Bind(localAddressPromise: Promise[InetSocketAddress], - unbindPromise: Promise[() ⇒ Future[Unit]], - flowSubscriber: Subscriber[StreamTcp.IncomingConnection], - endpoint: InetSocketAddress, - backlog: Int, - options: immutable.Traversable[SocketOption], - idleTimeout: Duration) + private[akka] final case class Bind( + localAddressPromise: Promise[InetSocketAddress], + unbindPromise: Promise[() ⇒ Future[Unit]], + flowSubscriber: Subscriber[StreamTcp.IncomingConnection], + endpoint: InetSocketAddress, + backlog: Int, + options: immutable.Traversable[SocketOption], + idleTimeout: Duration) + extends DeadLetterSuppression /** * INTERNAL API */ - private[akka] case class ExposedProcessor(processor: Processor[ByteString, ByteString]) + private[akka] final case class ExposedProcessor(processor: Processor[ByteString, ByteString]) + extends DeadLetterSuppression } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index 5b2654e035..22ce2fcf3f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -1200,11 +1200,12 @@ class FlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[Flo type E = Edge[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex] - case class Memo(visited: Set[E] = Set.empty, - downstreamSubscriber: Map[E, Subscriber[Any]] = Map.empty, - upstreamPublishers: Map[E, Publisher[Any]] = Map.empty, - sources: Map[SourceVertex, SinkPipe[Any]] = Map.empty, - materializedMap: MaterializedMap = MaterializedMap.empty) + final case class Memo( + visited: Set[E] = Set.empty, + downstreamSubscriber: Map[E, Subscriber[Any]] = Map.empty, + upstreamPublishers: Map[E, Publisher[Any]] = Map.empty, + sources: Map[SourceVertex, SinkPipe[Any]] = Map.empty, + materializedMap: MaterializedMap = MaterializedMap.empty) val result = startingNodes.foldLeft(Memo()) { case (memo, start) ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala index 73c6247fc2..a40d1781c9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala @@ -40,7 +40,7 @@ private[scaladsl] object GraphFlow { } } -private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out]( +private[scaladsl] final case class GraphFlow[-In, CIn, COut, +Out]( inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph, @@ -125,7 +125,7 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out]( def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr)) } -private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] { +private[scaladsl] final case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] { override type Repr[+O] = GraphSource[COut, O] private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSink[COut] = { @@ -173,7 +173,7 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr)) } -private[scaladsl] case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] { +private[scaladsl] final case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] { private[scaladsl] def remap(builder: FlowGraphBuilder): UndefinedSource[CIn] = { val nIn = UndefinedSource[CIn] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala index 3d2702eefb..c86fe4c204 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/MaterializedMap.scala @@ -66,7 +66,7 @@ trait Key[M] extends KeyedMaterializable[M] { def materialize(map: MaterializedMap): MaterializedType } -private[stream] case class MaterializedMapImpl(map: Map[AnyRef, Any]) extends MaterializedMap { +private[stream] final case class MaterializedMapImpl(map: Map[AnyRef, Any]) extends MaterializedMap { private def failure(key: KeyedMaterializable[_]) = { val keyType = key match { case _: KeyedSource[_, _] ⇒ "Source" diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala index 6d69435792..946cf60eca 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala @@ -10,7 +10,7 @@ import akka.stream.impl.Ast.AstNode * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] * materialization. */ -case class OperationAttributes private (private val attributes: List[OperationAttributes.Attribute] = Nil) { +final case class OperationAttributes private (private val attributes: List[OperationAttributes.Attribute] = Nil) { import OperationAttributes._ @@ -59,9 +59,9 @@ case class OperationAttributes private (private val attributes: List[OperationAt object OperationAttributes { private[OperationAttributes] trait Attribute - private[OperationAttributes] case class Name(n: String) extends Attribute - private[OperationAttributes] case class InputBuffer(initial: Int, max: Int) extends Attribute - private[OperationAttributes] case class Dispatcher(dispatcher: String) extends Attribute + private[OperationAttributes] final case class Name(n: String) extends Attribute + private[OperationAttributes] final case class InputBuffer(initial: Int, max: Int) extends Attribute + private[OperationAttributes] final case class Dispatcher(dispatcher: String) extends Attribute private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes = apply(List(attribute))