class EnterpriseMessageDispatcher extends MessageDispatcher with NamedLogging with HasFlushFuture with Spanning
Dispatches the incoming messages of the com.digitalasset.canton.sequencing.client.SequencerClient to the different processors. It also informs the com.digitalasset.canton.participant.protocol.conflictdetection.RequestTracker about the passing of time for messages that are not processed by the com.digitalasset.canton.participant.protocol.ProtocolProcessor.
- Alphabetic
- By Inheritance
- EnterpriseMessageDispatcher
- Spanning
- HasFlushFuture
- NamedLogging
- MessageDispatcher
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Instance Constructors
- new EnterpriseMessageDispatcher(domainId: DomainId, participantId: ParticipantId, requestTracker: RequestTracker, requestProcessors: RequestProcessors, tracker: SingleDomainCausalTracker, topologyProcessor: (SequencerCounter, CantonTimestamp, Traced[List[DefaultOpenEnvelope]]) => HandlerResult, acsCommitmentProcessor: ProcessorType, requestCounterAllocator: RequestCounterAllocator, recordOrderPublisher: RecordOrderPublisher, badRootHashMessagesRequestProcessor: BadRootHashMessagesRequestProcessor, repairProcessor: RepairProcessor, inFlightSubmissionTracker: InFlightSubmissionTracker, processAsyncronously: (ViewType) => Boolean, loggerFactory: NamedLoggerFactory)(implicit ec: ExecutionContext, tracer: Tracer)
Type Members
- type ProcessingResult = AsyncResult
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- val acsCommitmentProcessor: ProcessorType
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- def addToFlushAndLogError(name: String)(future: Future[_])(implicit loggingContext: ErrorLoggingContext): Unit
Adds the task
future
to the flush future so that doFlush completes only afterfuture
has completed.Adds the task
future
to the flush future so that doFlush completes only afterfuture
has completed. Logs an error if thefuture
fails with an exception.- Attributes
- protected
- Definition Classes
- HasFlushFuture
- def addToFlushWithoutLogging(name: String)(future: Future[_]): Unit
Adds the task
future
to the flush future so that doFlush completes only afterfuture
has completed.Adds the task
future
to the flush future so that doFlush completes only afterfuture
has completed. The caller is responsible for logging any exceptions thrown inside the future.- Attributes
- protected
- Definition Classes
- HasFlushFuture
- def alarm(sc: SequencerCounter, ts: CantonTimestamp, msg: String)(implicit traceContext: TraceContext): Future[Unit]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- lazy val alarmer: LoggingAlarmStreamer
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- val badRootHashMessagesRequestProcessor: BadRootHashMessagesRequestProcessor
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native() @HotSpotIntrinsicCandidate()
- def doFlush(): Future[Unit]
Returns a future that completes after all added futures have completed.
Returns a future that completes after all added futures have completed. The returned future never fails.
- Attributes
- protected
- Definition Classes
- HasFlushFuture
- def doProcess[A](kind: MessageKind[A], run: => FutureUnlessShutdown[A]): FutureUnlessShutdown[ProcessingResult]
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- val domainId: DomainId
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- implicit val ec: ExecutionContext
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def filterBatchForDomainId(batch: Batch[DefaultOpenEnvelope], sc: SequencerCounter, ts: CantonTimestamp)(implicit traceContext: TraceContext): List[DefaultOpenEnvelope]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def flush(): Future[Unit]
Returns a future that completes when all calls to handleAll whose returned scala.concurrent.Future has completed prior to this call have completed processing.
Returns a future that completes when all calls to handleAll whose returned scala.concurrent.Future has completed prior to this call have completed processing.
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- Annotations
- @VisibleForTesting()
- def flushCloseable(name: String, timeout: TimeoutDuration): SyncCloseable
- Attributes
- protected
- Definition Classes
- HasFlushFuture
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def handleAll(tracedEvents: Traced[Seq[PossiblyIgnoredProtocolEvent]]): HandlerResult
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- val inFlightSubmissionTracker: InFlightSubmissionTracker
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def logger: TracedLogger
- Attributes
- protected
- Definition Classes
- NamedLogging
- val loggerFactory: NamedLoggerFactory
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → NamedLogging
- implicit def loggingContext(implicit traceContext: TraceContext): ErrorLoggingContext
- Attributes
- protected
- Definition Classes
- NamedLogging
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def noTracingLogger: Logger
- Attributes
- protected
- Definition Classes
- NamedLogging
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def observeDeliverError(error: DeliverError)(implicit traceContext: TraceContext): FutureUnlessShutdown[ProcessingResult]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def observeSequencing(events: Seq[RawProtocolEvent])(implicit traceContext: TraceContext): FutureUnlessShutdown[ProcessingResult]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- val participantId: ParticipantId
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- def processBatch(event: SignedContent[Deliver[DefaultOpenEnvelope]])(implicit traceContext: TraceContext): FutureUnlessShutdown[ProcessingResult]
Rules for processing batches of envelopes:
Rules for processing batches of envelopes:
- Identity transactions can be included in any batch of envelopes. They must be processed first.
The identity processor ignores replayed or invalid transactions and merely logs an error. - Acs commitments can be included in any batch of envelopes.
They must be processed before the requests and results to
meet the precondition of com.digitalasset.canton.participant.pruning.AcsCommitmentProcessor's
processBatch
method. - A com.digitalasset.canton.protocol.messages.MediatorResult message should be sent only by the trusted mediator of the domain.
The mediator should never include further messages with a com.digitalasset.canton.protocol.messages.MediatorResult.
So a participant accepts a com.digitalasset.canton.protocol.messages.MediatorResult
only if there are no other messages (except topology transactions and ACS commitments) in the batch.
Otherwise, the participant ignores the com.digitalasset.canton.protocol.messages.MediatorResult and raises an alarm.
The same applies to a com.digitalasset.canton.protocol.messages.MalformedMediatorRequestResult message that is triggered by root hash messages. The mediator uses the com.digitalasset.canton.data.ViewType from the com.digitalasset.canton.protocol.messages.RootHashMessage, which the participants also used to choose the processor for the request. So it suffices to forward the com.digitalasset.canton.protocol.messages.MalformedMediatorRequestResult to the appropriate processor.
- Request messages originate from untrusted participants. If the batch contains exactly one com.digitalasset.canton.protocol.messages.RootHashMessage that is sent to the participant and the mediator only, the participant processes only request messages with the same root hash. If there are no such root hash message or multiple thereof, the participant does not process the request at all because the mediator will reject the request as a whole.
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- Identity transactions can be included in any batch of envelopes. They must be processed first.
- def processCausalityMessages(envelopes: List[DefaultOpenEnvelope])(implicit tc: TraceContext): FutureUnlessShutdown[ProcessingResult]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- def processTopologyTransactions(sc: SequencerCounter, ts: CantonTimestamp, envelopes: List[DefaultOpenEnvelope])(implicit traceContext: TraceContext): FutureUnlessShutdown[ProcessingResult]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- implicit val processingResultMonoid: Monoid[ProcessingResult]
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- val recordOrderPublisher: RecordOrderPublisher
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- val repairProcessor: RepairProcessor
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- def repairProcessorWedging(timestamp: CantonTimestamp)(implicit traceContext: TraceContext): FutureUnlessShutdown[ProcessingResult]
- Attributes
- protected
- Definition Classes
- MessageDispatcher
- val requestCounterAllocator: RequestCounterAllocator
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- val requestProcessors: RequestProcessors
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- val requestTracker: RequestTracker
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- val topologyProcessor: (SequencerCounter, CantonTimestamp, Traced[List[DefaultOpenEnvelope]]) => HandlerResult
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- val tracker: SingleDomainCausalTracker
- Attributes
- protected
- Definition Classes
- EnterpriseMessageDispatcher → MessageDispatcher
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- def withNewTrace[A](description: String)(f: (TraceContext) => (SpanWrapper) => A)(implicit tracer: Tracer): A
- Attributes
- protected
- Definition Classes
- Spanning
- def withSpan[A](description: String)(f: (TraceContext) => (SpanWrapper) => A)(implicit traceContext: TraceContext, tracer: Tracer): A
- Attributes
- protected
- Definition Classes
- Spanning
- def withSpanFromGrpcContext[A](description: String)(f: (TraceContext) => (SpanWrapper) => A)(implicit tracer: Tracer): A
- Attributes
- protected
- Definition Classes
- Spanning