Merge pull request #18614 from akka/wip-no-newInstance-√
Removes newInstance from StageModule and reduces the noise for implem…
This commit is contained in:
commit
2838c8a512
1 changed files with 32 additions and 61 deletions
|
|
@ -93,153 +93,124 @@ private[stream] object Stages {
|
|||
import DefaultAttributes._
|
||||
|
||||
sealed trait StageModule extends FlowModule[Any, Any, Any] {
|
||||
|
||||
def attributes: Attributes
|
||||
def withAttributes(attributes: Attributes): StageModule
|
||||
|
||||
protected def newInstance: StageModule
|
||||
override def carbonCopy: Module = newInstance
|
||||
override def carbonCopy: Module = withAttributes(attributes)
|
||||
}
|
||||
|
||||
final case class StageFactory(mkStage: () ⇒ Stage[_, _], attributes: Attributes = stageFactory) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class MaterializingStageFactory(
|
||||
mkStageAndMaterialized: () ⇒ (Stage[_, _], Any),
|
||||
attributes: Attributes = stageFactory) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Identity(attributes: Attributes = Attributes.name("identity")) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Map(f: Any ⇒ Any, attributes: Attributes = map) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Log(name: String, extract: Any ⇒ Any, loggingAdapter: Option[LoggingAdapter], attributes: Attributes = map) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Filter(p: Any ⇒ Boolean, attributes: Attributes = filter) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Collect(pf: PartialFunction[Any, Any], attributes: Attributes = collect) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Recover(pf: PartialFunction[Any, Any], attributes: Attributes = recover) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class MapAsync(parallelism: Int, f: Any ⇒ Future[Any], attributes: Attributes = mapAsync) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class MapAsyncUnordered(parallelism: Int, f: Any ⇒ Future[Any], attributes: Attributes = mapAsyncUnordered) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Grouped(n: Int, attributes: Attributes = grouped) extends StageModule {
|
||||
require(n > 0, "n must be greater than 0")
|
||||
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Sliding(n: Int, step: Int, attributes: Attributes = sliding) extends StageModule {
|
||||
require(n > 0, "n must be greater than 0")
|
||||
require(step > 0, "step must be greater than 0")
|
||||
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Take(n: Long, attributes: Attributes = take) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Drop(n: Long, attributes: Attributes = drop) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class TakeWhile(p: Any ⇒ Boolean, attributes: Attributes = takeWhile) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
|
||||
}
|
||||
|
||||
final case class DropWhile(p: Any ⇒ Boolean, attributes: Attributes = dropWhile) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Scan(zero: Any, f: (Any, Any) ⇒ Any, attributes: Attributes = scan) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Fold(zero: Any, f: (Any, Any) ⇒ Any, attributes: Attributes = fold) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Buffer(size: Int, overflowStrategy: OverflowStrategy, attributes: Attributes = buffer) extends StageModule {
|
||||
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
|
||||
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any, attributes: Attributes = conflate) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any), attributes: Attributes = expand) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class MapConcat(f: Any ⇒ immutable.Iterable[Any], attributes: Attributes = mapConcat) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class GroupBy(f: Any ⇒ Any, attributes: Attributes = groupBy) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class PrefixAndTail(n: Int, attributes: Attributes = prefixAndTail) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class Split(p: Any ⇒ SplitDecision, attributes: Attributes = split) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class ConcatAll(attributes: Attributes = concatAll) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class DirectProcessor(p: () ⇒ (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule {
|
||||
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
override protected def newInstance: StageModule = this.copy()
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue