diff --git a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala b/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala index 109a4332d3..250c20dc20 100644 --- a/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala +++ b/actor/src/main/scala/org/apache/pekko/japi/function/Function.scala @@ -105,7 +105,7 @@ trait Predicate2[-T1, -T2] extends java.io.Serializable { /** * A constructor/factory, takes no parameters but creates a new value of type T every call. - * Supports throwing `Exception` in the apply, which the `java.util.function.Supplier` counterpart does not. + * Supports throwing `Exception` in the create method, which the `java.util.function.Supplier` counterpart does not. */ @nowarn("msg=@SerialVersionUID has no effect") @SerialVersionUID(1L) diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/ask.md b/docs/src/main/paradox/stream/operators/ActorFlow/ask.md index 7fe77db067..5c2c6e8113 100644 --- a/docs/src/main/paradox/stream/operators/ActorFlow/ask.md +++ b/docs/src/main/paradox/stream/operators/ActorFlow/ask.md @@ -19,7 +19,7 @@ This operator is included in: ## Signature -@apidoc[ActorFlow.ask](ActorFlow$) { scala="#ask%5BI,Q,A](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5BI,A,org.apache.pekko.NotUsed]" java="#ask(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)" } +@apidoc[ActorFlow.ask](ActorFlow$) { scala="#ask%5BI,Q,A](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5BI,A,org.apache.pekko.NotUsed]" java="#ask(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,org.apache.pekko.japi.function.Function2)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md b/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md index bf420e0586..6359a4213f 100644 --- a/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md +++ b/docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md @@ -19,7 +19,7 @@ This operator is included in: ## Signature -@apidoc[ActorFlow.askWithContext](ActorFlow$) { scala="#askWithContext%5BI,Q,A,Ctx](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]" java="#askWithContext(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)" } +@apidoc[ActorFlow.askWithContext](ActorFlow$) { scala="#askWithContext%5BI,Q,A,Ctx](ref:org.apache.pekko.actor.typed.ActorRef%5BQ])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]" java="#askWithContext(org.apache.pekko.actor.typed.ActorRef,java.time.Duration,org.apache.pekko.japi.function.Function2)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md index d23f3974b5..9721940cd0 100644 --- a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md +++ 
b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md @@ -18,8 +18,8 @@ This operator is included in: ## Signature -@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]" java ="#askWithStatus[I,Q,A](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q]):org.apache.pekko.stream.javadsl.Flow[I,A,org.apache.pekko.NotUsed]" } -@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]" java ="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q]):org.apache.pekko.stream.javadsl.Flow[I,A,org.apache.pekko.NotUsed]" } +@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]" java ="#askWithStatus[I,Q,A](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.apache.pekko.japi.function.Function2[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q]):org.apache.pekko.stream.javadsl.Flow[I,A,org.apache.pekko.NotUsed]" } +@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[I,A,org.apache.pekko.NotUsed]" java ="#askWithStatus[I,Q,A](ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.apache.pekko.japi.function.Function2[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q]):org.apache.pekko.stream.javadsl.Flow[I,A,org.apache.pekko.NotUsed]" } ## Description diff --git a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md index de2e6d0312..4cb4ead595 100644 --- a/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md +++ b/docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md @@ -19,7 +19,7 @@ This operator is included in: ## Signature -@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) { 
scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]" java ="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q])" } +@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) { scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:org.apache.pekko.actor.typed.ActorRef[Q])(makeMessage:(I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]])=>Q)(implicittimeout:org.apache.pekko.util.Timeout):org.apache.pekko.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),org.apache.pekko.NotUsed]" java ="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:org.apache.pekko.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:org.apache.pekko.japi.function.Function2[I,org.apache.pekko.actor.typed.ActorRef[org.apache.pekko.pattern.StatusReply[A]],Q])" } ## Description diff --git a/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md b/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md index 35851bfea8..71cc41b153 100644 --- a/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md +++ b/docs/src/main/paradox/stream/operators/ActorSource/actorRef.md @@ -19,7 +19,7 @@ This operator is included in: ## Signature -@apidoc[ActorSource.actorRef](ActorSource$) { scala="#actorRef[T](completionMatcher:PartialFunction[T,Unit],failureMatcher:PartialFunction[T,Throwable],bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.actor.typed.ActorRef[T]]" java="#actorRef(java.util.function.Predicate,org.apache.pekko.japi.function.Function,int,org.apache.pekko.stream.OverflowStrategy)" } +@apidoc[ActorSource.actorRef](ActorSource$) { scala="#actorRef[T](completionMatcher:PartialFunction[T,Unit],failureMatcher:PartialFunction[T,Throwable],bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.actor.typed.ActorRef[T]]" java="#actorRef(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function,int,org.apache.pekko.stream.OverflowStrategy)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md b/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md index 91cdef4746..c10b18b2a9 100644 --- a/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md +++ b/docs/src/main/paradox/stream/operators/Sink/fromMaterializer.md @@ -6,7 +6,7 @@ Defer the creation of a `Sink` until materialization and access `Materializer` a ## Signature -@apidoc[Sink.fromMaterializer](Sink$) { scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Sink[T,M]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]" java="#fromMaterializer(java.util.function.BiFunction)" } +@apidoc[Sink.fromMaterializer](Sink$) { 
scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Sink[T,M]):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[M]]" java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md index d6e8959bdc..4c053db52a 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/aggregateWithBoundary.md @@ -8,8 +8,8 @@ Aggregate and emit until custom boundary condition met. ## Signature -@apidoc[Source.aggregateWithBoundary](Source) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"} -@apidoc[Flow.aggregateWithBoundary](Flow) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(java.util.function.Supplier,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)" } +@apidoc[Source.aggregateWithBoundary](Source) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)"} +@apidoc[Flow.aggregateWithBoundary](Flow) { scala="#aggregateWithBoundary[Agg,Emit](allocate:()=%3EAgg)(aggregate:(Agg,Out)=%3E(Agg,Boolean),harvest:Agg=%3EEmit,emitOnTimer:Option[(Agg=%3EBoolean,scala.concurrent.duration.FiniteDuration)]):FlowOps.this.Repr[Emit]" java="#aggregateWithBoundary(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.Pair)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md index 75f7fccd2e..f76efe39a0 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md @@ -6,8 +6,8 @@ Delay every element passed through with a duration that can be controlled dynami ## Signature -@apidoc[Source.delayWith](Source) { scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]" java="#delayWith(java.util.function.Supplier,org.apache.pekko.stream.DelayOverflowStrategy)" } -@apidoc[Flow.delayWith](Flow) { scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]" 
java="#delayWith(java.util.function.Supplier,org.apache.pekko.stream.DelayOverflowStrategy)" } +@apidoc[Source.delayWith](Source) { scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]" java="#delayWith(org.apache.pekko.japi.function.Creator,org.apache.pekko.stream.DelayOverflowStrategy)" } +@apidoc[Flow.delayWith](Flow) { scala="#delayWith(delayStrategySupplier:()=>org.apache.pekko.stream.scaladsl.DelayStrategy[Out],overFlowStrategy:org.apache.pekko.stream.DelayOverflowStrategy):FlowOps.this.Repr[Out]" java="#delayWith(org.apache.pekko.japi.function.Creator,org.apache.pekko.stream.DelayOverflowStrategy)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md index 9fe8b8b740..a988bdb5f0 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/fromMaterializer.md @@ -6,8 +6,8 @@ Defer the creation of a `Source/Flow` until materialization and access `Material ## Signature -@apidoc[Source.fromMaterializer](Source$) { scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Source[T,M]):org.apache.pekko.stream.scaladsl.Source[T,scala.concurrent.Future[M]]" java="#fromMaterializer(java.util.function.BiFunction)" } -@apidoc[Flow.fromMaterializer](Flow$) { scala="#fromMaterializer[T,U,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Flow[T,U,M]):org.apache.pekko.stream.scaladsl.Flow[T,U,scala.concurrent.Future[M]]" java="#fromMaterializer(java.util.function.BiFunction)" } +@apidoc[Source.fromMaterializer](Source$) { scala="#fromMaterializer[T,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Source[T,M]):org.apache.pekko.stream.scaladsl.Source[T,scala.concurrent.Future[M]]" java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" } +@apidoc[Flow.fromMaterializer](Flow$) { scala="#fromMaterializer[T,U,M](factory:(org.apache.pekko.stream.Materializer,org.apache.pekko.stream.Attributes)=>org.apache.pekko.stream.scaladsl.Flow[T,U,M]):org.apache.pekko.stream.scaladsl.Flow[T,U,scala.concurrent.Future[M]]" java="#fromMaterializer(org.apache.pekko.japi.function.Function2)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md index 0772ce4b43..66d43d8d8a 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorComplete.md @@ -6,9 +6,9 @@ Allows completing the stream when an upstream error occurs. 
## Signature -@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.util.function.Predicate)" } +@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(org.apache.pekko.japi.function.Predicate)" } @apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.lang.Class)" } -@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.util.function.Predicate)" } +@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(org.apache.pekko.japi.function.Predicate)" } @apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.lang.Class)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md index 7aac41bcfc..e4ff749b01 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/recover.md @@ -6,8 +6,8 @@ Allow sending of one last element downstream when a failure has happened upstrea ## Signature -@apidoc[Source.recover](Source) { scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(scala.PartialFunction)" java="#recover(java.lang.Class,java.util.function.Supplier)" } -@apidoc[Flow.recover](Flow) { scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(scala.PartialFunction)" java="#recover(java.lang.Class,java.util.function.Supplier)" } +@apidoc[Source.recover](Source) { scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(scala.PartialFunction)" java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)" } +@apidoc[Flow.recover](Flow) { scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(scala.PartialFunction)" java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md index 4d62c22c05..b1e78b3d22 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWith.md @@ -6,8 +6,8 @@ Allow switching to alternative Source when a failure has happened upstream. 
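For the `onErrorComplete` signature links just above, a minimal Java sketch of the migrated overload; the helper name is hypothetical. `org.apache.pekko.japi.function.Predicate` is a functional interface, so lambdas written against the old `java.util.function.Predicate` overload continue to compile.

```java
import java.util.concurrent.TimeoutException;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;

public class OnErrorCompleteExample {
  // Complete instead of fail when the upstream error is a TimeoutException.
  static Source<Integer, NotUsed> tolerateTimeouts(Source<Integer, NotUsed> in) {
    return in.onErrorComplete(ex -> ex instanceof TimeoutException);
  }
}
```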
## Signature -@apidoc[Source.recoverWith](Source) { scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWith(java.lang.Class,java.util.function.Supplier)" } -@apidoc[Flow.recoverWith](Flow) { scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWith(java.lang.Class,java.util.function.Supplier)" } +@apidoc[Source.recoverWith](Source) { scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWith(java.lang.Class,org.apache.pekko.japi.function.Creator)" } +@apidoc[Flow.recoverWith](Flow) { scala="#recoverWith[T>:Out](pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWith(java.lang.Class,org.apache.pekko.japi.function.Creator)" } ## Description diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md index 666794a1ee..f7631f0a7c 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/recoverWithRetries.md @@ -6,8 +6,8 @@ RecoverWithRetries allows to switch to alternative Source on flow failure. ## Signature -@apidoc[Source.recoverWithRetries](Source) { scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)" } -@apidoc[Flow.recoverWithRetries](Flow) { scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)" } +@apidoc[Source.recoverWithRetries](Source) { scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)" } +@apidoc[Flow.recoverWithRetries](Flow) { scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)" } ## Description diff --git a/docs/src/main/paradox/stream/stream-dynamic.md b/docs/src/main/paradox/stream/stream-dynamic.md index 18a3c24f06..5ad915c636 100644 --- a/docs/src/main/paradox/stream/stream-dynamic.md +++ b/docs/src/main/paradox/stream/stream-dynamic.md @@ -219,7 +219,7 @@ It is possible to define how many initial consumers that are required before it to the attached consumers. While not enough consumers have been attached messages are buffered and when the buffer is full the upstream producer is backpressured. No messages are dropped. -The above example illustrate a stateless partition function. 
For more advanced stateful routing the @java[@javadoc[ofStateful](pekko.stream.javadsl.PartitionHub$#ofStateful(java.lang.Class,java.util.function.Supplier,int))] +The above example illustrate a stateless partition function. For more advanced stateful routing the @java[@javadoc[ofStateful](pekko.stream.javadsl.PartitionHub$#ofStateful(java.lang.Class,org.apache.pekko.japi.function.Creator,int))] @scala[@scaladoc[statefulSink](pekko.stream.scaladsl.PartitionHub$#statefulSink[T](partitioner:()=%3E(org.apache.pekko.stream.scaladsl.PartitionHub.ConsumerInfo,T)=%3ELong,startAfterNrOfConsumers:Int,bufferSize:Int):org.apache.pekko.stream.scaladsl.Sink[T,org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.NotUsed]])] can be used. Here is an example of a stateful round-robin function: Scala diff --git a/docs/src/main/paradox/stream/stream-error.md b/docs/src/main/paradox/stream/stream-error.md index 17001855fa..dd1f13f2a7 100644 --- a/docs/src/main/paradox/stream/stream-error.md +++ b/docs/src/main/paradox/stream/stream-error.md @@ -20,8 +20,8 @@ Each of the operators downstream gets informed about the failure and each upstre In many cases you may want to avoid complete stream failure, this can be done in a few different ways: - * @apidoc[recover](stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,java.util.function.Supplier)"} to emit a final element then complete the stream normally on upstream failure - * @apidoc[recoverWithRetries](stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"} to create a new upstream and start consuming from that on failure + * @apidoc[recover](stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)"} to emit a final element then complete the stream normally on upstream failure + * @apidoc[recoverWithRetries](stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"} to create a new upstream and start consuming from that on failure * Restarting sections of the stream after a backoff * Using a supervision strategy for operators that support it @@ -52,7 +52,7 @@ in @ref:[Logging in streams](stream-cookbook.md#logging-in-streams). ## Recover -@apidoc[recover](stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,java.util.function.Supplier)"} allows you to emit a final element and then complete the stream on an upstream failure. +@apidoc[recover](stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,org.apache.pekko.japi.function.Creator)"} allows you to emit a final element and then complete the stream on an upstream failure. Deciding which exceptions should be recovered is done through a @scaladoc[PartialFunction](scala.PartialFunction). If an exception does not have a @scala[matching case] @java[match defined] the stream is failed. 
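To make the `recover` wording above concrete, here is a minimal Java sketch against the migrated `recover(Class, Creator)` overload; the helper name and fallback value are made up. This also ties back to the `Creator` doc fix at the top of this diff: unlike `java.util.function.Supplier`, `Creator.create()` is declared to throw `Exception`, so the fallback element may come from code that throws checked exceptions.

```java
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;

public class RecoverExample {
  // Emit one final element and complete when the stream fails with an IllegalStateException.
  static Source<String, NotUsed> withFallback(Source<String, NotUsed> in) {
    return in.recover(IllegalStateException.class, () -> "fallback element");
  }
}
```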
@@ -80,7 +80,7 @@ Java ## Recover with retries -@apidoc[recoverWithRetries](stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"} allows you to put a new upstream in place of the failed one, recovering +@apidoc[recoverWithRetries](stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[T],org.apache.pekko.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,org.apache.pekko.japi.function.Creator)"} allows you to put a new upstream in place of the failed one, recovering stream failures up to a specified maximum number of times. Deciding which exceptions should be recovered is done through a @scaladoc[PartialFunction](scala.PartialFunction). If an exception diff --git a/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes new file mode 100644 index 0000000000..f412fc2a45 --- /dev/null +++ b/stream-typed/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Migrate java.util.function.* to pekko.japi.function.* +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatusAndContext") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatus") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.ask") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatus") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithStatusAndContext") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorFlow.askWithContext") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.typed.javadsl.ActorSource.actorRef") diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala index e683830b0b..8873facb45 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorFlow.scala @@ -13,12 +13,12 @@ package org.apache.pekko.stream.typed.javadsl -import java.util.function.BiFunction import scala.concurrent.duration._ import org.apache.pekko import pekko.NotUsed import pekko.actor.typed.ActorRef import pekko.japi.Pair +import pekko.japi.function import pekko.pattern.StatusReply import pekko.stream.javadsl.Flow import pekko.util.JavaDurationConverters @@ -68,7 +68,7 @@ object ActorFlow { def ask[I, Q, A]( ref: ActorRef[Q], timeout: java.time.Duration, - makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[I, A, NotUsed] = + makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[I, A, NotUsed] = org.apache.pekko.stream.typed.scaladsl.ActorFlow .ask[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))( JavaDurationConverters.asFiniteDuration(timeout)) @@ -82,7 +82,7 @@ object ActorFlow { def askWithStatus[I, Q, A]( ref: ActorRef[Q], timeout: java.time.Duration, - makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] = + makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] = org.apache.pekko.stream.typed.scaladsl.ActorFlow .askWithStatus[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))( JavaDurationConverters.asFiniteDuration(timeout)) @@ -139,7 +139,7 @@ object ActorFlow { parallelism: Int, ref: ActorRef[Q], timeout: java.time.Duration, - makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] = + makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] = org.apache.pekko.stream.typed.scaladsl.ActorFlow .askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis) .asJava @@ -150,7 +150,7 @@ object ActorFlow { def askWithContext[I, Q, A, Ctx]( ref: ActorRef[Q], timeout: java.time.Duration, - makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = + 
makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = org.apache.pekko.stream.scaladsl .Flow[Pair[I, Ctx]] .map(_.toScala) @@ -169,7 +169,7 @@ object ActorFlow { def askWithStatusAndContext[I, Q, A, Ctx]( ref: ActorRef[Q], timeout: java.time.Duration, - makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = + makeMessage: function.Function2[I, ActorRef[StatusReply[A]], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = org.apache.pekko.stream.scaladsl .Flow[Pair[I, Ctx]] .map(_.toScala) @@ -187,7 +187,7 @@ object ActorFlow { parallelism: Int, ref: ActorRef[Q], timeout: java.time.Duration, - makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = { + makeMessage: function.Function2[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = { org.apache.pekko.stream.scaladsl .Flow[Pair[I, Ctx]] .map(_.toScala) diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala index 5e31b916e6..e7da04a0d0 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/javadsl/ActorSource.scala @@ -13,11 +13,10 @@ package org.apache.pekko.stream.typed.javadsl -import java.util.function.Predicate - import org.apache.pekko import pekko.actor.typed._ import pekko.japi.JavaPartialFunction +import pekko.japi.function import pekko.stream.{ CompletionStrategy, OverflowStrategy } import pekko.stream.javadsl._ @@ -58,7 +57,7 @@ object ActorSource { * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def actorRef[T]( - completionMatcher: Predicate[T], + completionMatcher: function.Predicate[T], failureMatcher: pekko.japi.function.Function[T, java.util.Optional[Throwable]], bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef[T]] = { diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes index 3d671dfd72..11ab37d791 100644 --- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/javaapi-functions.excludes @@ -18,3 +18,30 @@ # Migrate functions in japi.* to japi.function.* ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.JavaFlowSupport#Flow.fromProcessor") ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.JavaFlowSupport#Flow.fromProcessorMat") + +# Migrate java.util.function.* to pekko.japi.function.* +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.RestartSettings.withRestartOn") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.groupedWeighted") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.delayWith") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recover") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recoverWith") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.recoverWithRetries") 
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.aggregateWithBoundary") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Flow.fromMaterializer") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.PartitionHub.of") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.PartitionHub.ofStateful") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Sink.fromMaterializer") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recover") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recoverWith") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.recoverWithRetries") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.groupedWeighted") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.delayWith") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.aggregateWithBoundary") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Source.fromMaterializer") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubFlow.delayWith") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubFlow.aggregateWithBoundary") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubSource.delayWith") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.SubSource.aggregateWithBoundary") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.TLS.create") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Tcp.outgoingConnectionWithTls") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.stream.javadsl.Tcp.bindWithTls") diff --git a/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala b/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala index e7fb56b61a..8fda40de43 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/RestartSettings.scala @@ -16,10 +16,10 @@ package org.apache.pekko.stream import scala.concurrent.duration.FiniteDuration import org.apache.pekko +import pekko.japi.function import pekko.event.Logging import pekko.event.Logging.LogLevel import pekko.util.ConstantFun -import pekko.util.FunctionConverters._ import pekko.util.JavaDurationConverters._ final class RestartSettings private ( @@ -58,8 +58,8 @@ final class RestartSettings private ( copy(maxRestarts = count, maxRestartsWithin = within.asScala) /** Decides whether the failure should restart the stream or make the surrounding stream fail */ - def withRestartOn(restartOn: java.util.function.Predicate[Throwable]): RestartSettings = - copy(restartOn = restartOn.asScala) + def withRestartOn(restartOn: function.Predicate[Throwable]): RestartSettings = + copy(restartOn = t => restartOn.test(t)) def withLogSettings(newLogSettings: RestartSettings.LogSettings): RestartSettings = copy(logSettings = newLogSettings) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 637bf91541..6e76d89579 
100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -16,8 +16,6 @@ package org.apache.pekko.stream.javadsl import java.util.Comparator import java.util.Optional import java.util.concurrent.CompletionStage -import java.util.function.BiFunction -import java.util.function.Supplier import scala.annotation.varargs import scala.annotation.unchecked.uncheckedVariance @@ -112,7 +110,7 @@ object Flow { * [[Attributes]] of the [[Flow]] returned by this method. */ def fromMaterializer[I, O, M]( - factory: BiFunction[Materializer, Attributes, Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = + factory: function.Function2[Materializer, Attributes, Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = scaladsl.Flow.fromMaterializer((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.asJava).asJava /** @@ -1273,7 +1271,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * '''Cancels when''' downstream cancels */ def groupedWeighted(minWeight: Long, - costFn: java.util.function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] = + costFn: function.Function[Out, java.lang.Long]): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) // TODO optimize to one step /** @@ -1316,7 +1314,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr */ def groupedAdjacentByWeighted[R](f: function.Function[Out, R], maxWeight: Long, - costFn: java.util.function.Function[Out, java.lang.Long]) + costFn: function.Function[Out, java.lang.Long]) : javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = new Flow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava)) @@ -1740,13 +1738,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization + * @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def delayWith( - delayStrategySupplier: Supplier[DelayStrategy[Out]], + delayStrategyCreator: function.Creator[DelayStrategy[Out]], overFlowStrategy: DelayOverflowStrategy): Flow[In, Out, Mat] = - new Flow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy)) + new Flow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy)) /** * Discard the given number of elements at the beginning of the stream. 
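For the `Flow.fromMaterializer` change near the top of this hunk, a minimal Java sketch of the migrated factory signature; the helper is hypothetical and the factory body deliberately ignores the provided `Materializer` and `Attributes`. A two-argument lambda still satisfies the new `org.apache.pekko.japi.function.Function2` parameter.

```java
import java.util.concurrent.CompletionStage;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Flow;

public class FromMaterializerExample {
  // The factory receives the Materializer and Attributes at materialization time
  // and returns the Flow to run; the materialized value becomes a CompletionStage.
  static Flow<String, String, CompletionStage<NotUsed>> deferred() {
    return Flow.fromMaterializer((mat, attrs) -> Flow.of(String.class).map(String::trim));
  }
}
```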
@@ -1892,9 +1890,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ - def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Flow[In, Out, Mat] = + def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = recover { - case elem if clazz.isInstance(elem) => supplier.get() + case elem if clazz.isInstance(elem) => creator.create() } /** @@ -1984,9 +1982,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr */ def recoverWith( clazz: Class[_ <: Throwable], - supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = + creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = recoverWith({ - case elem if clazz.isInstance(elem) => supplier.get() + case elem if clazz.isInstance(elem) => creator.create() }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]) /** @@ -2043,15 +2041,15 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * @param attempts Maximum number of retries or -1 to retry indefinitely * @param clazz the class object of the failure cause - * @param supplier supply the new Source to be materialized + * @param creator supply the new Source to be materialized */ def recoverWithRetries( attempts: Int, clazz: Class[_ <: Throwable], - supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = + creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = recoverWithRetries(attempts, { - case elem if clazz.isInstance(elem) => supplier.get() + case elem if clazz.isInstance(elem) => creator.create() }) /** @@ -2104,7 +2102,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * '''Cancels when''' downstream cancels * @since 1.1.0 */ - def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = + def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.onErrorComplete { case ex: Throwable if predicate.test(ex) => true }) @@ -4308,13 +4306,13 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ @deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0") - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], + def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], - emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]) + emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration]) : javadsl.Flow[In, Emit, Mat] = { asScala - .aggregateWithBoundary(() => allocate.get())( + .aggregateWithBoundary(() => allocate.create())( aggregate = (agg, out) => aggregate.apply(agg, out).toScala, harvest = agg => harvest.apply(agg), emitOnTimer = Option(emitOnTimer).map { @@ -4341,14 +4339,14 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * @param harvest this is invoked before emit within the current stage/operator * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ - 
def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], + def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], - emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]]) + emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]]) : javadsl.Flow[In, Emit, Mat] = { import org.apache.pekko.util.OptionConverters._ asScala - .aggregateWithBoundary(() => allocate.get())( + .aggregateWithBoundary(() => allocate.create())( aggregate = (agg, out) => aggregate.apply(agg, out).toScala, harvest = agg => harvest.apply(agg), emitOnTimer = emitOnTimer.toScala.map { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala index be492a1dc9..3a3f34ce56 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala @@ -13,9 +13,10 @@ package org.apache.pekko.stream.javadsl -import java.util.function.{ BiFunction, Supplier, ToLongBiFunction } +import java.util.function.ToLongBiFunction import org.apache.pekko +import pekko.japi.function import pekko.NotUsed import pekko.annotation.DoNotInherit import pekko.util.unused @@ -259,11 +260,11 @@ object PartitionHub { */ def ofStateful[T]( @unused clazz: Class[T], - partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]], + partitioner: function.Creator[ToLongBiFunction[ConsumerInfo, T]], startAfterNrOfConsumers: Int, bufferSize: Int): Sink[T, Source[T, NotUsed]] = { val p: () => (pekko.stream.scaladsl.PartitionHub.ConsumerInfo, T) => Long = () => { - val f = partitioner.get() + val f = partitioner.create() (info, elem) => f.applyAsLong(info, elem) } pekko.stream.scaladsl.PartitionHub @@ -303,7 +304,7 @@ object PartitionHub { */ def ofStateful[T]( clazz: Class[T], - partitioner: Supplier[ToLongBiFunction[ConsumerInfo, T]], + partitioner: function.Creator[ToLongBiFunction[ConsumerInfo, T]], startAfterNrOfConsumers: Int): Sink[T, Source[T, NotUsed]] = ofStateful(clazz, partitioner, startAfterNrOfConsumers, pekko.stream.scaladsl.PartitionHub.defaultBufferSize) @@ -338,7 +339,7 @@ object PartitionHub { */ def of[T]( @unused clazz: Class[T], - partitioner: BiFunction[Integer, T, Integer], + partitioner: function.Function2[Integer, T, Integer], startAfterNrOfConsumers: Int, bufferSize: Int): Sink[T, Source[T, NotUsed]] = pekko.stream.scaladsl.PartitionHub @@ -377,7 +378,7 @@ object PartitionHub { */ def of[T]( clazz: Class[T], - partitioner: BiFunction[Integer, T, Integer], + partitioner: function.Function2[Integer, T, Integer], startAfterNrOfConsumers: Int): Sink[T, Source[T, NotUsed]] = of(clazz, partitioner, startAfterNrOfConsumers, pekko.stream.scaladsl.PartitionHub.defaultBufferSize) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index e5f4bf7fcd..76db0ef98c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl import java.util.Optional import java.util.concurrent.{ CompletableFuture, CompletionStage } -import java.util.function.BiFunction import java.util.stream.Collector import scala.annotation.unchecked.uncheckedVariance @@ -391,7 +390,8 @@ object Sink { * exposes 
[[Materializer]] which is going to be used during materialization and * [[Attributes]] of the [[Sink]] returned by this method. */ - def fromMaterializer[T, M](factory: BiFunction[Materializer, Attributes, Sink[T, M]]): Sink[T, CompletionStage[M]] = + def fromMaterializer[T, M]( + factory: function.Function2[Materializer, Attributes, Sink[T, M]]): Sink[T, CompletionStage[M]] = scaladsl.Sink.fromMaterializer((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.asJava).asJava /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 3649a7ef44..96e5f852c7 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -16,7 +16,6 @@ package org.apache.pekko.stream.javadsl import java.util import java.util.Optional import java.util.concurrent.{ CompletableFuture, CompletionStage } -import java.util.function.{ BiFunction, Supplier } import scala.annotation.varargs import scala.annotation.unchecked.uncheckedVariance @@ -507,7 +506,7 @@ object Source { * [[Attributes]] of the [[Source]] returned by this method. */ def fromMaterializer[T, M]( - factory: BiFunction[Materializer, Attributes, Source[T, M]]): Source[T, CompletionStage[M]] = + factory: function.Function2[Materializer, Attributes, Source[T, M]]): Source[T, CompletionStage[M]] = scaladsl.Source.fromMaterializer((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.asJava).asJava /** @@ -2129,9 +2128,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Source[Out, Mat] = + def recover(clazz: Class[_ <: Throwable], creator: function.Creator[Out]): javadsl.Source[Out, Mat] = recover { - case elem if clazz.isInstance(elem) => supplier.get() + case elem if clazz.isInstance(elem) => creator.create() } /** @@ -2221,9 +2220,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ */ def recoverWith( clazz: Class[_ <: Throwable], - supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = + creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = recoverWith({ - case elem if clazz.isInstance(elem) => supplier.get() + case elem if clazz.isInstance(elem) => creator.create() }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]) /** @@ -2277,15 +2276,15 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * @param attempts Maximum number of retries or -1 to retry indefinitely * @param clazz the class object of the failure cause - * @param supplier supply the new Source to be materialized + * @param creator create the new Source to be materialized */ def recoverWithRetries( attempts: Int, clazz: Class[_ <: Throwable], - supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = + creator: function.Creator[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = recoverWithRetries(attempts, { - case elem if clazz.isInstance(elem) => supplier.get() + case elem if clazz.isInstance(elem) => creator.create() }: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]) /** @@ -2338,7 +2337,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * '''Cancels when''' downstream cancels * @since 1.1.0 */ - def 
onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Source[Out, Mat] = + def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): javadsl.Source[Out, Mat] = new Source(delegate.onErrorComplete { case ex: Throwable if predicate.test(ex) => true }) @@ -2988,7 +2987,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ - def groupedWeighted(minWeight: Long, costFn: java.util.function.Function[Out, java.lang.Long]) + def groupedWeighted(minWeight: Long, costFn: function.Function[Out, java.lang.Long]) : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWeighted(minWeight)(costFn.apply).map(_.asJava)) @@ -3031,7 +3030,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ */ def groupedAdjacentByWeighted[R](f: function.Function[Out, R], maxWeight: Long, - costFn: java.util.function.Function[Out, java.lang.Long]) + costFn: function.Function[Out, java.lang.Long]) : javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava)) @@ -3444,13 +3443,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels * - * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization + * @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def delayWith( - delayStrategySupplier: Supplier[DelayStrategy[Out]], + delayStrategyCreator: function.Creator[DelayStrategy[Out]], overFlowStrategy: DelayOverflowStrategy): Source[Out, Mat] = - new Source(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy)) + new Source(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy)) /** * Discard the given number of elements at the beginning of the stream. 
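The `groupedWeighted` cost function above likewise moves from `java.util.function.Function` to the pekko japi `Function`, whose `apply` may throw a checked exception. A minimal Java sketch with a hypothetical helper that batches lines until roughly 1 KiB has accumulated:

```java
import java.util.List;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;

public class GroupedWeightedExample {
  // costFn is now an org.apache.pekko.japi.function.Function<Out, Long>; a lambda still fits.
  static Source<List<String>, NotUsed> kilobyteBatches(Source<String, NotUsed> lines) {
    return lines.groupedWeighted(1024, line -> (long) line.length());
  }
}
```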
@@ -4749,12 +4748,12 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ @deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0") - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], + def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], - emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] = { + emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration]): javadsl.Source[Emit, Mat] = { asScala - .aggregateWithBoundary(() => allocate.get())( + .aggregateWithBoundary(() => allocate.create())( aggregate = (agg, out) => aggregate.apply(agg, out).toScala, harvest = agg => harvest.apply(agg), emitOnTimer = Option(emitOnTimer).map { @@ -4781,13 +4780,13 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * @param harvest this is invoked before emit within the current stage/operator * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], + def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], - emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]]): javadsl.Source[Emit, Mat] = { + emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]]): javadsl.Source[Emit, Mat] = { import org.apache.pekko.util.OptionConverters._ asScala - .aggregateWithBoundary(() => allocate.get())( + .aggregateWithBoundary(() => allocate.create())( aggregate = (agg, out) => aggregate.apply(agg, out).toScala, harvest = agg => harvest.apply(agg), emitOnTimer = emitOnTimer.toScala.map { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e4417dc676..947432a6f1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage -import java.util.function.Supplier import scala.annotation.varargs import scala.annotation.unchecked.uncheckedVariance @@ -717,7 +716,7 @@ final class SubFlow[In, Out, Mat]( */ def groupedAdjacentByWeighted[R](f: function.Function[Out, R], maxWeight: Long, - costFn: java.util.function.Function[Out, java.lang.Long]) + costFn: function.Function[Out, java.lang.Long]) : SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava)) @@ -1134,13 +1133,13 @@ final class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels * - * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization + * @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def delayWith( - 
delayStrategySupplier: Supplier[DelayStrategy[Out]], + delayStrategyCreator: function.Creator[DelayStrategy[Out]], overFlowStrategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] = - new SubFlow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy)) + new SubFlow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy)) /** * Discard the given number of elements at the beginning of the stream. @@ -1367,7 +1366,7 @@ final class SubFlow[In, Out, Mat]( * '''Cancels when''' downstream cancels * @since 1.1.0 */ - def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): SubFlow[In, Out, Mat] = + def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): SubFlow[In, Out, Mat] = new SubFlow(delegate.onErrorComplete { case ex: Throwable if predicate.test(ex) => true }) @@ -2790,13 +2789,13 @@ final class SubFlow[In, Out, Mat]( * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ @deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0") - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], + def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], - emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]) + emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration]) : javadsl.SubFlow[In, Emit, Mat] = { new SubFlow( - asScala.aggregateWithBoundary(() => allocate.get())( + asScala.aggregateWithBoundary(() => allocate.create())( aggregate = (agg, out) => aggregate.apply(agg, out).toScala, harvest = agg => harvest.apply(agg), emitOnTimer = Option(emitOnTimer).map { @@ -2822,14 +2821,14 @@ final class SubFlow[In, Out, Mat]( * @param harvest this is invoked before emit within the current stage/operator * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], + def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], - emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]]) + emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]]) : javadsl.SubFlow[In, Emit, Mat] = { import org.apache.pekko.util.OptionConverters._ new SubFlow( - asScala.aggregateWithBoundary(() => allocate.get())( + asScala.aggregateWithBoundary(() => allocate.create())( aggregate = (agg, out) => aggregate.apply(agg, out).toScala, harvest = agg => harvest.apply(agg), emitOnTimer = emitOnTimer.toScala.map { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index ed6b8a19dd..387c4f1b19 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -15,7 +15,6 @@ package org.apache.pekko.stream.javadsl import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage -import java.util.function.Supplier import scala.annotation.varargs import scala.annotation.unchecked.uncheckedVariance @@ -707,7 +706,7 @@ final class 
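The substream stages get the same treatment; for example `onErrorComplete` on `SubFlow` (and on `SubSource` below) now takes `pekko.japi.function.Predicate`. A hypothetical sketch of a per-key substream that completes instead of failing on a timeout (the grouping key and exception type are invented, not from this patch):

```java
// Hypothetical usage of SubFlow.onErrorComplete after the change.
import java.util.concurrent.TimeoutException;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Flow;

public class SubFlowPredicateSketch {
  public static Flow<Integer, Integer, NotUsed> perKeyCompleteOnTimeout() {
    return Flow.of(Integer.class)
        .groupBy(16, n -> n % 4)                                // one substream per key
        .onErrorComplete(ex -> ex instanceof TimeoutException)  // japi.function.Predicate lambda
        .mergeSubstreams();
  }
}
```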
SubSource[Out, Mat]( */ def groupedAdjacentByWeighted[R](f: function.Function[Out, R], maxWeight: Long, - costFn: java.util.function.Function[Out, java.lang.Long]) + costFn: function.Function[Out, java.lang.Long]) : SubSource[java.util.List[Out @uncheckedVariance], Mat] = new SubSource(delegate.groupedAdjacentByWeighted(f.apply, maxWeight)(costFn.apply).map(_.asJava)) @@ -1226,13 +1225,13 @@ final class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels * - * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization + * @param delayStrategyCreator creates new [[DelayStrategy]] object for each materialization * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ def delayWith( - delayStrategySupplier: Supplier[DelayStrategy[Out]], + delayStrategyCreator: function.Creator[DelayStrategy[Out]], overFlowStrategy: DelayOverflowStrategy): SubSource[Out, Mat] = - new SubSource(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy)) + new SubSource(delegate.delayWith(() => DelayStrategy.asScala(delayStrategyCreator.create()), overFlowStrategy)) /** * Recover allows to send last element on failure and gracefully complete the stream @@ -1344,7 +1343,7 @@ final class SubSource[Out, Mat]( * '''Cancels when''' downstream cancels * @since 1.1.0 */ - def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): SubSource[Out, Mat] = + def onErrorComplete(predicate: function.Predicate[_ >: Throwable]): SubSource[Out, Mat] = new SubSource(delegate.onErrorComplete { case ex: Throwable if predicate.test(ex) => true }) @@ -2764,13 +2763,13 @@ final class SubSource[Out, Mat]( * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ @deprecated("Use the overloaded one which accepts an Optional instead.", since = "1.2.0") - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], + def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], - emitOnTimer: Pair[java.util.function.Predicate[Agg], java.time.Duration]) + emitOnTimer: Pair[function.Predicate[Agg], java.time.Duration]) : javadsl.SubSource[Emit, Mat] = { new SubSource( - asScala.aggregateWithBoundary(() => allocate.get())( + asScala.aggregateWithBoundary(() => allocate.create())( aggregate = (agg, out) => aggregate.apply(agg, out).toScala, harvest = agg => harvest.apply(agg), emitOnTimer = Option(emitOnTimer).map { @@ -2796,14 +2795,14 @@ final class SubSource[Out, Mat]( * @param harvest this is invoked before emit within the current stage/operator * @param emitOnTimer decide whether the current aggregated elements can be emitted, the custom function is invoked on every interval */ - def aggregateWithBoundary[Agg, Emit](allocate: java.util.function.Supplier[Agg], + def aggregateWithBoundary[Agg, Emit](allocate: function.Creator[Agg], aggregate: function.Function2[Agg, Out, Pair[Agg, Boolean]], harvest: function.Function[Agg, Emit], - emitOnTimer: Optional[Pair[java.util.function.Predicate[Agg], java.time.Duration]]) + emitOnTimer: Optional[Pair[function.Predicate[Agg], java.time.Duration]]) : javadsl.SubSource[Emit, Mat] = { import org.apache.pekko.util.OptionConverters._ new SubSource( - asScala.aggregateWithBoundary(() => allocate.get())( + asScala.aggregateWithBoundary(() => allocate.create())( 
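`SubSource` mirrors `Source` and `SubFlow`, so the weighted grouping operators now take a `pekko.japi.function.Function<Out, Long>` cost function across all three. A hypothetical sketch, shown on `Source` for brevity (the words and weights are invented, not from this patch):

```java
// Hypothetical call-site sketch: the cost function lambda is unchanged by the type swap.
import java.util.Arrays;
import java.util.List;

import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;

public class GroupedWeightedSketch {
  public static Source<List<String>, NotUsed> chunks() {
    return Source.from(Arrays.asList("a", "bb", "ccc", "dd", "e"))
        .groupedAdjacentByWeighted(
            word -> word.length() > 1,       // adjacent elements with the same key stay together
            4L,                              // maxWeight per emitted group
            word -> (long) word.length());   // costFn: japi.function.Function<String, Long>
  }
}
```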
aggregate = (agg, out) => aggregate.apply(agg, out).toScala, harvest = agg => harvest.apply(agg), emitOnTimer = emitOnTimer.toScala.map { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala index 0955fba7ac..30747abdbe 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/TLS.scala @@ -13,13 +13,13 @@ package org.apache.pekko.stream.javadsl -import java.util.function.{ Consumer, Supplier } import javax.net.ssl.{ SSLEngine, SSLSession } import scala.util.Try import org.apache.pekko import pekko.NotUsed +import pekko.japi.function import pekko.stream._ import pekko.stream.TLSProtocol._ import pekko.util.ByteString @@ -73,11 +73,11 @@ object TLS { * For a description of the `closing` parameter please refer to [[TLSClosing]]. */ def create( - sslEngineCreator: Supplier[SSLEngine], - sessionVerifier: Consumer[SSLSession], + sslEngineCreator: function.Creator[SSLEngine], + sessionVerifier: function.Procedure[SSLSession], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = new javadsl.BidiFlow( - scaladsl.TLS.apply(() => sslEngineCreator.get(), session => Try(sessionVerifier.accept(session)), closing)) + scaladsl.TLS.apply(() => sslEngineCreator.create(), session => Try(sessionVerifier(session)), closing)) /** * Create a StreamTls [[pekko.stream.javadsl.BidiFlow]]. This is a low-level interface. @@ -88,9 +88,9 @@ object TLS { * For a description of the `closing` parameter please refer to [[TLSClosing]]. */ def create( - sslEngineCreator: Supplier[SSLEngine], + sslEngineCreator: function.Creator[SSLEngine], closing: TLSClosing): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed] = - new javadsl.BidiFlow(scaladsl.TLS.apply(() => sslEngineCreator.get(), closing)) + new javadsl.BidiFlow(scaladsl.TLS.apply(() => sslEngineCreator.create(), closing)) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala index d34b1b51a4..7255a02e6a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala @@ -17,8 +17,6 @@ import java.lang.{ Iterable => JIterable } import java.net.InetSocketAddress import java.util.Optional import java.util.concurrent.CompletionStage -import java.util.function.{ Function => JFunction } -import java.util.function.Supplier import javax.net.ssl.SSLEngine import javax.net.ssl.SSLSession @@ -35,6 +33,7 @@ import pekko.actor.ExtensionId import pekko.actor.ExtensionIdProvider import pekko.annotation.InternalApi import pekko.io.Inet.SocketOption +import pekko.japi.function import pekko.stream.Materializer import pekko.stream.SystemMaterializer import pekko.stream.TLSClosing @@ -261,10 +260,10 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { */ def outgoingConnectionWithTls( remoteAddress: InetSocketAddress, - createSSLEngine: Supplier[SSLEngine]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = + createSSLEngine: function.Creator[SSLEngine]): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = Flow.fromGraph( delegate - .outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.get()) + .outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.create()) 
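In the TLS and TCP layers the `SSLEngine` factories become `pekko.japi.function.Creator`s as well; one practical consequence is that engine setup code may let checked exceptions (for example from `SSLContext` lookup) propagate instead of wrapping them. A hypothetical client-side sketch (host, port and engine configuration are placeholders, not from this patch):

```java
// Hypothetical outgoing TLS connection using the new Creator-based overload.
import java.net.InetSocketAddress;
import java.util.concurrent.CompletionStage;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Tcp;
import org.apache.pekko.stream.javadsl.Tcp.OutgoingConnection;
import org.apache.pekko.util.ByteString;

public class TcpTlsClientSketch {
  public static Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection(
      ActorSystem system) {
    return Tcp.get(system)
        .outgoingConnectionWithTls(
            new InetSocketAddress("example.com", 4433),
            () -> {
              // SSLContext.getDefault() throws a checked NoSuchAlgorithmException;
              // Creator.create() permits this, whereas Supplier.get() did not.
              SSLEngine engine = SSLContext.getDefault().createSSLEngine("example.com", 4433);
              engine.setUseClientMode(true);
              return engine;
            });
  }
}
```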
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).asJava)) /** @@ -279,18 +278,18 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { */ def outgoingConnectionWithTls( remoteAddress: InetSocketAddress, - createSSLEngine: Supplier[SSLEngine], + createSSLEngine: function.Creator[SSLEngine], localAddress: Optional[InetSocketAddress], options: JIterable[SocketOption], connectTimeout: Optional[java.time.Duration], idleTimeout: Optional[java.time.Duration], - verifySession: JFunction[SSLSession, Optional[Throwable]], + verifySession: function.Function[SSLSession, Optional[Throwable]], closing: TLSClosing): Flow[ByteString, ByteString, CompletionStage[OutgoingConnection]] = { Flow.fromGraph( delegate .outgoingConnectionWithTls( remoteAddress, - createSSLEngine = () => createSSLEngine.get(), + createSSLEngine = () => createSSLEngine.create(), localAddress.toScala, CollectionUtil.toSeq(options), optionalDurationToScala(connectTimeout), @@ -313,10 +312,10 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { def bindWithTls( interface: String, port: Int, - createSSLEngine: Supplier[SSLEngine]): Source[IncomingConnection, CompletionStage[ServerBinding]] = { + createSSLEngine: function.Creator[SSLEngine]): Source[IncomingConnection, CompletionStage[ServerBinding]] = { Source.fromGraph( delegate - .bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.get()) + .bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.create()) .map(new IncomingConnection(_)) .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).asJava)) } @@ -330,18 +329,18 @@ class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension { def bindWithTls( interface: String, port: Int, - createSSLEngine: Supplier[SSLEngine], + createSSLEngine: function.Creator[SSLEngine], backlog: Int, options: JIterable[SocketOption], idleTimeout: Optional[java.time.Duration], - verifySession: JFunction[SSLSession, Optional[Throwable]], + verifySession: function.Function[SSLSession, Optional[Throwable]], closing: TLSClosing): Source[IncomingConnection, CompletionStage[ServerBinding]] = { Source.fromGraph( delegate .bindWithTls( interface, port, - createSSLEngine = () => createSSLEngine.get(), + createSSLEngine = () => createSSLEngine.create(), backlog, CollectionUtil.toSeq(options), optionalDurationToScala(idleTimeout),
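The extended overloads likewise swap `java.util.function.Function` for `pekko.japi.function.Function` on the `verifySession` callback; as before, an empty `Optional` accepts the negotiated session and a present `Throwable` rejects it. A hypothetical verifier (the expected peer name is a placeholder) that could be passed to the extended `outgoingConnectionWithTls` or `bindWithTls` overloads above:

```java
// Hypothetical session verifier using the new japi.function.Function parameter type.
import java.util.Optional;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;

import org.apache.pekko.japi.function.Function;

public class SessionVerifierSketch {
  static final Function<SSLSession, Optional<Throwable>> verifySession =
      session -> "example.com".equals(session.getPeerHost())
          ? Optional.empty()
          : Optional.of(new SSLPeerUnverifiedException("unexpected peer " + session.getPeerHost()));
}
```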