diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala index 6e8267ad6e..1184925a0f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala @@ -62,12 +62,11 @@ class FlowWithContextLogSpec extends StreamSpec(""" onFinish = Logging.DebugLevel, onFailure = Logging.DebugLevel) - val logging = FlowWithContext[Message, Long].log("my-log3") + val logging = FlowWithContext[Message, Long].log("my-log3").withAttributes(disableElementLogging) Source(List(Message("a", 1L), Message("b", 2L))) .asSourceWithContext(m ⇒ m.offset) .via(logging) .asSource - .withAttributes(disableElementLogging) .runWith(Sink.ignore) logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log3] Upstream finished.")) @@ -100,6 +99,22 @@ class FlowWithContextLogSpec extends StreamSpec(""" logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log5] Upstream finished.")) } + "allow disabling element logging" in { + val disableElementLogging = Attributes.logLevels( + onElement = LogLevels.Off, + onFinish = Logging.DebugLevel, + onFailure = Logging.DebugLevel) + + Source(List(Message("a", 1L), Message("b", 2L))) + .asSourceWithContext(m ⇒ m.offset) + .log("my-log6") + .withAttributes(disableElementLogging) + .asSource + .runWith(Sink.ignore) + + logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-log6] Upstream finished.")) + } + } } diff --git a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes index 65ae5539bb..d16117cbcb 100644 --- a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes @@ -8,6 +8,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.SourceW ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$2") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.log$default$3") + # Various compiler warnings in streams #26399 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.PhasedFusingActorMaterializer.apply") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.ProcessorModulePhase.this") @@ -17,3 +18,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.TlsModulePh ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$TickSourceCancellable") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.snapshot.MaterializerState.requestFromChild") + +# Sets correct return type for withAttributes on Source/FlowWithContext #26411 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.GraphDelegate.withAttributes") diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index 61874af190..11adbb4223 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -80,5 +80,4 @@ trait Graph[+S <: Shape, +M] { private[stream] abstract class GraphDelegate[+S <: Shape, +Mat](delegate: Graph[S, Mat]) extends Graph[S, Mat] { final override def shape: S = delegate.shape final override private[stream] def traversalBuilder: TraversalBuilder = delegate.traversalBuilder - final override def withAttributes(attr: Attributes): Graph[S, Mat] = delegate.withAttributes(attr) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 7c2f63647a..681bde134c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -57,6 +57,14 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: scaladsl FlowWithContext.fromPairs(under) } + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.withAttributes]]. + * + * @see [[akka.stream.javadsl.Flow.withAttributes]] + */ + override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.withAttributes(attr)) + /** * Creates a regular flow of pairs (data, context). */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index acf63b4ef7..7e2ed55850 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -40,6 +40,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Out2, Ctx2, Mat] = viaScala(_.via(akka.stream.scaladsl.Flow[(Out, Ctx)].map { case (o, c) ⇒ Pair(o, c) }.via(viaFlow).map(_.toScala))) + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.withAttributes]]. + * + * @see [[akka.stream.javadsl.Source.withAttributes]] + */ + override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.withAttributes(attr)) + /** * Stops automatic context propagation from here and converts this to a regular * stream of a pair of (data, context). diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala index 87c2cec324..522202ebe5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -51,6 +51,14 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat]( override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] = FlowWithContext.from(delegate.viaMat(flow)(combine)) + /** + * Context-preserving variant of [[akka.stream.scaladsl.Flow.withAttributes]]. + * + * @see [[akka.stream.scaladsl.Flow.withAttributes]] + */ + override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + new FlowWithContext(delegate.withAttributes(attr)) + def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat]: javadsl.FlowWithContext[JIn, JCtxIn, JOut, JCtxOut, JMat] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index d8637ac9e1..3ae5d7aad3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -31,6 +31,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] ( override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): SourceWithContext[Out2, Ctx2, Mat3] = new SourceWithContext(delegate.viaMat(flow)(combine)) + /** + * Context-preserving variant of [[akka.stream.scaladsl.Source.withAttributes]]. + * + * @see [[akka.stream.scaladsl.Source.withAttributes]] + */ + override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] = + new SourceWithContext(delegate.withAttributes(attr)) + /** * Stops automatic context propagation from here and converts this to a regular * stream of a pair of (data, context).