Remove SourceModule/SinkModule newInstance (#30244)

* Remove SourceModule/SinkModule newInstance

AFAICS it's not public API and not called anywhere
Extracted from #30223

* mima excludes
This commit is contained in:
Arnout Engelen 2021-05-19 20:51:30 +02:00 committed by GitHub
parent a19e73a253
commit 79b48ca354
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 3 additions and 30 deletions

View file

@ -0,0 +1 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.testkit.StreamTestKit#ProbeS*.newInstance")

View file

@ -918,8 +918,6 @@ private[testkit] object StreamTestKit {
val probe = TestPublisher.probe[T]()
(probe, probe)
}
override protected def newInstance(shape: SourceShape[T]): SourceModule[T, TestPublisher.Probe[T]] =
new ProbeSource[T](attributes, shape)
override def withAttributes(attr: Attributes): SourceModule[T, TestPublisher.Probe[T]] =
new ProbeSource[T](attr, amendShape(attr))
}
@ -930,8 +928,6 @@ private[testkit] object StreamTestKit {
val probe = TestSubscriber.probe[T]()
(probe, probe)
}
override protected def newInstance(shape: SinkShape[T]): SinkModule[T, TestSubscriber.Probe[T]] =
new ProbeSink[T](attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[T, TestSubscriber.Probe[T]] =
new ProbeSink[T](attr, amendShape(attr))
}

View file

@ -569,9 +569,6 @@ class FlowGroupBySpec extends StreamSpec("""
promise.success(probe)
(probe, probe)
}
override protected def newInstance(
shape: SinkShape[ByteString]): SinkModule[ByteString, TestSubscriber.Probe[ByteString]] =
new ProbeSink(attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[ByteString, TestSubscriber.Probe[ByteString]] =
new ProbeSink(attr, amendShape(attr))
}

View file

@ -0,0 +1 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.*.newInstance")

View file

@ -25,13 +25,9 @@ import akka.stream.impl.StreamLayout.AtomicModule
def create(context: MaterializationContext): (Publisher[Out] @uncheckedVariance, Mat)
// TODO: Remove this, no longer needed?
protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat]
// TODO: Amendshape changed the name of ports. Is it needed anymore?
def attributes: Attributes
// TODO: Amendshape changed the name of ports. Is it needed anymore?
protected def amendShape(attr: Attributes): SourceShape[Out] = {
val thisN = traversalBuilder.attributes.nameOrDefault(null)
val thatN = attr.nameOrDefault(null)
@ -58,8 +54,6 @@ import akka.stream.impl.StreamLayout.AtomicModule
(processor, processor)
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Subscriber[Out]] =
new SubscriberSource[Out](attributes, shape)
override def withAttributes(attr: Attributes): SourceModule[Out, Subscriber[Out]] =
new SubscriberSource[Out](attr, amendShape(attr))
}
@ -81,8 +75,6 @@ import akka.stream.impl.StreamLayout.AtomicModule
override def create(context: MaterializationContext) = (p, NotUsed)
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, NotUsed] =
new PublisherSource[Out](p, attributes, shape)
override def withAttributes(attr: Attributes): SourceModule[Out, NotUsed] =
new PublisherSource[Out](p, attr, amendShape(attr))
}

View file

@ -5,7 +5,6 @@
package akka.stream.impl
import java.util.function.BinaryOperator
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
@ -55,10 +54,6 @@ import akka.util.ccompat._
override def traversalBuilder: TraversalBuilder =
LinearTraversalBuilder.fromModule(this, attributes).makeIsland(SinkModuleIslandTag)
// This is okay since we the only caller of this method is right below.
// TODO: Remove this, no longer needed
protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]
protected def amendShape(attr: Attributes): SinkShape[In] = {
val thisN = traversalBuilder.attributes.nameOrDefault(null)
val thatN = attr.nameOrDefault(null)
@ -104,8 +99,6 @@ import akka.util.ccompat._
(proc, proc)
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
new PublisherSink[In](attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] =
new PublisherSink[In](attr, amendShape(attr))
}
@ -124,9 +117,6 @@ import akka.util.ccompat._
(fanoutProcessor, fanoutProcessor)
}
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
new FanoutPublisherSink[In](attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] =
new FanoutPublisherSink[In](attr, amendShape(attr))
}
@ -143,8 +133,6 @@ import akka.util.ccompat._
override def create(context: MaterializationContext) = (subscriber, NotUsed)
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
new SubscriberSink[In](subscriber, attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] =
new SubscriberSink[In](subscriber, attr, amendShape(attr))
}
@ -157,8 +145,6 @@ import akka.util.ccompat._
extends SinkModule[Any, NotUsed](shape) {
override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) =
(new CancellingSubscriber[Any], NotUsed)
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] =
new CancelSink(attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] = new CancelSink(attr, amendShape(attr))
}