c
com.digitalasset.canton.domain.mediator
MediatorEventsProcessor
Companion object MediatorEventsProcessor
class MediatorEventsProcessor extends NamedLogging
Attempt to process a sequence of sequential events from the sequencer for the mediator in an optimal manner. We could correctly process them sequentially however this is suboptimal. We can parallelize their processing by respecting the following rules:
- TODO(soren) Only the active mediator for the domain can send messages to sequencer, and this active state could change within the events we are processing. So this must be determined before processing any Mediator requests.
- Mediator requests/responses with different request ids can be processed in parallel. For events referencing the same request-id we can provide these to the confirmation request processor as a group so it can make optimizations such as deferring persistence of a response state until the final message to avoid unnecessary database writes.
- Identity transactions must be processed by the identity client before subsequent mediator request/responses as the confirmation response processor may require knowing the latest relevant topology state.
- Pending mediator requests could timeout during the execution of this batch and should be handled with the timestamp of the event from the sequencer that caused them to timeout (it is tempting to just use the last timestamp to determine timeouts however we would like to ensure we use the closest timestamp to ensure a consistent version is applied across Mediators regardless of the batches of events they process). Unlikely however technically possible is that requests that are created while processing these events could also timeout due to sequencer time passing within this event range (think a low timeout value with a sequencer that is catching up so a long period could elapse even during a short range of events).
Crashes can occur at any point during this processing (or even afterwards as it's the persistence in the sequencer client that would move us to following events). Processing should be effectively idempotent to handle this.
Linear Supertypes
Ordering
- Alphabetic
- By Inheritance
Inherited
- MediatorEventsProcessor
- NamedLogging
- AnyRef
- Any
- Hide All
- Show All
Visibility
- Public
- Protected
Instance Constructors
- new MediatorEventsProcessor(state: MediatorState, crypto: DomainSyncCryptoClient, identityClientEventHandler: UnsignedProtocolEventHandler, handleMediatorEvents: (RequestId, Seq[Traced[MediatorEvent]]) => HandlerResult, readyCheck: MediatorReadyCheck, loggerFactory: NamedLoggerFactory)(implicit executionContext: ExecutionContext)
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native() @HotSpotIntrinsicCandidate()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def handle(events: Seq[OrdinaryProtocolEvent])(implicit traceContext: TraceContext): HandlerResult
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def logger: TracedLogger
- Attributes
- protected
- Definition Classes
- NamedLogging
- val loggerFactory: NamedLoggerFactory
- Attributes
- protected
- Definition Classes
- MediatorEventsProcessor → NamedLogging
- implicit def loggingContext(implicit traceContext: TraceContext): ErrorLoggingContext
- Attributes
- protected
- Definition Classes
- NamedLogging
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def noTracingLogger: Logger
- Attributes
- protected
- Definition Classes
- NamedLogging
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])