diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala index 38a091b3d0..5df7fa6b26 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala @@ -44,8 +44,8 @@ object EventSourcedEntity { entityTypeKey: EntityTypeKey[Command], entityId: String, emptyState: State, - commandHandler: (State, Command) ⇒ ReplyEffect[Event, State], - eventHandler: (State, Event) ⇒ State): EventSourcedBehavior[Command, Event, State] = + commandHandler: (State, Command) => ReplyEffect[Event, State], + eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = EventSourcedBehavior.withEnforcedReplies( entityTypeKey.persistenceIdFrom(entityId), emptyState, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 417043dac4..d87d899a57 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -1104,7 +1104,7 @@ class DDataShardCoordinator( stateInitialized() activate() - case g: GetShardHome ⇒ + case g: GetShardHome => stashGetShardHomeRequest(sender(), g) case _ => stash() @@ -1182,7 +1182,7 @@ class DDataShardCoordinator( private def unstashGetShardHomeRequests(): Unit = { getShardHomeRequests.foreach { - case (originalSender, request) ⇒ self.tell(request, sender = originalSender) + case (originalSender, request) => self.tell(request, sender = originalSender) } getShardHomeRequests = Set.empty } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 4fb3c513b2..7be285c850 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -181,7 +181,7 @@ class QueueSinkSpec extends StreamSpec { val expected = List(Some(1), Some(2), Some(3), None) val javadslQueue = Source(expected.flatten).runWith(Sink.queue()).asJava val scaladslQueue = akka.stream.javadsl.SinkQueueWithCancel.asScala(javadslQueue) - expected.foreach { v ⇒ + expected.foreach { v => scaladslQueue.pull().pipeTo(testActor) expectMsg(v) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala index d57e365998..477b88ab1e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SetupSpec.scala @@ -16,7 +16,7 @@ class SetupSpec extends StreamSpec { "Source.setup" should { "expose materializer" in { - val source = Source.setup { (mat, _) ⇒ + val source = Source.setup { (mat, _) => Source.single(mat.isShutdown) } @@ -24,7 +24,7 @@ class SetupSpec extends StreamSpec { } "expose attributes" in { - val source = Source.setup { (_, attr) ⇒ + val source = Source.setup { (_, attr) => Source.single(attr.attributeList) } @@ -32,7 +32,7 @@ class SetupSpec extends StreamSpec { } "propagate materialized value" in { - val source = Source.setup { (_, _) ⇒ + val source = Source.setup { (_, _) => Source.maybe[NotUsed] } @@ -43,7 +43,7 @@ class SetupSpec extends StreamSpec { "propagate attributes" in { val source = Source - .setup { (_, attr) ⇒ + .setup { (_, attr) => Source.single(attr.nameLifted) } .named("my-name") @@ -53,8 +53,8 @@ class SetupSpec extends StreamSpec { "propagate attributes when nested" in { val source = Source - .setup { (_, _) ⇒ - Source.setup { (_, attr) ⇒ + .setup { (_, _) => + Source.setup { (_, attr) => Source.single(attr.nameLifted) } } @@ -65,7 +65,7 @@ class SetupSpec extends StreamSpec { "handle factory failure" in { val error = new Error("boom") - val source = Source.setup { (_, _) ⇒ + val source = Source.setup { (_, _) => throw error } @@ -76,8 +76,8 @@ class SetupSpec extends StreamSpec { "handle materialization failure" in { val error = new Error("boom") - val source = Source.setup { (_, _) ⇒ - Source.empty.mapMaterializedValue(_ ⇒ throw error) + val source = Source.setup { (_, _) => + Source.empty.mapMaterializedValue(_ => throw error) } val (materialized, completion) = source.toMat(Sink.head)(Keep.both).run() @@ -90,7 +90,7 @@ class SetupSpec extends StreamSpec { "Flow.setup" should { "expose materializer" in { - val flow = Flow.setup { (mat, _) ⇒ + val flow = Flow.setup { (mat, _) => Flow.fromSinkAndSource(Sink.ignore, Source.single(mat.isShutdown)) } @@ -98,7 +98,7 @@ class SetupSpec extends StreamSpec { } "expose attributes" in { - val flow = Flow.setup { (_, attr) ⇒ + val flow = Flow.setup { (_, attr) => Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.attributeList)) } @@ -106,7 +106,7 @@ class SetupSpec extends StreamSpec { } "propagate materialized value" in { - val flow = Flow.setup { (_, _) ⇒ + val flow = Flow.setup { (_, _) => Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[NotUsed])(Keep.right) } @@ -117,7 +117,7 @@ class SetupSpec extends StreamSpec { "propagate attributes" in { val flow = Flow - .setup { (_, attr) ⇒ + .setup { (_, attr) => Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted)) } .named("my-name") @@ -127,8 +127,8 @@ class SetupSpec extends StreamSpec { "propagate attributes when nested" in { val flow = Flow - .setup { (_, _) ⇒ - Flow.setup { (_, attr) ⇒ + .setup { (_, _) => + Flow.setup { (_, attr) => Flow.fromSinkAndSource(Sink.ignore, Source.single(attr.nameLifted)) } } @@ -139,7 +139,7 @@ class SetupSpec extends StreamSpec { "handle factory failure" in { val error = new Error("boom") - val flow = Flow.setup { (_, _) ⇒ + val flow = Flow.setup { (_, _) => throw error } @@ -150,8 +150,8 @@ class SetupSpec extends StreamSpec { "handle materialization failure" in { val error = new Error("boom") - val flow = Flow.setup { (_, _) ⇒ - Flow[NotUsed].mapMaterializedValue(_ ⇒ throw error) + val flow = Flow.setup { (_, _) => + Flow[NotUsed].mapMaterializedValue(_ => throw error) } val (materialized, completion) = Source.empty.viaMat(flow)(Keep.right).toMat(Sink.head)(Keep.both).run() @@ -164,7 +164,7 @@ class SetupSpec extends StreamSpec { "Sink.setup" should { "expose materializer" in { - val sink = Sink.setup { (mat, _) ⇒ + val sink = Sink.setup { (mat, _) => Sink.fold(mat.isShutdown)(Keep.left) } @@ -172,7 +172,7 @@ class SetupSpec extends StreamSpec { } "expose attributes" in { - val sink = Sink.setup { (_, attr) ⇒ + val sink = Sink.setup { (_, attr) => Sink.fold(attr.attributeList)(Keep.left) } @@ -180,7 +180,7 @@ class SetupSpec extends StreamSpec { } "propagate materialized value" in { - val sink = Sink.setup { (_, _) ⇒ + val sink = Sink.setup { (_, _) => Sink.fold(NotUsed)(Keep.left) } @@ -189,7 +189,7 @@ class SetupSpec extends StreamSpec { "propagate attributes" in { val sink = Sink - .setup { (_, attr) ⇒ + .setup { (_, attr) => Sink.fold(attr.nameLifted)(Keep.left) } .named("my-name") @@ -199,8 +199,8 @@ class SetupSpec extends StreamSpec { "propagate attributes when nested" in { val sink = Sink - .setup { (_, _) ⇒ - Sink.setup { (_, attr) ⇒ + .setup { (_, _) => + Sink.setup { (_, attr) => Sink.fold(attr.nameLifted)(Keep.left) } } @@ -211,7 +211,7 @@ class SetupSpec extends StreamSpec { "handle factory failure" in { val error = new Error("boom") - val sink = Sink.setup { (_, _) ⇒ + val sink = Sink.setup { (_, _) => throw error } @@ -220,8 +220,8 @@ class SetupSpec extends StreamSpec { "handle materialization failure" in { val error = new Error("boom") - val sink = Sink.setup { (_, _) ⇒ - Sink.ignore.mapMaterializedValue(_ ⇒ throw error) + val sink = Sink.setup { (_, _) => + Sink.ignore.mapMaterializedValue(_ => throw error) } Source.empty.runWith(sink).failed.futureValue.getCause shouldBe error diff --git a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala index 333564136e..d0e0be88c2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala @@ -13,7 +13,7 @@ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal /** Internal Api */ -@InternalApi private[stream] final class SetupSinkStage[T, M](factory: (ActorMaterializer, Attributes) ⇒ Sink[T, M]) +@InternalApi private[stream] final class SetupSinkStage[T, M](factory: (ActorMaterializer, Attributes) => Sink[T, M]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { private val in = Inlet[T]("SetupSinkStage.in") @@ -28,8 +28,8 @@ import scala.util.control.NonFatal import SetupStage._ val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() ⇒ pull(in), () ⇒ cancel(in))) - setHandler(in, delegateToSubOutlet(() ⇒ grab(in), subOutlet)) + subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) + setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) override def preStart(): Unit = { try { @@ -38,7 +38,7 @@ import scala.util.control.NonFatal val mat = Source.fromGraph(subOutlet.source).runWith(sink.withAttributes(attributes))(subFusingMaterializer) matPromise.success(mat) } catch { - case NonFatal(ex) ⇒ + case NonFatal(ex) => matPromise.failure(ex) throw ex } @@ -49,7 +49,7 @@ import scala.util.control.NonFatal /** Internal Api */ @InternalApi private[stream] final class SetupFlowStage[T, U, M]( - factory: (ActorMaterializer, Attributes) ⇒ Flow[T, U, M]) + factory: (ActorMaterializer, Attributes) => Flow[T, U, M]) extends GraphStageWithMaterializedValue[FlowShape[T, U], Future[M]] { private val in = Inlet[T]("SetupFlowStage.in") @@ -67,10 +67,10 @@ import scala.util.control.NonFatal val subInlet = new SubSinkInlet[U]("SetupFlowStage") val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") - subInlet.setHandler(delegateToOutlet(push(out, _: U), () ⇒ complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() ⇒ pull(in), () ⇒ cancel(in))) + subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) + subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) - setHandler(in, delegateToSubOutlet(() ⇒ grab(in), subOutlet)) + setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) setHandler(out, delegateToSubInlet(subInlet)) override def preStart(): Unit = { @@ -84,7 +84,7 @@ import scala.util.control.NonFatal .run()(subFusingMaterializer) matPromise.success(mat) } catch { - case NonFatal(ex) ⇒ + case NonFatal(ex) => matPromise.failure(ex) throw ex } @@ -93,7 +93,8 @@ import scala.util.control.NonFatal } /** Internal Api */ -@InternalApi private[stream] final class SetupSourceStage[T, M](factory: (ActorMaterializer, Attributes) ⇒ Source[T, M]) +@InternalApi private[stream] final class SetupSourceStage[T, M]( + factory: (ActorMaterializer, Attributes) => Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { private val out = Outlet[T]("SetupSourceStage.out") @@ -108,7 +109,7 @@ import scala.util.control.NonFatal import SetupStage._ val subInlet = new SubSinkInlet[T]("SetupSourceStage") - subInlet.setHandler(delegateToOutlet(push(out, _: T), () ⇒ complete(out), fail(out, _), subInlet)) + subInlet.setHandler(delegateToOutlet(push(out, _: T), () => complete(out), fail(out, _), subInlet)) setHandler(out, delegateToSubInlet(subInlet)) override def preStart(): Unit = { @@ -118,7 +119,7 @@ import scala.util.control.NonFatal val mat = source.withAttributes(attributes).to(Sink.fromGraph(subInlet.sink)).run()(subFusingMaterializer) matPromise.success(mat) } catch { - case NonFatal(ex) ⇒ + case NonFatal(ex) => matPromise.failure(ex) throw ex } @@ -127,7 +128,7 @@ import scala.util.control.NonFatal } private object SetupStage { - def delegateToSubOutlet[T](grab: () ⇒ T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { + def delegateToSubOutlet[T](grab: () => T, subOutlet: GraphStageLogic#SubSourceOutlet[T]) = new InHandler { override def onPush(): Unit = subOutlet.push(grab()) override def onUpstreamFinish(): Unit = @@ -137,9 +138,9 @@ private object SetupStage { } def delegateToOutlet[T]( - push: T ⇒ Unit, - complete: () ⇒ Unit, - fail: Throwable ⇒ Unit, + push: T => Unit, + complete: () => Unit, + fail: Throwable => Unit, subInlet: GraphStageLogic#SubSinkInlet[T]) = new InHandler { override def onPush(): Unit = push(subInlet.grab()) @@ -156,7 +157,7 @@ private object SetupStage { subInlet.cancel() } - def delegateToInlet(pull: () ⇒ Unit, cancel: () ⇒ Unit) = new OutHandler { + def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { override def onPull(): Unit = pull() override def onDownstreamFinish(): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 5bd9b1ad6b..b1c8441993 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -69,7 +69,7 @@ object Flow { */ def setup[I, O, M]( factory: BiFunction[ActorMaterializer, Attributes, Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = - scaladsl.Flow.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava + scaladsl.Flow.setup((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava /** * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 4bd6c2fe2e..7976d1a651 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -293,7 +293,7 @@ object Sink { * [[Attributes]] of the [[Sink]] returned by this method. */ def setup[T, M](factory: BiFunction[ActorMaterializer, Attributes, Sink[T, M]]): Sink[T, CompletionStage[M]] = - scaladsl.Sink.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava + scaladsl.Sink.setup((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava /** * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 457d9224d0..0ded76645d 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -387,7 +387,7 @@ object Source { * [[Attributes]] of the [[Source]] returned by this method. */ def setup[T, M](factory: BiFunction[ActorMaterializer, Attributes, Source[T, M]]): Source[T, CompletionStage[M]] = - scaladsl.Source.setup((mat, attr) ⇒ factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava + scaladsl.Source.setup((mat, attr) => factory(mat, attr).asScala).mapMaterializedValue(_.toJava).asJava /** * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8679454a3d..4f2494ca9b 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -393,7 +393,7 @@ object Flow { * exposes [[ActorMaterializer]] which is going to be used during materialization and * [[Attributes]] of the [[Flow]] returned by this method. */ - def setup[T, U, M](factory: (ActorMaterializer, Attributes) ⇒ Flow[T, U, M]): Flow[T, U, Future[M]] = + def setup[T, U, M](factory: (ActorMaterializer, Attributes) => Flow[T, U, M]): Flow[T, U, Future[M]] = Flow.fromGraph(new SetupFlowStage(factory)) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 378b5d9e31..ad9cf40474 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -150,7 +150,7 @@ object Sink { * exposes [[ActorMaterializer]] which is going to be used during materialization and * [[Attributes]] of the [[Sink]] returned by this method. */ - def setup[T, M](factory: (ActorMaterializer, Attributes) ⇒ Sink[T, M]): Sink[T, Future[M]] = + def setup[T, M](factory: (ActorMaterializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] = Sink.fromGraph(new SetupSinkStage(factory)) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5a4804b23b..bb27cc0ba0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -300,7 +300,7 @@ object Source { * exposes [[ActorMaterializer]] which is going to be used during materialization and * [[Attributes]] of the [[Source]] returned by this method. */ - def setup[T, M](factory: (ActorMaterializer, Attributes) ⇒ Source[T, M]): Source[T, Future[M]] = + def setup[T, M](factory: (ActorMaterializer, Attributes) => Source[T, M]): Source[T, Future[M]] = Source.fromGraph(new SetupSourceStage(factory)) /**