diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala index 1c4d5838a6..91b367ac14 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala @@ -21,11 +21,21 @@ object DeadLetterSuspensionSpec { context.system.eventStream.publish(Dropped(n, "Don't like numbers", self)) } } + + object Unhandled { + def props(): Props = Props(new Unhandled) + } + + class Unhandled extends Actor { + override def receive: Receive = { + case n: Int => unhandled(n) + } + } } class DeadLetterSuspensionSpec extends AkkaSpec(""" akka.loglevel = INFO - akka.log-dead-letters = 3 + akka.log-dead-letters = 4 akka.log-dead-letters-suspend-duration = 2s """) with ImplicitSender { import DeadLetterSuspensionSpec._ @@ -36,6 +46,7 @@ class DeadLetterSuspensionSpec extends AkkaSpec(""" expectTerminated(deadActor) private val droppingActor = system.actorOf(Dropping.props(), "droppingActor") + private val unhandledActor = system.actorOf(Unhandled.props(), "unhandledActor") private def expectedDeadLettersLogMessage(count: Int): String = s"Message [java.lang.Integer] from $testActor to $deadActor was not delivered. [$count] dead letters encountered" @@ -43,6 +54,9 @@ class DeadLetterSuspensionSpec extends AkkaSpec(""" private def expectedDroppedLogMessage(count: Int): String = s"Message [java.lang.Integer] to $droppingActor was dropped. Don't like numbers. [$count] dead letters encountered" + private def expectedUnhandledLogMessage(count: Int): String = + s"Message [java.lang.Integer] from $testActor to $unhandledActor was unhandled. [$count] dead letters encountered" + "must suspend dead-letters logging when reaching 'akka.log-dead-letters', and then re-enable" in { EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept { deadActor ! 1 } @@ -50,27 +64,30 @@ EventFilter.info(start = expectedDroppedLogMessage(2), occurrences = 1).intercept { droppingActor ! 2 } + EventFilter.info(start = expectedUnhandledLogMessage(3), occurrences = 1).intercept { + unhandledActor ! 3 + } EventFilter - .info(start = expectedDeadLettersLogMessage(3) + ", no more dead letters will be logged in next", occurrences = 1) + .info(start = expectedDeadLettersLogMessage(4) + ", no more dead letters will be logged in next", occurrences = 1) .intercept { - deadActor ! 3 + deadActor ! 4 } - deadActor ! 4 - droppingActor ! 5 + deadActor ! 5 + droppingActor ! 6 // let suspend-duration elapse Thread.sleep(2050) // re-enabled EventFilter - .info(start = expectedDeadLettersLogMessage(6) + ", of which 2 were not logged", occurrences = 1) + .info(start = expectedDeadLettersLogMessage(7) + ", of which 2 were not logged", occurrences = 1) .intercept { - deadActor ! 6 + deadActor ! 7 } // reset count EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept { - deadActor ! 7 + deadActor !
8 } } diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index f01139de27..0a9a279068 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -26,6 +26,7 @@ object EventStreamSpec { stdout-loglevel = WARNING loglevel = WARNING actor.debug.unhandled = on + log-dead-letters = off } """) diff --git a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala index 48fe9fe4ea..8a816cd657 100644 --- a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala @@ -150,7 +150,13 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers { def likeVector(bs: ByteString)(body: IndexedSeq[Byte] => Any): Boolean = { val vec = Vector(bs: _*) - body(bs) == body(vec) + val a = body(bs) + val b = body(vec) + val result = a == b + if (!result) { + println(s"$bs => $a != $vec => $b") + } + result } def likeVectors(bsA: ByteString, bsB: ByteString)(body: (IndexedSeq[Byte], IndexedSeq[Byte]) => Any): Boolean = { @@ -384,6 +390,27 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers { ByteString1.fromString("0123456789").take(3).drop(1) should ===(ByteString("12")) ByteString1.fromString("0123456789").take(10).take(8).drop(3).take(5) should ===(ByteString("34567")) } + "copyToArray" in { + val byteString = ByteString1(Array[Byte](1, 2, 3, 4, 5), startIndex = 1, length = 3) + def verify(f: Array[Byte] => Unit)(expected: Byte*): Unit = { + val array = Array.fill[Byte](3)(0) + f(array) + array should ===(expected.toArray) + } + + verify(byteString.copyToArray(_, 0, 1))(2, 0, 0) + verify(byteString.copyToArray(_, 1, 1))(0, 2, 0) + verify(byteString.copyToArray(_, 2, 1))(0, 0, 2) + verify(byteString.copyToArray(_, 3, 1))(0, 0, 0) + verify(byteString.copyToArray(_, 0, 2))(2, 3, 0) + verify(byteString.copyToArray(_, 1, 2))(0, 2, 3) + verify(byteString.copyToArray(_, 2, 2))(0, 0, 2) + verify(byteString.copyToArray(_, 3, 2))(0, 0, 0) + verify(byteString.copyToArray(_, 0, 3))(2, 3, 4) + verify(byteString.copyToArray(_, 1, 3))(0, 2, 3) + verify(byteString.copyToArray(_, 2, 3))(0, 0, 2) + verify(byteString.copyToArray(_, 3, 3))(0, 0, 0) + } } "ByteString1C" must { "drop" in { @@ -427,6 +454,27 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers { ByteString1.fromString("abcdefg").drop(2) should ===(ByteString("cdefg")) ByteString1.fromString("abcdefg").drop(2).take(1) should ===(ByteString("c")) } + "copyToArray" in { + val byteString = ByteString1C(Array[Byte](1, 2, 3)) + def verify(f: Array[Byte] => Unit)(expected: Byte*): Unit = { + val array = Array.fill[Byte](3)(0) + f(array) + array should ===(expected.toArray) + } + + verify(byteString.copyToArray(_, 0, 1))(1, 0, 0) + verify(byteString.copyToArray(_, 1, 1))(0, 1, 0) + verify(byteString.copyToArray(_, 2, 1))(0, 0, 1) + verify(byteString.copyToArray(_, 3, 1))(0, 0, 0) + verify(byteString.copyToArray(_, 0, 2))(1, 2, 0) + verify(byteString.copyToArray(_, 1, 2))(0, 1, 2) + verify(byteString.copyToArray(_, 2, 2))(0, 0, 1) + verify(byteString.copyToArray(_, 3, 2))(0, 0, 0) + verify(byteString.copyToArray(_, 0, 3))(1, 2, 3) + verify(byteString.copyToArray(_, 1, 3))(0, 1, 2) + verify(byteString.copyToArray(_, 2, 3))(0, 0, 1) + verify(byteString.copyToArray(_, 3, 3))(0, 0, 0) + } } "ByteStrings" 
must { "drop" in { @@ -656,6 +704,28 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers { compact.indexOf('g', 5) should ===(5) compact.indexOf('g', 6) should ===(-1) } + "copyToArray" in { + val byteString = ByteString(1, 2) ++ ByteString(3) ++ ByteString(4) + + def verify(f: Array[Byte] => Unit)(expected: Byte*): Unit = { + val array = Array.fill[Byte](3)(0) + f(array) + array should ===(expected.toArray) + } + + verify(byteString.copyToArray(_, 0, 1))(1, 0, 0) + verify(byteString.copyToArray(_, 1, 1))(0, 1, 0) + verify(byteString.copyToArray(_, 2, 1))(0, 0, 1) + verify(byteString.copyToArray(_, 3, 1))(0, 0, 0) + verify(byteString.copyToArray(_, 0, 2))(1, 2, 0) + verify(byteString.copyToArray(_, 1, 2))(0, 1, 2) + verify(byteString.copyToArray(_, 2, 2))(0, 0, 1) + verify(byteString.copyToArray(_, 3, 2))(0, 0, 0) + verify(byteString.copyToArray(_, 0, 3))(1, 2, 3) + verify(byteString.copyToArray(_, 1, 3))(0, 1, 2) + verify(byteString.copyToArray(_, 2, 3))(0, 0, 1) + verify(byteString.copyToArray(_, 3, 3))(0, 0, 0) + } } "A ByteString" must { @@ -886,7 +956,7 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers { case (xs, from, until) => likeVector(xs)({ it => val array = new Array[Byte](xs.length) - it.slice(from, until).copyToArray(array, from, until) + it.copyToArray(array, from, until) array.toSeq }) } @@ -1125,6 +1195,14 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers { iterator.copyToArray(array, 4, 2) assert(new String(array) === "123456") } + + "calling copyToArray with length passing end of destination" in { + // Before the fix, a len reaching past the end of the destination would cause a never-ending loop inside iterator copyToArray + val iterator = (ByteString(1, 2) ++ ByteString(3) ++ ByteString(4)).iterator + val array = Array.fill[Byte](3)(0) + iterator.copyToArray(array, 2, 2) + array.toSeq should ===(Seq(0, 0, 1)) + } } "decode data correctly" when { diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java index 3e0bc28e38..76ba875825 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java @@ -106,36 +106,36 @@ public interface IntroTest { // #hello-world-main // #hello-world-main-setup - public class HelloWorldMain extends AbstractBehavior<HelloWorldMain.Start> { + public class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> { // #hello-world-main-setup - public static class Start { + public static class SayHello { public final String name; - public Start(String name) { + public SayHello(String name) { this.name = name; } } // #hello-world-main-setup - public static Behavior<Start> create() { + public static Behavior<SayHello> create() { return Behaviors.setup(HelloWorldMain::new); } private final ActorRef<HelloWorld.Greet> greeter; - private HelloWorldMain(ActorContext<Start> context) { + private HelloWorldMain(ActorContext<SayHello> context) { super(context); greeter = context.spawn(HelloWorld.create(), "greeter"); } // #hello-world-main-setup @Override - public Receive<Start> createReceive() { - return newReceiveBuilder().onMessage(Start.class, this::onStart).build(); + public Receive<SayHello> createReceive() { + return newReceiveBuilder().onMessage(SayHello.class, this::onStart).build(); } - private Behavior<Start> onStart(Start command) { + private Behavior<SayHello> onStart(SayHello command) { ActorRef<HelloWorld.Greeted> replyTo = getContext().spawn(HelloWorldBot.create(3), command.name); greeter.tell(new HelloWorld.Greet(command.name, replyTo)); @@ -148,35
+148,35 @@ public interface IntroTest { interface CustomDispatchersExample { - public static class Start { + public static class SayHello { public final String name; - public Start(String name) { + public SayHello(String name) { this.name = name; } } // #hello-world-main-with-dispatchers - public class HelloWorldMain extends AbstractBehavior<HelloWorldMain.Start> { + public class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> { // Start message... // #hello-world-main-with-dispatchers - public static class Start { + public static class SayHello { public final String name; - public Start(String name) { + public SayHello(String name) { this.name = name; } } // #hello-world-main-with-dispatchers - public static Behavior<Start> create() { + public static Behavior<SayHello> create() { return Behaviors.setup(HelloWorldMain::new); } private final ActorRef<HelloWorld.Greet> greeter; - private HelloWorldMain(ActorContext<Start> context) { + private HelloWorldMain(ActorContext<SayHello> context) { super(context); final String dispatcherPath = "akka.actor.default-blocking-io-dispatcher"; @@ -187,7 +187,7 @@ public interface IntroTest { // createReceive ... // #hello-world-main-with-dispatchers @Override - public Receive<Start> createReceive() { + public Receive<SayHello> createReceive() { return null; } // #hello-world-main-with-dispatchers @@ -197,11 +197,11 @@ public interface IntroTest { public static void main(String[] args) throws Exception { // #hello-world - final ActorSystem<HelloWorldMain.Start> system = + final ActorSystem<HelloWorldMain.SayHello> system = ActorSystem.create(HelloWorldMain.create(), "hello"); - system.tell(new HelloWorldMain.Start("World")); - system.tell(new HelloWorldMain.Start("Akka")); + system.tell(new HelloWorldMain.SayHello("World")); + system.tell(new HelloWorldMain.SayHello("Akka")); // #hello-world Thread.sleep(3000); diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala index ecfe1a77b5..174684ad07 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala @@ -74,9 +74,9 @@ object IntroSpec { //#hello-world-main object HelloWorldMain { - final case class Start(name: String) + final case class SayHello(name: String) - def apply(): Behavior[Start] = + def apply(): Behavior[SayHello] = Behaviors.setup { context => val greeter = context.spawn(HelloWorld(), "greeter") @@ -94,10 +94,10 @@ object IntroSpec { object CustomDispatchersExample { object HelloWorldMain { - final case class Start(name: String) + final case class SayHello(name: String) //#hello-world-main-with-dispatchers - def apply(): Behavior[Start] = + def apply(): Behavior[SayHello] = Behaviors.setup { context => val dispatcherPath = "akka.actor.default-blocking-io-dispatcher" @@ -232,11 +232,11 @@ class IntroSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogC //#fiddle_code //#hello-world - val system: ActorSystem[HelloWorldMain.Start] = + val system: ActorSystem[HelloWorldMain.SayHello] = ActorSystem(HelloWorldMain(), "hello") - system ! HelloWorldMain.Start("World") - system ! HelloWorldMain.Start("Akka") + system ! HelloWorldMain.SayHello("World") + system !
HelloWorldMain.SayHello("Akka") //#hello-world //#fiddle_code diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 968e32b19c..31f3e3c176 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -47,7 +47,8 @@ akka { # This is useful when you are uncertain of what configuration is used. log-config-on-start = off - # Log at info level when messages are sent to dead letters. + # Log at info level when messages are sent to dead letters, or published to + # eventStream as `DeadLetter`, `Dropped` or `UnhandledMessage`. # Possible values: # on: all dead letters are logged # off: no logging of dead letters diff --git a/akka-actor/src/main/scala-2.12/akka/util/ByteIterator.scala b/akka-actor/src/main/scala-2.12/akka/util/ByteIterator.scala index 1e2e27d6da..aaac46b5d5 100644 --- a/akka-actor/src/main/scala-2.12/akka/util/ByteIterator.scala +++ b/akka-actor/src/main/scala-2.12/akka/util/ByteIterator.scala @@ -300,7 +300,7 @@ object ByteIterator { final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = { var pos = start var rest = len - while ((rest > 0) && !iterators.isEmpty) { + while ((rest > 0) && !iterators.isEmpty && pos < xs.length) { val n = 0 max ((xs.length - pos) min current.len min rest) current.copyToArray(xs, pos, n) pos += n diff --git a/akka-actor/src/main/scala-2.13+/akka/util/ByteIterator.scala b/akka-actor/src/main/scala-2.13+/akka/util/ByteIterator.scala index cd63f960f6..4f15f7e147 100644 --- a/akka-actor/src/main/scala-2.13+/akka/util/ByteIterator.scala +++ b/akka-actor/src/main/scala-2.13+/akka/util/ByteIterator.scala @@ -309,7 +309,7 @@ object ByteIterator { final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Int = { var pos = start var rest = len - while ((rest > 0) && !iterators.isEmpty) { + while ((rest > 0) && !iterators.isEmpty && pos < xs.length) { val n = 0 max ((xs.length - pos) min current.len min rest) current.copyToArray(xs, pos, n) pos += n diff --git a/akka-actor/src/main/scala-2.13+/akka/util/ByteString.scala b/akka-actor/src/main/scala-2.13+/akka/util/ByteString.scala index 8c2eac8e57..614d0e1bdc 100644 --- a/akka-actor/src/main/scala-2.13+/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala-2.13+/akka/util/ByteString.scala @@ -253,11 +253,11 @@ object ByteString { } override def copyToArray[B >: Byte](dest: Array[B], start: Int, len: Int): Int = { - val copied = math.min(math.min(len, bytes.length), dest.length - start) - if (copied > 0) { - System.arraycopy(bytes, 0, dest, start, copied) + val toCopy = math.min(math.min(len, bytes.length), dest.length - start) + if (toCopy > 0) { + Array.copy(bytes, 0, dest, start, toCopy) } - copied + toCopy } } @@ -395,11 +395,11 @@ object ByteString { override def copyToArray[B >: Byte](dest: Array[B], start: Int, len: Int): Int = { // min of the bytes available to copy, bytes there is room for in dest and the requested number of bytes - val copied = math.max(math.min(math.min(len, length), dest.length - start), 0) - if (copied > 0) { - System.arraycopy(bytes, 0, dest, start, copied) + val toCopy = math.min(math.min(len, length), dest.length - start) + if (toCopy > 0) { + Array.copy(bytes, startIndex, dest, start, toCopy) } - copied + toCopy } protected def writeReplace(): AnyRef = new SerializationProxy(this) @@ -645,20 +645,19 @@ object ByteString { } override def copyToArray[B >: Byte](dest: Array[B], start: Int, len: Int): Int = { - if 
(isCompact) bytestrings.head.copyToArray(dest, start, len) + if (bytestrings.size == 1) bytestrings.head.copyToArray(dest, start, len) else { - // min of the bytes available top copy, bytes there is room for in dest and the requested number of bytes - val copied = math.min(math.min(len, length), dest.length - start) - if (copied > 0) { + // min of the bytes available to copy, bytes there is room for in dest and the requested number of bytes + val totalToCopy = math.min(math.min(len, length), dest.length - start) + if (totalToCopy > 0) { val bsIterator = bytestrings.iterator - var pos = 0 - while (pos < copied) { + var copied = 0 + while (copied < totalToCopy) { val current = bsIterator.next() - val copied = current.copyToArray(dest, pos, len - pos) - pos += copied + copied += current.copyToArray(dest, start + copied, totalToCopy - copied) } } - copied + totalToCopy } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index aeb93e95bd..18c6001a78 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -289,6 +289,7 @@ final case class UnhandledMessage( @BeanProperty recipient: ActorRef) extends NoSerializationVerificationNeeded with WrappedMessage + with AllDeadLetters /** * Classes for passing status back to the sender. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 24ea086b3c..83c9ff47a3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -7,10 +7,11 @@ package akka.actor import java.util.concurrent.ConcurrentHashMap import akka.annotation.InternalApi - import scala.annotation.tailrec import scala.collection.immutable import scala.util.control.NonFatal + +import akka.annotation.DoNotInherit import akka.dispatch._ import akka.dispatch.sysmsg._ import akka.event.AddressTerminatedTopic @@ -478,8 +479,11 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { /** * Subscribe to this class to be notified about all [[DeadLetter]] (also the suppressed ones) * and [[Dropped]]. + * + * Not for user extension */ -sealed trait AllDeadLetters extends WrappedMessage { +@DoNotInherit +trait AllDeadLetters extends WrappedMessage { def message: Any def sender: ActorRef def recipient: ActorRef diff --git a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala index fb7649b409..0455a5f11d 100644 --- a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala +++ b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala @@ -15,6 +15,7 @@ import akka.actor.DeadLetter import akka.actor.DeadLetterActorRef import akka.actor.DeadLetterSuppression import akka.actor.Dropped +import akka.actor.UnhandledMessage import akka.actor.WrappedMessage import akka.event.Logging.Info import akka.util.PrettyDuration._ @@ -29,6 +30,7 @@ class DeadLetterListener extends Actor { override def preStart(): Unit = { eventStream.subscribe(self, classOf[DeadLetter]) eventStream.subscribe(self, classOf[Dropped]) + eventStream.subscribe(self, classOf[UnhandledMessage]) } // don't re-subscribe, skip call to preStart @@ -115,6 +117,10 @@ class DeadLetterListener extends Actor { val destination = if (isReal(d.recipient)) s" to ${d.recipient}" else "" s"Message [$messageStr]$wrappedIn$origin$destination was dropped. ${dropped.reason}. 
" + s"[$count] dead letters encountered$doneMsg. " + case _: UnhandledMessage => + val destination = if (isReal(d.recipient)) s" to ${d.recipient}" else "" + s"Message [$messageStr]$wrappedIn$origin$destination was unhandled. " + + s"[$count] dead letters encountered$doneMsg. " case _ => s"Message [$messageStr]$wrappedIn$origin to ${d.recipient} was not delivered. " + s"[$count] dead letters encountered$doneMsg. " + diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index c2703d7929..5cd62aab21 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -1637,6 +1637,8 @@ class LogMarker(val name: String, val properties: Map[String, Any]) { import akka.util.ccompat.JavaConverters._ properties.map { case (k, v) => (k, v.asInstanceOf[AnyRef]) }.asJava } + + override def toString: String = s"LogMarker($name,$properties)" } object LogMarker { diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 9caa219b3c..005f45f594 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -116,8 +116,8 @@ object CircuitBreaker { } /** - * Provides circuit breaker functionality to provide stability when working with "dangerous" operations, e.g. calls to - * remote systems + * Provides circuit breaker functionality for stability when working with "dangerous" operations, e.g. calls to + * remote systems. * * Transitions through three states: * - In *Closed* state, calls pass through until the `maxFailures` count is reached. This causes the circuit breaker @@ -244,7 +244,7 @@ class CircuitBreaker( Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration] /** - * Wraps invocations of asynchronous calls that need to be protected + * Wraps invocations of asynchronous calls that need to be protected. * * @param body Call needing protected * @param defineFailureFn function that define what should be consider failure and thus increase failure count @@ -255,7 +255,7 @@ class CircuitBreaker( currentState.invoke(body, defineFailureFn) /** - * Wraps invocations of asynchronous calls that need to be protected + * Wraps invocations of asynchronous calls that need to be protected. * * @param body Call needing protected * @return [[scala.concurrent.Future]] containing the call result or a @@ -266,7 +266,7 @@ class CircuitBreaker( currentState.invoke(body, CircuitBreaker.exceptionAsFailure) /** - * Java API for [[#withCircuitBreaker]] + * Java API for [[#withCircuitBreaker]]. * * @param body Call needing protected * @return [[scala.concurrent.Future]] containing the call result or a @@ -276,7 +276,7 @@ class CircuitBreaker( callWithCircuitBreaker(body, CircuitBreaker.exceptionAsFailureJava[T]) /** - * Java API for [[#withCircuitBreaker]] + * Java API for [[#withCircuitBreaker]]. * * @param body Call needing protected * @param defineFailureFn function that define what should be consider failure and thus increase failure count @@ -292,7 +292,7 @@ class CircuitBreaker( } /** - * Java API (8) for [[#withCircuitBreaker]] + * Java API (8) for [[#withCircuitBreaker]]. 
* * @param body Call needing protected * @return [[java.util.concurrent.CompletionStage]] containing the call result or a @@ -302,7 +302,7 @@ class CircuitBreaker( callWithCircuitBreakerCS(body, CircuitBreaker.exceptionAsFailureJava) /** - * Java API (8) for [[#withCircuitBreaker]] + * Java API (8) for [[#withCircuitBreaker]]. * * @param body Call needing protected * @param defineFailureFn function that define what should be consider failure and thus increase failure count @@ -317,10 +317,10 @@ class CircuitBreaker( }, defineFailureFn)) /** - * Wraps invocations of synchronous calls that need to be protected + * Wraps invocations of synchronous calls that need to be protected. * - * Calls are run in caller's thread. Because of the synchronous nature of - * this call the `scala.concurrent.TimeoutException` will only be thrown + * Calls are run in the caller's thread. Because of the synchronous nature of + * this call, the `scala.concurrent.TimeoutException` will only be thrown * after the body has completed. * * Throws java.util.concurrent.TimeoutException if the call timed out. @@ -332,7 +332,7 @@ class CircuitBreaker( withSyncCircuitBreaker(body, CircuitBreaker.exceptionAsFailure) /** - * Wraps invocations of synchronous calls that need to be protected + * Wraps invocations of synchronous calls that need to be protected. * * Calls are run in caller's thread. Because of the synchronous nature of * this call the `scala.concurrent.TimeoutException` will only be thrown @@ -510,7 +510,7 @@ class CircuitBreaker( } /** - * Adds a callback to execute when call finished with failure. + * Adds a callback to execute if the call finished with failure. * * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * @@ -523,7 +523,7 @@ class CircuitBreaker( }) /** - * JavaAPI for onCallFailure + * Java API for onCallFailure. * * @param callback Handler to be invoked on failed call, where passed value is elapsed time in nanoseconds. * @return CircuitBreaker for fluent usage @@ -534,7 +534,7 @@ class CircuitBreaker( } /** - * Adds a callback to execute when call finished with timeout. + * Adds a callback to execute if a call finished with timeout. * * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * @@ -547,7 +547,7 @@ class CircuitBreaker( }) /** - * JavaAPI for onCallTimeout + * Java API for onCallTimeout. * * @param callback Handler to be invoked on call finished with timeout, where passed value is elapsed time in nanoseconds. * @return CircuitBreaker for fluent usage @@ -558,7 +558,7 @@ class CircuitBreaker( } /** - * Adds a callback to execute when call was failed due to open breaker. + * Adds a callback to execute if a call failed due to an open breaker. * * The callback is run in the [[scala.concurrent.ExecutionContext]] supplied in the constructor. * @@ -607,7 +607,6 @@ class CircuitBreaker( /** * Resets breaker to a closed state. This is valid from an Half-Open state only. - * */ private def resetBreaker(): Unit = transition(HalfOpen, Closed) @@ -672,7 +671,6 @@ class CircuitBreaker( /** * Attempts to reset breaker by transitioning to a half-open state. This is valid from an Open state only.
- * */ private def attemptReset(): Unit = transition(Open, HalfOpen) diff --git a/akka-docs/src/main/paradox/common/circuitbreaker.md b/akka-docs/src/main/paradox/common/circuitbreaker.md index deb957dba1..220deae574 100644 --- a/akka-docs/src/main/paradox/common/circuitbreaker.md +++ b/akka-docs/src/main/paradox/common/circuitbreaker.md @@ -92,7 +92,7 @@ Java @@@ note -Using the `CircuitBreaker` companion object's @scala[*apply*]@java[*create*] method +Using the `CircuitBreaker`'s companion object @scala[*apply*]@java[*create*] method will return a `CircuitBreaker` where callbacks are executed in the caller's thread. This can be useful if the asynchronous `Future` behavior is unnecessary, for example invoking a synchronous-only API. @@ -101,11 +101,11 @@ example invoking a synchronous-only API. ### Control failure count explicitly -By default, the circuit breaker treat `Exception` as failure in synchronized API, or failed `Future` as failure in future based API. -Failure will increment failure count, when failure count reach the *maxFailures*, circuit breaker will be opened. -However, some applications might requires certain exception to not increase failure count, or vice versa, -sometime we want to increase the failure count even if the call succeeded. -Akka circuit breaker provides a way to achieve such use case: +By default, the circuit breaker treats `Exception` as failure in synchronized API, or failed `Future` as failure in future based API. +On failure, the failure count will increment. If the failure count reaches the *maxFailures*, the circuit breaker will be opened. +However, some applications may require certain exceptions to not increase the failure count. +In other cases one may want to increase the failure count even if the call succeeded. +Akka circuit breaker provides a way to achieve such use cases: * `withCircuitBreaker` * `withSyncCircuitBreaker` * `callWithCircuitBreaker` * `callWithCircuitBreakerCS` * `callWithSyncCircuitBreaker` -All methods above accepts an argument `defineFailureFn` +All methods above accept an argument `defineFailureFn`. Type of `defineFailureFn`: @scala[`Try[T] => Boolean`]@java[`BiFunction[Optional[T], Optional[Throwable], java.lang.Boolean]`] @@ -128,9 +128,14 @@ Java ### Low level API -The low-level API allows you to describe the behavior of the CircuitBreaker in detail, including deciding what to return to the calling `Actor` in case of success or failure. This is especially useful when expecting the remote call to send a reply. CircuitBreaker doesn't support `Tell Protection` (protecting against calls that expect a reply) natively at the moment, so you need to use the low-level power-user APIs, `succeed` and `fail` methods, as well as `isClose`, `isOpen`, `isHalfOpen` to implement it. +The low-level API allows you to describe the behavior of the CircuitBreaker in detail, including deciding what to return to the calling `Actor` in case of success or failure. This is especially useful when expecting the remote call to send a reply. +CircuitBreaker doesn't support `Tell Protection` (protecting against calls that expect a reply) natively at the moment. +Thus you need to use the low-level power-user APIs, the `succeed` and `fail` methods, as well as `isClosed`, `isOpen` and `isHalfOpen`, to implement it. -As can be seen in the examples below, a `Tell Protection` pattern could be implemented by using the `succeed` and `fail` methods, which would count towards the `CircuitBreaker` counts.
In the example, a call is made to the remote service if the `breaker.isClosed`, and once a response is received, the `succeed` method is invoked, which tells the `CircuitBreaker` to keep the breaker closed. If on the other hand an error or timeout is received, we trigger a `fail` and the breaker accrues this failure towards its count for opening the breaker. +As can be seen in the examples below, a `Tell Protection` pattern could be implemented by using the `succeed` and `fail` methods, which would count towards the `CircuitBreaker` counts. +In the example, a call is made to the remote service if the `breaker.isClosed`. +Once a response is received, the `succeed` method is invoked, which tells the `CircuitBreaker` to keep the breaker closed. +On the other hand, if an error or timeout is received we trigger a `fail`, and the breaker accrues this failure towards its count for opening the breaker. @@@ note diff --git a/akka-docs/src/main/paradox/dispatchers.md b/akka-docs/src/main/paradox/dispatchers.md index 6530aa312c..8146f0e7e3 100644 --- a/akka-docs/src/main/paradox/dispatchers.md +++ b/akka-docs/src/main/paradox/dispatchers.md @@ -48,7 +48,7 @@ Another example that uses the "thread-pool-executor": @@@ note -The thread pool executor dispatcher is implemented using by a `java.util.concurrent.ThreadPoolExecutor`. +The thread pool executor dispatcher is implemented using a `java.util.concurrent.ThreadPoolExecutor`. You can read more about it in the JDK's [ThreadPoolExecutor documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html). @@@ diff --git a/akka-docs/src/main/paradox/logging.md b/akka-docs/src/main/paradox/logging.md index 0552d47b31..0f3d0bf1b6 100644 --- a/akka-docs/src/main/paradox/logging.md +++ b/akka-docs/src/main/paradox/logging.md @@ -50,18 +50,18 @@ class MyActor extends Actor with akka.actor.ActorLogging { @@@ The first parameter to @scala[`Logging`] @java[`Logging.getLogger`] could also be any -`LoggingBus`, specifically @scala[`system.eventStream`] @scala[`system.eventStream()`]; in the demonstrated -case, the actor system's address is included in the `akkaSource` -representation of the log source (see @ref:[Logging Thread, Akka Source and Actor System in MDC](#logging-thread-akka-source-and-actor-system-in-mdc)) +`LoggingBus`, specifically @scala[`system.eventStream`] @java[`system.eventStream()`]. +In the demonstrated case, the actor system's address is included in the `akkaSource` +representation of the log source (see @ref:[Logging Thread, Akka Source and Actor System in MDC](#logging-thread-akka-source-and-actor-system-in-mdc)), while in the second case this is not automatically done. The second parameter to @scala[`Logging`] @java[`Logging.getLogger`] is the source of this logging channel. The source object is translated to a String according to the following rules: * if it is an Actor or ActorRef, its path is used * in case of a String it is used as is - * in case of a class an approximation of its simpleName - * and in all other cases @scala[a compile error occurs unless an implicit -`LogSource[T]` is in scope for the type in question] @java[the simpleName of its class] + * in case of a Class an approximation of its `simpleName` is used + * in all other cases @scala[a compile error occurs unless an implicit +`LogSource[T]` is in scope for the type in question] @java[the `simpleName` of its class] is used The log message may contain argument placeholders `{}`, which will be substituted if the log level is enabled.
Giving more arguments than diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 2737512567..d6c6ec0bcc 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -3,6 +3,9 @@ project.description: Migrating to Akka 2.6. --- # Migration Guide 2.5.x to 2.6.x +An overview of the changes in Akka 2.6 is presented in the [What's new in Akka 2.6 video](https://akka.io/blog/news/2019/12/12/akka-26-intro) +and the [release announcement](https://akka.io/blog/news/2019/11/06/akka-2.6.0-released). + Akka 2.6.x is binary backwards compatible with 2.5.x with the ordinary exceptions listed in the @ref:[Binary Compatibility Rules](../common/binary-compatibility-rules.md). diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitAfter.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitAfter.md index 229c3bbd15..a71b8b3eb2 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitAfter.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitAfter.md @@ -16,6 +16,19 @@ End the current substream whenever a predicate returns `true`, starting a new su End the current substream whenever a predicate returns `true`, starting a new substream for the next element. +## Example + +Given some time series data source, we would like to split the stream into sub-streams for each second. +By using `sliding` we can compare the timestamps of the current and next element to decide when to split. + +Scala +: @@snip [Split.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala) { #splitAfter } + +Java +: @@snip [Split.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java) { #splitAfter } + +An alternative way of implementing this is shown in the @ref:[splitWhen example](splitWhen.md#example). + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitWhen.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitWhen.md index df0f131146..609afe8ca2 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitWhen.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitWhen.md @@ -16,6 +16,20 @@ Split off elements into a new substream whenever a predicate function return `tr Split off elements into a new substream whenever a predicate function return `true`. +## Example + +Given some time series data source, we would like to split the stream into sub-streams for each second. +We need to compare the timestamps of the previous and current element to decide when to split. This +decision can be implemented in a `statefulMapConcat` operator preceding the `splitWhen`. + +Scala +: @@snip [Split.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala) { #splitWhen } + +Java +: @@snip [Split.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java) { #splitWhen } + +An alternative way of implementing this is shown in the @ref:[splitAfter example](splitAfter.md#example). + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/typed/actors.md b/akka-docs/src/main/paradox/typed/actors.md index 8baa98706b..2c3d16b9e0 100644 --- a/akka-docs/src/main/paradox/typed/actors.md +++ b/akka-docs/src/main/paradox/typed/actors.md @@ -34,7 +34,8 @@ systems.
The API of Akka’s Actors has borrowed some of its syntax from Erlang. ## First example If you are new to Akka you might want to start with reading the @ref:[Getting Started Guide](guide/introduction.md) -and then come back here to learn more. +and then come back here to learn more. We also recommend watching the short +[introduction video to Akka actors](https://akka.io/blog/news/2019/12/03/akka-typed-actor-intro-video). It is helpful to become familiar with the foundational, external and internal ecosystem of your Actors, to see what you can leverage and customize as needed, see @@ -132,7 +133,7 @@ Scala Java : @@snip [IntroSpec.scala](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/IntroTest.java) { #hello-world } -We start an Actor system from the defined `HelloWorldMain` behavior and send two `Start` messages that +We start an Actor system from the defined `HelloWorldMain` behavior and send two `SayHello` messages that will kick-off the interaction between two separate `HelloWorldBot` actors and the single `Greeter` actor. An application normally consists of a single `ActorSystem`, running many actors, per JVM. diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 6c10cc5f8a..9355d143d3 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -27,6 +27,9 @@ It could for example be actors representing Aggregate Roots in Domain-Driven Des Here we call these actors "entities". These actors typically have persistent (durable) state, but this feature is not limited to actors with persistent state. +The [Introduction to Akka Cluster Sharding video](https://akka.io/blog/news/2019/12/16/akka-cluster-sharding-intro-video) +is a good starting point for learning Cluster Sharding. + Cluster sharding is typically used when you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors it might be easier to run them on a @ref:[Cluster Singleton](cluster-singleton.md) node. diff --git a/akka-docs/src/main/paradox/typed/dispatchers.md b/akka-docs/src/main/paradox/typed/dispatchers.md index 40fe453edd..715d56e438 100644 --- a/akka-docs/src/main/paradox/typed/dispatchers.md +++ b/akka-docs/src/main/paradox/typed/dispatchers.md @@ -209,8 +209,8 @@ The thread information was recorded using the YourKit profiler, however any good has this feature (including the free and bundled with the Oracle JDK [VisualVM](https://visualvm.github.io/), as well as [Java Mission Control](https://openjdk.java.net/projects/jmc/)). The orange portion of the thread shows that it is idle. Idle threads are fine - -they're ready to accept new work. However, large amount of turquoise (blocked, or sleeping as in our example) threads -is very bad and leads to thread starvation. +they're ready to accept new work. However, a large number of turquoise (blocked, or sleeping as in our example) threads +leads to thread starvation. 
@@@ note @@ -267,7 +267,7 @@ unless you @ref:[set up a separate dispatcher for the actor](../dispatchers.md#s ### Solution: Dedicated dispatcher for blocking operations -One of the most efficient methods of isolating the blocking behavior such that it does not impact the rest of the system +One of the most efficient methods of isolating the blocking behavior, such that it does not impact the rest of the system, is to prepare and use a dedicated dispatcher for all those blocking operations. This technique is often referred to as "bulk-heading" or simply "isolating blocking". diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index a7af0457ba..1f0780986b 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -30,6 +30,9 @@ allows for very high transaction rates and efficient replication. A stateful act events to the actor, allowing it to rebuild its state. This can be either the full history of changes or starting from a checkpoint in a snapshot which can dramatically reduce recovery times. +The [Event Sourcing with Akka 2.6 video](https://akka.io/blog/news/2020/01/07/akka-event-sourcing-video) +is a good starting point for learning Event Sourcing. + @@@ note The General Data Protection Regulation (GDPR) requires that personal information must be deleted at the request of users. diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java new file mode 100644 index 0000000000..f9530d3bb8 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2020 Lightbend Inc. 
+ */ + +package jdocs.stream.operators.sourceorflow; + +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.japi.function.Creator; +import akka.japi.function.Function; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Collections; + +public class Split { + public static void splitWhenExample(String[] args) { + ActorSystem system = ActorSystem.create(); + + // #splitWhen + Source.range(1, 100) + .throttle(1, Duration.ofMillis(100)) + .map(elem -> new Pair<>(elem, Instant.now())) + .statefulMapConcat( + () -> { + return new Function, Iterable>>() { + // stateful decision in statefulMapConcat + // keep track of time bucket (one per second) + LocalDateTime currentTimeBucket = + LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC); + + @Override + public Iterable> apply( + Pair elemTimestamp) { + LocalDateTime time = + LocalDateTime.ofInstant(elemTimestamp.second(), ZoneOffset.UTC); + LocalDateTime bucket = time.withNano(0); + boolean newBucket = !bucket.equals(currentTimeBucket); + if (newBucket) currentTimeBucket = bucket; + return Collections.singleton(new Pair<>(elemTimestamp.first(), newBucket)); + } + }; + }) + .splitWhen(elemDecision -> elemDecision.second()) // split when time bucket changes + .map(elemDecision -> elemDecision.first()) + .fold(0, (acc, notUsed) -> acc + 1) // sum + .to(Sink.foreach(System.out::println)) + .run(system); + // 3 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 7 + // #splitWhen + } + + public static void splitAfterExample(String[] args) { + ActorSystem system = ActorSystem.create(); + + // #splitAfter + Source.range(1, 100) + .throttle(1, Duration.ofMillis(100)) + .map(elem -> new Pair<>(elem, Instant.now())) + .sliding(2, 1) + .splitAfter( + slidingElements -> { + if (slidingElements.size() == 2) { + Pair current = slidingElements.get(0); + Pair next = slidingElements.get(1); + LocalDateTime currentBucket = + LocalDateTime.ofInstant(current.second(), ZoneOffset.UTC).withNano(0); + LocalDateTime nextBucket = + LocalDateTime.ofInstant(next.second(), ZoneOffset.UTC).withNano(0); + return !currentBucket.equals(nextBucket); + } else { + return false; + } + }) + .map(slidingElements -> slidingElements.get(0).first()) + .fold(0, (acc, notUsed) -> acc + 1) // sum + .to(Sink.foreach(System.out::println)) + .run(system); + // 3 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 6 + // note that the very last element is never included due to sliding, + // but that would not be problem for an infinite stream + // #splitAfter + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala new file mode 100644 index 0000000000..fed0a0fe6b --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. 
+ */ + +package docs.stream.operators.sourceorflow + +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset + +import scala.concurrent.duration._ + +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +object Split { + def splitWhenExample(args: Array[String]): Unit = { + import akka.actor.ActorSystem + + implicit val system: ActorSystem = ActorSystem() + + //#splitWhen + Source(1 to 100) + .throttle(1, 100.millis) + .map(elem => (elem, Instant.now())) + .statefulMapConcat(() => { + // stateful decision in statefulMapConcat + // keep track of time bucket (one per second) + var currentTimeBucket = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC) + + { + case (elem, timestamp) => + val time = LocalDateTime.ofInstant(timestamp, ZoneOffset.UTC) + val bucket = time.withNano(0) + val newBucket = bucket != currentTimeBucket + if (newBucket) + currentTimeBucket = bucket + List((elem, newBucket)) + } + }) + .splitWhen(_._2) // split when time bucket changes + .map(_._1) + .fold(0)((acc, _) => acc + 1) // sum + .to(Sink.foreach(println)) + .run() + // 3 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 7 + //#splitWhen + } + + def splitAfterExample(args: Array[String]): Unit = { + import akka.actor.ActorSystem + + implicit val system: ActorSystem = ActorSystem() + + //#splitAfter + Source(1 to 100) + .throttle(1, 100.millis) + .map(elem => (elem, Instant.now())) + .sliding(2) + .splitAfter { slidingElements => + if (slidingElements.size == 2) { + val current = slidingElements.head + val next = slidingElements.tail.head + val currentBucket = LocalDateTime.ofInstant(current._2, ZoneOffset.UTC).withNano(0) + val nextBucket = LocalDateTime.ofInstant(next._2, ZoneOffset.UTC).withNano(0) + currentBucket != nextBucket + } else { + false + } + } + .map(_.head._1) + .fold(0)((acc, _) => acc + 1) // sum + .to(Sink.foreach(println)) + .run() + // 3 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 10 + // 6 + // note that the very last element is never included due to sliding, + // but that would not be problem for an infinite stream + //#splitAfter + } + +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala index 5e2af1f335..067da8585e 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala @@ -13,6 +13,7 @@ object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventE def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long): EventEnvelope = new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp) + @deprecated("for binary compatibility", "2.6.2") override def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any): EventEnvelope = new EventEnvelope(offset, persistenceId, sequenceNr, event) @@ -37,7 +38,7 @@ final class EventEnvelope( extends Product4[Offset, String, Long, Any] with Serializable { - // for binary compatibility + @deprecated("for binary compatibility", "2.6.2") def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) = this(offset, persistenceId, sequenceNr, event, 0L) diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala 
b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala index b2c5170c2f..c64466e18b 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/EventsByTagSpec.scala @@ -86,18 +86,18 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with greenSrc .runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple")) - .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) + .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple", 0L)) + .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana", 0L)) .expectNoMessage(500.millis) .request(2) - .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) + .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf", 0L)) .expectComplete() val blackSrc = queries.currentEventsByTag(tag = "black", offset = Sequence(0L)) blackSrc .runWith(TestSink.probe[Any]) .request(5) - .expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car")) + .expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car", 0L)) .expectComplete() } @@ -108,8 +108,8 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with val probe = greenSrc .runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple")) - .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana")) + .expectNext(EventEnvelope(Sequence(1L), "a", 2L, "a green apple", 0L)) + .expectNext(EventEnvelope(Sequence(2L), "a", 3L, "a green banana", 0L)) .expectNoMessage(100.millis) c ! "a green cucumber" @@ -118,7 +118,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with probe .expectNoMessage(100.millis) .request(5) - .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) + .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf", 0L)) .expectComplete() // green cucumber not seen } @@ -128,8 +128,8 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with .runWith(TestSink.probe[Any]) .request(10) // note that banana is not included, since exclusive offset - .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) - .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber")) + .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf", 0L)) + .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber", 0L)) .expectComplete() } @@ -145,15 +145,15 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with val pinkSrc = queries.currentEventsByTag(tag = "pink") val probe = pinkSrc.runWith(TestSink.probe[Any]) - probe.request(1).expectNext(EventEnvelope(Sequence(1L), "z", 1L, "a pink apple")) + probe.request(1).expectNext(EventEnvelope(Sequence(1L), "z", 1L, "a pink apple", 0L)) system.log.info("delay before demand") probe .expectNoMessage(200.millis) .request(3) - .expectNext(EventEnvelope(Sequence(2L), "z", 2L, "a pink banana")) - .expectNext(EventEnvelope(Sequence(3L), "z", 3L, "a pink orange")) + .expectNext(EventEnvelope(Sequence(2L), "z", 2L, "a pink banana", 0L)) + .expectNext(EventEnvelope(Sequence(3L), "z", 3L, "a pink orange", 0L)) .expectComplete() } @@ -179,7 +179,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with try { - 
probe.request(2).expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car")).expectNoMessage(100.millis) + probe.request(2).expectNext(EventEnvelope(Sequence(1L), "b", 1L, "a black car", 0L)).expectNoMessage(100.millis) d ! "a black dog" expectMsg(s"a black dog-done") @@ -187,10 +187,10 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with expectMsg(s"a black night-done") probe - .expectNext(EventEnvelope(Sequence(2L), "d", 1L, "a black dog")) + .expectNext(EventEnvelope(Sequence(2L), "d", 1L, "a black dog", 0L)) .expectNoMessage(100.millis) .request(10) - .expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night")) + .expectNext(EventEnvelope(Sequence(3L), "d", 2L, "a black night", 0L)) } finally { probe.cancel() } @@ -203,8 +203,8 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with probe .request(10) // note that banana is not included, since exclusive offset - .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf")) - .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber")) + .expectNext(EventEnvelope(Sequence(3L), "b", 2L, "a green leaf", 0L)) + .expectNext(EventEnvelope(Sequence(4L), "c", 1L, "a green cucumber", 0L)) .expectNoMessage(100.millis) } finally { probe.cancel() @@ -222,7 +222,7 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with val probe = yellowSrc .runWith(TestSink.probe[Any]) .request(2) - .expectNext(EventEnvelope(Sequence(1L), "y", 1L, "a yellow car")) + .expectNext(EventEnvelope(Sequence(1L), "y", 1L, "a yellow car", 0L)) .expectNoMessage(100.millis) d ! "a yellow dog" @@ -231,10 +231,10 @@ class EventsByTagSpec extends AkkaSpec(EventsByTagSpec.config) with Cleanup with expectMsg(s"a yellow night-done") probe - .expectNext(EventEnvelope(Sequence(2L), "y", 2L, "a yellow dog")) + .expectNext(EventEnvelope(Sequence(2L), "y", 2L, "a yellow dog", 0L)) .expectNoMessage(100.millis) .request(10) - .expectNext(EventEnvelope(Sequence(3L), "y", 3L, "a yellow night")) + .expectNext(EventEnvelope(Sequence(3L), "y", 3L, "a yellow night", 0L)) probe.cancel() } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index 75ba1ce3c0..263c3d039d 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -461,7 +461,7 @@ class EventSourcedBehaviorSpec replyProbe.expectMessage(State(1, Vector(0))) val events = queries.currentEventsByTag("tag1").runWith(Sink.seq).futureValue - events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Incremented(1))) + events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Incremented(1), 0L)) } "handle scheduled message arriving before recovery completed " in { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala index 3e7f79e6b8..41391ce6a2 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedEventAdapterSpec.scala @@ -191,7 +191,7 @@ class EventSourcedEventAdapterSpec 
replyProbe.expectMessage(State(1, Vector(0))) val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue - events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)))) + events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)), 0L)) val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event]))) @@ -212,8 +212,8 @@ class EventSourcedEventAdapterSpec val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue events shouldEqual List( - EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))), - EventEnvelope(Sequence(2), pid.id, 2, GenericWrapper(Incremented(1)))) + EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)), 0L), + EventEnvelope(Sequence(2), pid.id, 2, GenericWrapper(Incremented(1)), 0L)) val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event]))) @@ -232,7 +232,7 @@ class EventSourcedEventAdapterSpec replyProbe.expectMessage(State(1, Vector(0))) val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue - events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)))) + events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)), 0L)) val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event]))) @@ -240,7 +240,7 @@ class EventSourcedEventAdapterSpec replyProbe.expectMessage(State(1, Vector(0))) val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue - taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)))) + taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1)), 0L)) } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala index 37982c28c2..da6cd3aabe 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2019 Lightbend Inc. + * Copyright (C) 2017-2020 Lightbend Inc. 
*/ package akka.persistence.journal.inmem diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 0f1d905732..a55a421ae0 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -27,8 +27,8 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { val silencerVersion = "1.4.4" Seq( libraryDependencies ++= Seq( - compilerPlugin("com.github.ghik" %% "silencer-plugin" % silencerVersion cross CrossVersion.patch), - "com.github.ghik" %% "silencer-lib" % silencerVersion % Provided cross CrossVersion.patch)) + compilerPlugin(("com.github.ghik" %% "silencer-plugin" % silencerVersion).cross(CrossVersion.patch)), + ("com.github.ghik" %% "silencer-lib" % silencerVersion % Provided).cross(CrossVersion.patch))) } lazy val disciplineSettings = @@ -39,7 +39,6 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { else Seq.empty ), Test / scalacOptions --= testUndicipline, - Compile / console / scalacOptions --= Seq("-deprecation", "-Xfatal-warnings", "-Xlint", "-Ywarn-unused:imports"), Compile / scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, 13)) => disciplineScalacOptions -- Set( @@ -61,7 +60,9 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { // different compiler phases from the regular run), and in particular // '-Ywarn-unused:explicits' breaks 'sbt ++2.13.0-M5 akka-actor/doc' // https://github.com/akka/akka/issues/26119 - Compile / doc / scalacOptions --= disciplineScalacOptions.toSeq :+ "-Xfatal-warnings") + Compile / doc / scalacOptions --= disciplineScalacOptions.toSeq :+ "-Xfatal-warnings", + // having discipline warnings in console is just an annoyance + Compile / console / scalacOptions --= disciplineScalacOptions.toSeq) val testUndicipline = Seq( "-Ywarn-dead-code", // ??? 
used in compile only specs diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 76b35c87ed..4fd6366cfa 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,12 +16,12 @@ object Dependencies { lazy val java8CompatVersion = settingKey[String]("The version of scala-java8-compat to use.") val junitVersion = "4.13" - val slf4jVersion = "1.7.29" + val slf4jVersion = "1.7.30" // check agrona version when updating this - val aeronVersion = "1.24.0" + val aeronVersion = "1.25.0" // needs to be inline with the aeron version, check // https://github.com/real-logic/aeron/blob/1.x.y/build.gradle - val agronaVersion = "1.1.0" + val agronaVersion = "1.2.0" val nettyVersion = "3.10.6.Final" val jacksonVersion = "2.10.2" val protobufJavaVersion = "3.10.0" @@ -37,7 +37,7 @@ object Dependencies { val Versions = Seq( crossScalaVersions := Seq(scala212Version, scala213Version), scalaVersion := System.getProperty("akka.build.scalaVersion", crossScalaVersions.value.head), - scalaCheckVersion := sys.props.get("akka.build.scalaCheckVersion").getOrElse("1.14.2"), + scalaCheckVersion := sys.props.get("akka.build.scalaCheckVersion").getOrElse("1.14.3"), scalaTestVersion := "3.1.0", scalaTestPlusVersion := "3.1.0.0", java8CompatVersion := { @@ -128,8 +128,8 @@ object Dependencies { val dockerClient = "com.spotify" % "docker-client" % "8.16.0" % "test" // ApacheV2 // metrics, measurements, perf testing - val metrics = "io.dropwizard.metrics" % "metrics-core" % "4.1.1" % "test" // ApacheV2 - val metricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % "4.1.1" % "test" // ApacheV2 + val metrics = "io.dropwizard.metrics" % "metrics-core" % "4.1.2" % "test" // ApacheV2 + val metricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % "4.1.2" % "test" // ApacheV2 val latencyUtils = "org.latencyutils" % "LatencyUtils" % "2.0.3" % "test" // Free BSD val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.12" % "test" // CC0 val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram) diff --git a/project/plugins.sbt b/project/plugins.sbt index 08171600e1..8a34ef0317 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -14,7 +14,7 @@ addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.11") // sbt-osgi 0.9.5 is available but breaks including jdk9-only classes addSbtPlugin("com.typesafe.sbt" % "sbt-osgi" % "0.9.4") addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.6.1") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.0") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1") addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.2") addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.0") addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.7")