trait MessageDispatcher extends AnyRef
Dispatches the incoming messages of the com.digitalasset.canton.sequencing.client.SequencerClient to the different processors. It also informs the conflictdetection.RequestTracker about the passing of time for messages that are not processed by the TransactionProcessor.
- Self Type
- MessageDispatcher with NamedLogging
Type Members
- abstract type ProcessingResult
- Attributes
- protected
Abstract Value Members
- abstract def acsCommitmentProcessor: ProcessorType
- Attributes
- protected
- abstract def badRootHashMessagesRequestProcessor: BadRootHashMessagesRequestProcessor
- Attributes
- protected
- abstract def doProcess[A](kind: MessageKind[A], run: => FutureUnlessShutdown[A]): FutureUnlessShutdown[(MessageDispatcher.this)#ProcessingResult]
- Attributes
- protected
- abstract def domainId: DomainId
- Attributes
- protected
- implicit abstract val ec: ExecutionContext
- Attributes
- protected
- abstract def flush(): Future[Unit]
Returns a future that completes when all calls to handleAll whose returned scala.concurrent.Future has completed prior to this call have completed processing.
- Annotations
- @VisibleForTesting()
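The flush contract described above can be illustrated with a minimal sketch. It assumes a simplified model that is not taken from the Canton sources: the hypothetical `Tracker.handleAll` returns a future that completes as soon as the work is registered, while the background processing it was given may still be running. `flush()` then returns a future that completes only after all work registered so far has finished.

```scala
import scala.concurrent.{ExecutionContext, Future}

object FlushSketch {
  // Hypothetical stand-in for a dispatcher; only the flush bookkeeping is modelled.
  final class Tracker()(implicit ec: ExecutionContext) {
    // Completes when all background work registered so far has finished.
    // Guarded by `this`.
    private var inFlight: Future[Unit] = Future.unit

    // Registers background processing. The returned future completes immediately,
    // which models "the future returned by handleAll has completed" while the
    // processing itself may still be ongoing.
    def handleAll(background: Future[Unit]): Future[Unit] = synchronized {
      val previous = inFlight
      // Failures are swallowed so that one failed task does not hide later ones.
      inFlight = previous.flatMap(_ => background.recover { case _ => () })
      Future.unit
    }

    // Snapshot of everything registered before this call.
    def flush(): Future[Unit] = synchronized(inFlight)
  }
}
```

Work registered after a call to `flush()` does not delay the future that call returned, which matches the "prior to this call" wording of the description.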
- abstract def handleAll(events: Traced[Seq[PossiblyIgnoredProtocolEvent]]): HandlerResult
- abstract def inFlightSubmissionTracker: InFlightSubmissionTracker
- Attributes
- protected
- abstract def participantId: ParticipantId
- Attributes
- protected
- implicit abstract def processingResultMonoid: Monoid[(MessageDispatcher.this)#ProcessingResult]
- Attributes
- protected
- abstract def recordOrderPublisher: RecordOrderPublisher
- Attributes
- protected
- abstract def repairProcessor: RepairProcessor
- Attributes
- protected
- abstract def requestCounterAllocator: RequestCounterAllocator
- Attributes
- protected
- abstract def requestProcessors: RequestProcessors
- Attributes
- protected
- abstract def requestTracker: RequestTracker
- Attributes
- protected
- abstract def topologyProcessor: (SequencerCounter, CantonTimestamp, Traced[List[DefaultOpenEnvelope]]) => HandlerResult
- Attributes
- protected
- abstract def tracker: SingleDomainCausalTracker
- Attributes
- protected
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def alarm(sc: SequencerCounter, ts: CantonTimestamp, msg: String)(implicit traceContext: TraceContext): Future[Unit]
- Attributes
- protected
- lazy val alarmer: LoggingAlarmStreamer
- Attributes
- protected
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native() @HotSpotIntrinsicCandidate()
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def filterBatchForDomainId(batch: Batch[DefaultOpenEnvelope], sc: SequencerCounter, ts: CantonTimestamp)(implicit traceContext: TraceContext): List[DefaultOpenEnvelope]
- Attributes
- protected
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def observeDeliverError(error: DeliverError)(implicit traceContext: TraceContext): FutureUnlessShutdown[(MessageDispatcher.this)#ProcessingResult]
- Attributes
- protected
- def observeSequencing(events: Seq[RawProtocolEvent])(implicit traceContext: TraceContext): FutureUnlessShutdown[(MessageDispatcher.this)#ProcessingResult]
- Attributes
- protected
- def processBatch(event: SignedContent[Deliver[DefaultOpenEnvelope]])(implicit traceContext: TraceContext): FutureUnlessShutdown[(MessageDispatcher.this)#ProcessingResult]
Rules for processing batches of envelopes:
- Identity transactions can be included in any batch of envelopes. They must be processed first. The identity processor ignores replayed or invalid transactions and merely logs an error.
- ACS commitments can be included in any batch of envelopes. They must be processed before the requests and results to meet the precondition of com.digitalasset.canton.participant.pruning.AcsCommitmentProcessor's processBatch method.
- A com.digitalasset.canton.protocol.messages.MediatorResult message should be sent only by the trusted mediator of the domain. The mediator should never include further messages with a com.digitalasset.canton.protocol.messages.MediatorResult. So a participant accepts a com.digitalasset.canton.protocol.messages.MediatorResult only if there are no other messages (except topology transactions and ACS commitments) in the batch. Otherwise, the participant ignores the com.digitalasset.canton.protocol.messages.MediatorResult and raises an alarm.
  The same applies to a com.digitalasset.canton.protocol.messages.MalformedMediatorRequestResult message that is triggered by root hash messages. The mediator uses the com.digitalasset.canton.data.ViewType from the com.digitalasset.canton.protocol.messages.RootHashMessage, which the participants also used to choose the processor for the request. So it suffices to forward the com.digitalasset.canton.protocol.messages.MalformedMediatorRequestResult to the appropriate processor.
- Request messages originate from untrusted participants. If the batch contains exactly one com.digitalasset.canton.protocol.messages.RootHashMessage that is sent to the participant and the mediator only, the participant processes only request messages with the same root hash. If there is no such root hash message, or there are several, the participant does not process the request at all because the mediator will reject the request as a whole.
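The ordering and acceptance rules above can be sketched as a small planning function. This is an illustration only, not Canton's implementation: every type and name in it (`Envelope`, `Step`, `plan`, `onlyParticipantAndMediator`) is a hypothetical, simplified stand-in. MalformedMediatorRequestResult handling is left out, and because the rules do not say what happens to request messages that accompany a rejected mediator result, the sketch assumes they are dropped.

```scala
object BatchRulesSketch {
  // Hypothetical, simplified envelope kinds; the real Canton types carry far more data.
  sealed trait Envelope
  case object TopologyTx extends Envelope
  case object AcsCommitment extends Envelope
  final case class MediatorResult(requestId: String) extends Envelope
  // `onlyParticipantAndMediator` models "sent to the participant and the mediator only".
  final case class RootHashMessage(rootHash: String, onlyParticipantAndMediator: Boolean)
      extends Envelope
  final case class RequestMessage(rootHash: String) extends Envelope

  // The actions the dispatcher would take, in order.
  sealed trait Step
  case object ProcessTopology extends Step
  case object ProcessAcsCommitments extends Step
  final case class ProcessResult(result: MediatorResult) extends Step
  final case class ProcessRequest(rootHash: String, messages: List[RequestMessage]) extends Step
  final case class RaiseAlarm(reason: String) extends Step

  def plan(batch: List[Envelope]): List[Step] = {
    // Rules 1 and 2: topology first, then ACS commitments, whatever else is in the batch.
    val topology: List[Step] = if (batch.contains(TopologyTx)) List(ProcessTopology) else Nil
    val acs: List[Step] = if (batch.contains(AcsCommitment)) List(ProcessAcsCommitments) else Nil

    val results = batch.collect { case r: MediatorResult => r }
    val rootHashMessages = batch.collect { case r: RootHashMessage => r }
    val requests = batch.collect { case r: RequestMessage => r }

    val rest: List[Step] = results match {
      case Nil =>
        // Rule 4: exactly one suitable root hash message selects the request messages.
        rootHashMessages.filter(_.onlyParticipantAndMediator) match {
          case List(single) =>
            List(ProcessRequest(single.rootHash, requests.filter(_.rootHash == single.rootHash)))
          case _ => Nil // none or several: the request is not processed at all
        }
      case List(single) if rootHashMessages.isEmpty && requests.isEmpty =>
        // Rule 3: a result is accepted only if nothing else accompanies it.
        List(ProcessResult(single))
      case _ =>
        // Rule 3, otherwise: ignore the result and raise an alarm.
        List(RaiseAlarm("mediator result mixed with other messages"))
    }
    topology ++ acs ++ rest
  }
}
```

For example, `BatchRulesSketch.plan` applied to a batch holding one suitable root hash message for `"h1"` and request messages for `"h1"` and `"h2"` yields a single `ProcessRequest` step that contains only the `"h1"` message.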
- Attributes
- protected
- def processCausalityMessages(envelopes: List[DefaultOpenEnvelope])(implicit tc: TraceContext): FutureUnlessShutdown[(MessageDispatcher.this)#ProcessingResult]
- Attributes
- protected
- def processTopologyTransactions(sc: SequencerCounter, ts: CantonTimestamp, envelopes: List[DefaultOpenEnvelope])(implicit traceContext: TraceContext): FutureUnlessShutdown[(MessageDispatcher.this)#ProcessingResult]
- Attributes
- protected
- def repairProcessorWedging(timestamp: CantonTimestamp)(implicit traceContext: TraceContext): FutureUnlessShutdown[(MessageDispatcher.this)#ProcessingResult]
- Attributes
- protected
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])