class OrderedBucketMergeHub[Name, A, Config, Offset, M] extends GraphStageWithMaterializedValue[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]] with NamedLogging

A custom Pekko org.apache.pekko.stream.stage.GraphStage that merges several ordered source streams into one based on those sources reaching a threshold for equivalent elements.

The ordered sources produce elements with totally ordered offsets. For a given threshold t, whenever t different sources have produced equivalent elements for an offset that is higher than the previous offset, the OrderedBucketMergeHub emits the map of all these equivalent elements as the next com.digitalasset.canton.util.OrderedBucketMergeHub.OutputElement to downstream. Elements from the other ordered sources with lower or equal offset that have not yet reached the threshold are dropped.

Every correct ordered source should produce the same sequence of offsets. Faulty sources can produce any sequence of elements as they like. The threshold should be set to F+1 where at most F sources are assumed to be faulty, and at least 2F+1 ordered sources should be configured. This ensures that the F faulty ordered sources cannot corrupt the stream nor block it.

If this assumption is violated, the OrderedBucketMergeHub may deadlock, as it only looks at the next element of each ordered source (this avoids unbounded buffering and therefore ensures that downstream backpressure reaches the ordered sources). For example, given a threshold of 2 with three ordered sources, two of which are faulty, the first elements of the sources have offsets 1, 2, 3. Suppose that the first ordered source's second element had offset 3 and is equivalent to the third ordered source's first element. Then, by the above definition of merging, the stage could emit the elements with offset 3 and discard those with 1 and 2. However, this is not yet implemented; the stream just does not emit anything. Neither are such deadlocks detected right now. This is because in an asynchronous system, there typically are ordered sources that have not yet delivered their next element, and possibly may never will within useful time, say because they have crashed (which is not considered a fault). In the above example, suppose that the second ordered source had not emitted the element with offset 2. Then it is unknown whether the element with offset 1 should be emitted or not, because we do not know which ordered sources are correct. Suppose we had decided that we drop the elements with offset 1 from a correct ordered source and emit the ones with offset 3 instead, Then the second (delayed, but correct) ordered source can still send an equivalent element with 1, and so the decision of dropping 1 was wrong in hindsight.

The OrderedBucketMergeHub manages the ordered sources. Their configurations and the threshold are coming through the OrderedBucketMergeHub's input stream as a OrderedBucketMergeConfig. As soon as a new OrderedBucketMergeConfig is available, the OrderedBucketMergeHub changes the ordered sources as necessary:

- Ordered sources are identified by their Name. - Existing ordered sources whose name does not appear in the new configuration are stopped. - If a new configuration contains a new name for an ordered source, a new ordered source is created using ops. - If the configuration of an ordered source changes, the previous source is stopped and a new one with the new configuration is created.

The OrderedBucketMergeHub emits com.digitalasset.canton.util.OrderedBucketMergeHub.ControlOutput events to downstream:

- com.digitalasset.canton.util.OrderedBucketMergeHub.NewConfiguration signals the new configuration in place. - com.digitalasset.canton.util.OrderedBucketMergeHub.ActiveSourceTerminated signals that an ordered source has completed or aborted with an error before it was stopped.

Since configuration changes are consumed eagerly, the OrderedBucketMergeHub buffers these com.digitalasset.canton.util.OrderedBucketMergeHub.ControlOutput events if downstream is not consuming them fast enough. The stream of configuration changes should therefore be slower than downstream; otherwise, the buffer will grow unboundedly and lead to java.lang.OutOfMemoryErrors eventually.

When the configuration stream completes or aborts, all ordered sources are stopped and the output stream completes.

An ordered source is stopped by pulling its org.apache.pekko.stream.KillSwitch and dropping all elements until the source completes or aborts. In particular, the ordered source is not just simply cancelled upon a configuration change or when the configuration stream completes. This allows for properly synchronizing the completion of the OrderedBucketMergeHub with the internal computations happening in the ordered sources. To that end, the OrderedBucketMergeHub materializes to a scala.concurrent.Future that completes when the corresponding futures from all created ordered sources have completed as well as the ordered sources themselves.

If downstream cancels, the OrderedBucketMergeHub cancels all sources and the input port, without draining them. Therefore, the materialized scala.concurrent.Future may or may not complete, depending on the shape of the ordered sources. For example, if the ordered sources' futures are created with a plain org.apache.pekko.stream.scaladsl.FlowOpsMat.watchTermination, it will complete because org.apache.pekko.stream.scaladsl.FlowOpsMat.watchTermination completes immediately when it sees a cancellation. Therefore, it is better to avoid downstream cancellations altogether.

Rationale for the merging logic:

This graph stage is meant to merge the streams of sequenced events from several sequencers on a client node. The operator configures N sequencer connections and specifies a threshold T. Suppose the operator assumes that at most F nodes out of N are faulty. So we need F < T for safety. For liveness, the operator wants to tolerate as many crashes of correct sequencer nodes as feasible. Let C be the number of tolerated crashes. Then T <= N - C - F because faulty sequencers may not deliver any messages. For a fixed F, T = F + 1 is optimal as we can then tolerate C = N - 2F - 1 crashed sequencer nodes.

In other words, if the operator wants to tolerate up to F faults and up to C crashes, then it should set T = F + 1 and configure N = 2F + C + 1 different sequencer connections.

If more than C sequencers have crashed, then the faulty sequencers can make the client deadlock. The client cannot detect this under the asynchrony assumption.

Moreover, the client cannot distinguish either between whether a sequencer node is actively malicious or just accidentally faulty. In particular, if several sequencer nodes deliver inequivalent events, we currently silently drop them. TODO(#14365) Design and implement an alert mechanism

Linear Supertypes
NamedLogging, GraphStageWithMaterializedValue[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]], Graph[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]], AnyRef, Any
Ordering
  1. Alphabetic
  2. By Inheritance
Inherited
  1. OrderedBucketMergeHub
  2. NamedLogging
  3. GraphStageWithMaterializedValue
  4. Graph
  5. AnyRef
  6. Any
  1. Hide All
  2. Show All
Visibility
  1. Public
  2. Protected

Instance Constructors

  1. new OrderedBucketMergeHub(ops: OrderedBucketMergeHubOps[Name, A, Config, Offset, M], loggerFactory: NamedLoggerFactory, enableInvariantCheck: Boolean)(implicit arg0: Pretty[Name], arg1: Pretty[Offset])

    ops

    The operations for the abstracted-away parameters. In particular, the equivalence relation between elements is expressed as the pre-image of the equals relation under the OrderedBucketMergeHubOps.bucketOf function, i.e., two elements are equivalent if they end up in the same bucket.

    enableInvariantCheck

    If true, invariants of the OrderedBucketMergeHub implementation are checked at run-time. Invariant violation are then logged as java.lang.IllegalStateException and abort the stage with an error. Do not enable these checks in production.

Type Members

  1. type ConfigAndMat = (Config, Option[M])
  2. type Shape = FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]]
    Definition Classes
    Graph

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##: Int
    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  4. def addAttributes(attr: Attributes): Graph[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]]
    Definition Classes
    Graph
  5. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  6. def async(dispatcher: String, inputBufferSize: Int): Graph[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]]
    Definition Classes
    Graph
  7. def async(dispatcher: String): Graph[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]]
    Definition Classes
    Graph
  8. def async: Graph[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]]
    Definition Classes
    Graph
  9. def clone(): AnyRef
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.CloneNotSupportedException]) @native() @IntrinsicCandidate()
  10. def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done])
    Definition Classes
    OrderedBucketMergeHub → GraphStageWithMaterializedValue
  11. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  12. def equals(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef → Any
  13. implicit def errorLoggingContext(implicit traceContext: TraceContext): ErrorLoggingContext
    Attributes
    protected
    Definition Classes
    NamedLogging
  14. def getAttributes: Attributes
    Definition Classes
    Graph
  15. final def getClass(): Class[_ <: AnyRef]
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @IntrinsicCandidate()
  16. def hashCode(): Int
    Definition Classes
    AnyRef → Any
    Annotations
    @native() @IntrinsicCandidate()
  17. def initialAttributes: Attributes
    Attributes
    protected
    Definition Classes
    GraphStageWithMaterializedValue
  18. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  19. def logger: TracedLogger
    Attributes
    protected
    Definition Classes
    NamedLogging
  20. val loggerFactory: NamedLoggerFactory
    Attributes
    protected
    Definition Classes
    OrderedBucketMergeHubNamedLogging
  21. def named(name: String): Graph[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]]
    Definition Classes
    Graph
  22. implicit def namedLoggingContext(implicit traceContext: TraceContext): NamedLoggingContext
    Attributes
    protected
    Definition Classes
    NamedLogging
  23. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  24. def noTracingLogger: Logger
    Attributes
    protected
    Definition Classes
    NamedLogging
  25. final def notify(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @IntrinsicCandidate()
  26. final def notifyAll(): Unit
    Definition Classes
    AnyRef
    Annotations
    @native() @IntrinsicCandidate()
  27. def shape: FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, ConfigAndMat, A, Offset]]
    Definition Classes
    OrderedBucketMergeHub → Graph
  28. final def synchronized[T0](arg0: => T0): T0
    Definition Classes
    AnyRef
  29. def toString(): String
    Definition Classes
    AnyRef → Any
  30. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  31. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException]) @native()
  32. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.InterruptedException])
  33. final def withAttributes(attr: Attributes): Graph[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]]
    Definition Classes
    GraphStageWithMaterializedValue → Graph

Deprecated Value Members

  1. def finalize(): Unit
    Attributes
    protected[lang]
    Definition Classes
    AnyRef
    Annotations
    @throws(classOf[java.lang.Throwable]) @Deprecated @Deprecated
    Deprecated

Inherited from NamedLogging

Inherited from GraphStageWithMaterializedValue[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]]

Inherited from Graph[FlowShape[OrderedBucketMergeConfig[Name, Config], Output[Name, (Config, Option[M]), A, Offset]], Future[Done]]

Inherited from AnyRef

Inherited from Any

Ungrouped