package sequencer
Type Members
- abstract class BaseSequencer extends Sequencer with NamedLogging with Spanning
Provides additional functionality that is common between sequencer implementations:
- auto-registers unknown recipients addressed in envelopes from the domain topology manager (this avoids explicit registration from the domain node to the sequencer, which will be useful when they run as separate processes)
- case class BatchWritten(notifies: WriteNotification, latestTimestamp: CantonTimestamp) extends Product with Serializable
- type BlockHeight = BigInteger
- sealed trait BlockProcessingError extends AnyRef
- sealed trait CommitMode extends AnyRef
- sealed trait CommunitySequencerConfig extends SequencerConfig
- class ConflictingPayloadIdException extends SequencerWriterException
We intentionally use an unsafe storage method for writing payloads to take advantage of a full connection pool for performance. However, this means that if a HA Sequencer Writer has lost its instance lock, it may still attempt to write payloads while another Sequencer Writer is active with the same instance index. As we use this instance index to generate an (almost) conflict-free payload id, in this circumstance there is a slim chance that we may attempt to write conflicting payloads with the same id. If we were using a simple idempotent write approach, this could result in the active sequencer writing an event with a payload from the offline writer process (and not the payload it is expecting). This would be a terrible and difficult-to-diagnose corruption issue.
If this exception is raised we currently just halt the writer and run crash recovery. This is slightly suboptimal as in the above scenario we may crash the active writer (if they were second to write a conflicting payload id). However this will be safe. We could optimise this by checking the active lock status and only halting if this is found to be false.
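The conflict check described above can be sketched as follows. This is only an illustration, assuming an in-memory map in place of the payloads table; `PayloadConflictSketch`, `StoredPayload`, `InMemoryPayloadStore`, `writerToken` and the `SaveResult` cases are all hypothetical names and not part of the documented API.

```scala
import scala.collection.concurrent.TrieMap

object PayloadConflictSketch {
  // Hypothetical row: the payload content plus a token identifying the writer instance that stored it.
  final case class StoredPayload(writerToken: String, content: String)

  sealed trait SaveResult
  case object Saved extends SaveResult
  case object AlreadySavedByUs extends SaveResult
  final case class Conflict(existingWriter: String) extends SaveResult

  // Assumption: an in-memory map stands in for the payloads table.
  final class InMemoryPayloadStore {
    private val rows = TrieMap.empty[Long, StoredPayload]

    def save(id: Long, writerToken: String, content: String): SaveResult =
      rows.putIfAbsent(id, StoredPayload(writerToken, content)) match {
        case None => Saved // no row existed, our payload is now stored
        case Some(existing) if existing.writerToken == writerToken => AlreadySavedByUs
        case Some(existing) => Conflict(existing.writerToken) // another writer used the same id
      }
  }
}
```

In this sketch a `Conflict` result is the point at which a writer would raise the exception and halt, rather than silently reusing the other writer's payload.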
- class CounterCheckpointInconsistentException extends RuntimeException
We throw this if a store.SaveCounterCheckpointError.CounterCheckpointInconsistent error is returned when saving a new member counter checkpoint. This is exceptionally concerning as it may suggest that we are streaming events with inconsistent counters. It should only be caused by a bug or by the datastore being corrupted.
- class DatabaseSequencer extends BaseSequencer with FlagCloseable
- trait DatabaseSequencerConfig extends AnyRef
Unsealed trait so the database sequencer config can be reused between community and enterprise
- class DirectSequencerClientTransport extends SequencerClientTransport with NamedLogging
This transport is meant to be used to create a sequencer client that connects directly to an in-process sequencer. Needed for cases when the sequencer node itself needs to listen to specific events such as identity events.
- case class EthereumBlockContents[E](blockHeight: BlockHeight, events: List[E]) extends Product with Serializable
We frequently carry around events for a block. Type aliases provided in package.scala give names to common usages of event types.
- class EthereumLedgerConnection extends NamedLogging with AutoCloseable with HasDegradationState[Warn]
- sealed trait EthereumLedgerEvent extends AnyRef
- class EthereumSequencer extends BaseSequencer with NamedLogging with FlagCloseable with NoTracing
- trait EventSignaller extends FlagCloseableAsync
Component to signal to a SequencerReader that more events may be available to read, so it should attempt fetching events from its store.
- trait FabricBlockEvent extends AnyRef
- case class FabricBlockEvents(height: Long, events: Seq[Traced[FabricBlockEvent]]) extends Product with Serializable
- class FabricBlockUpdateGenerator extends NamedLogging
- class FabricInitializationChecker extends BlockListener with NamedLogging with NoTracing
When using multiple Fabric sequencer applications, it is important to check that the ledger has been initialized. That's because if all applications are started concurrently and one of them is initializing the ledger, the other ones should wait for that to finish before continuing. This logic is implemented here by listening to blocks and considering the ledger initialized only when seeing the "init" transaction. Alternatively, if using a cloud deployment, we'll be looking for a "deploy" transaction. This init function call happens when FabricLedgerCreator instantiates and initializes the sequencer chaincode. Make sure this is only used when reading blocks from 0. Otherwise the transaction we're waiting for will not be seen.
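A minimal sketch of the check described above, assuming a simplified block model. `InitCheckSketch`, `Block`, `functionNames` and `firstInitializedHeight` are hypothetical names; the real checker is a BlockListener that works on Fabric block events. For brevity the sketch accepts either marker, whereas the description ties "deploy" to cloud deployments only.

```scala
object InitCheckSketch {
  // Hypothetical simplified block: a height plus the names of the chaincode functions invoked in it.
  final case class Block(height: Long, functionNames: List[String])

  // Transaction names that mark initialization, taken from the description above.
  val initMarkers: Set[String] = Set("init", "deploy")

  /** Returns the height of the first block containing an initialization transaction, if any. */
  def firstInitializedHeight(blocks: Iterable[Block]): Option[Long] =
    blocks.find(_.functionNames.exists(initMarkers.contains)).map(_.height)
}
```

If the blocks passed in do not start at height 0, the marker transaction may already have been missed and the result stays empty, which mirrors the warning above.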
- class FabricLedgerWriter extends NamedLogging with FlagCloseableAsync
- class FabricSequencer extends BaseSequencer with FlagCloseable
- class FabricSequencerReader extends NamedLogging with FlagCloseableAsync
- class LedgerBlockListener extends BlockListener
- class LessBrokenQueuingTransactionReceiptProcessor extends QueuingTransactionReceiptProcessor
Used as documented, the QueuingTransactionReceiptProcessor doesn't actually work... (or any other deferred receipt processor for that matter) https://github.com/web3j/web3j/issues/1207 It hands back a transaction receipt with only the transaction hash, and attempting to access any other field will cause an unsupported operation exception to be thrown. This is correct as it's not waiting for the transaction to actually be mined, and the queuing processor will asynchronously notify a callback when it is. However... All of the execution transaction methods that are used by the codegen'ed Java contract wrappers such as Sequencer call isStatusOK on the transaction receipt, causing everything to go booooom. To work around this we wrap the receipt processor and make the isStatusOK getter on the returned transaction receipt a little less boomy.
- class LocalSequencerStateEventSignaller extends EventSignaller with NamedLogging
If all Sequencer writes are occurring locally we pipe write notifications to read subscriptions allowing the SequencerReader to immediately read from the backing store rather than polling.
An important caveat is that we only supply signals when a write for a member occurs. If there are no writes after the process starts, the member will never receive a read signal. The SequencerReader is responsible for performing at least one initial read from the store to ensure that all prior events are served as required.
Not suitable, or at least very sub-optimal, for a horizontally scaled sequencer setup where a reader will not have visibility of all writes locally.
- type ParsedBlockContents = EthereumBlockContents[Traced[EthereumLedgerEvent]]
- class PartitionedTimestampGenerator extends AnyRef
To generate unique timestamps between many nodes without coordination we partition available timestamps by the node index within a range of the total number of nodes.
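One way to realize such a partitioning is with modular arithmetic, sketched below. This is an illustration only: the source does not state the exact scheme, and `PartitionedTimestampSketch`, `Generator` and the use of non-negative microsecond `Long` values instead of CantonTimestamp are assumptions.

```scala
object PartitionedTimestampSketch {
  /** Generates strictly increasing microsecond timestamps `t` with `t % totalNodes == nodeIndex`.
    * Assumes `nowMicros` is non-negative.
    */
  final class Generator(nodeIndex: Int, totalNodes: Int) {
    require(totalNodes > 0 && nodeIndex >= 0 && nodeIndex < totalNodes, "node index out of range")
    private var last: Long = -1L

    def next(nowMicros: Long): Long = synchronized {
      // Never go backwards, even if the clock has not advanced since the last call.
      val candidate = math.max(nowMicros, last + 1)
      // Round up to the nearest value that falls in this node's partition.
      val offset = Math.floorMod(nodeIndex - candidate, totalNodes.toLong)
      last = candidate + offset
      last
    }
  }
}
```

Because each node only ever emits values from its own residue class, two nodes with different indexes cannot produce the same timestamp, provided they agree on `totalNodes`.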
- class PayloadMissingException extends SequencerWriterException
A payload that we should have just stored now seems to be missing.
- class PollingEventSignaller extends EventSignaller with NamedLogging
Ignore local writes and simply trigger reads periodically based on a static polling interval. Suitable for horizontally scaled sequencers where the local process will not have in-process visibility of all writes.
- sealed trait PruningError extends AnyRef
Errors from pruning
- sealed trait ReadSignal extends AnyRef
Signal that a reader should attempt to read the latest events as some may have been written
- class SendEventGenerator extends AnyRef
- sealed trait SequencedWrite extends HasTraceContext
A write that we've assigned a timestamp to. We drag these over the same clock so we can ensure earlier items have lower timestamps and later items have higher timestamps. This is very helpful, essential you may say, for correctly setting the watermark while ensuring an event with an earlier timestamp will not be written.
- trait Sequencer extends AutoCloseable
Interface for sequencer operations. The default DatabaseSequencer implementation is backed by a database run by a single operator. Other implementations support operating a Sequencer on top of third party ledgers or other infrastructure.
- case class SequencerClients(members: Set[Member] = Set.empty) extends Product with Serializable
Structure housing both members and instances of those members. Used to list clients that have been or need to be disabled.
- trait SequencerConfig extends AnyRef
- trait SequencerFactory extends AnyRef
- case class SequencerHighAvailabilityConfig(enabled: Boolean = false, totalNodeCount: Int = 10, keepAliveInterval: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofMillis(100L), onlineCheckInterval: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofSeconds(5L), offlineDuration: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofSeconds(8L)) extends Product with Serializable
Configuration for how many sequencers are concurrently operating within the domain.
- enabled
Set to true to enable HA for the sequencer.
- totalNodeCount
how many sequencer writers there will ever be in this domain; setting this to a value larger than the current topology is recommended to allow for expansion.
- keepAliveInterval
how frequently we update the sequencer watermark so that it still appears alive
- onlineCheckInterval
how frequently this sequencer should check that nodes are still online
- offlineDuration
how long a sequencer watermark must be lagging for it to be flagged as offline
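The relationship between the watermark and offlineDuration can be sketched as a simple comparison. This is a hedged illustration, not the actual check: `OfflineCheckSketch` and `isOffline` are hypothetical names, and java.time types stand in for CantonTimestamp and NonNegativeFiniteDuration.

```scala
import java.time.{Duration, Instant}

object OfflineCheckSketch {
  /** True when the watermark lags `now` by more than `offlineDuration`. */
  def isOffline(watermark: Instant, now: Instant, offlineDuration: Duration): Boolean =
    Duration.between(watermark, now).compareTo(offlineDuration) > 0
}
```

With the defaults listed above, a writer that refreshes its watermark every 100 milliseconds would need to miss updates for more than 8 seconds before such a check flags it.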
- case class SequencerMemberStatus(member: Member, registeredAt: CantonTimestamp, lastAcknowledged: Option[CantonTimestamp], enabled: Boolean = true) extends HasProtoV0[admin.v0.SequencerMemberStatus] with Product with Serializable
- class SequencerOfflineException extends SequencerWriterException
Thrown as an error in the Akka stream when we discover that our currently running sequencer writer has been marked as offline.
- case class SequencerPruningStatus(lowerBound: CantonTimestamp, now: CantonTimestamp, members: Seq[SequencerMemberStatus]) extends HasProtoV0[admin.v0.SequencerPruningStatus] with Product with Serializable
Pruning status of a Sequencer.
- lowerBound
the earliest timestamp that can be read
- now
the current time of the sequencer clock
- members
details of registered members
- class SequencerReader extends NamedLogging with FlagCloseable
- case class SequencerReaderConfig(readBatchSize: Int = 100, checkpointInterval: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofSeconds(5), pollingInterval: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofMillis(100)) extends Product with Serializable
Configuration for the database-based sequencer reader.
- readBatchSize
max number of events to fetch from the datastore in one page
- checkpointInterval
how frequently to checkpoint state
- pollingInterval
how frequently to poll for new events from the database. Only used if the SequencerHighAvailabilityConfig has been set; otherwise the reader relies on local writes performed by this sequencer to indicate that new events are available.
- case class SequencerSnapshot(lastTs: CantonTimestamp, heads: Map[Member, SequencerCounter], status: SequencerPruningStatus, additional: Option[ImplementationSpecificInfo]) extends HasProtoV0[admin.v0.SequencerSnapshot] with Product with Serializable
- class SequencerWriter extends NamedLogging with FlagCloseableAsync
The Writer component is in practice a little state machine that will run crash recovery on startup then create a running SequencerWriterSource. If this materialized Sequencer flow then crashes with an exception that can be recovered by running crash recovery it will then go through this process and attempt to restart the flow.
Callers must call start to start the writer and will likely have to wait for this to complete before accepting calls for the sequencer to direct at send. Note that the crash recovery steps may take a long time to run:
- we delete invalid events previously written by the sequencer and then attempt to insert a new online watermark, and these database queries may simply take a long time to run.
- the SequencerWriter will wait until our clock has reached the new online watermark timestamp before starting the writer to ensure that no events before this timestamp are written. If this online watermark is significantly ahead of the current clock value it will just wait until this is reached. In practice assuming all sequencers in the local topology are kept in sync through NTP or similar, this duration should be very small (<1s).
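The wait in the second step above amounts to a small calculation, sketched here under assumptions: `StartupDelaySketch` and `delayUntilOnline` are hypothetical names, and java.time types stand in for the sequencer clock and CantonTimestamp.

```scala
import java.time.{Duration, Instant}

object StartupDelaySketch {
  /** Time to wait before starting the writer: zero if the clock has already reached the watermark. */
  def delayUntilOnline(now: Instant, onlineWatermark: Instant): Duration = {
    val remaining = Duration.between(now, onlineWatermark)
    if (remaining.isNegative) Duration.ZERO else remaining
  }
}
```

When clocks are kept in sync, the result is expected to be close to zero, which matches the sub-second duration mentioned above.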
- sealed trait SequencerWriterConfig extends AnyRef
Configuration for the database based sequencer writer
- sealed abstract class SequencerWriterException extends RuntimeException
Base class for exceptions intentionally thrown during an Akka stream to flag errors
- class SequencerWriterQueues extends NamedLogging
- trait SequencerWriterStoreFactory extends AutoCloseable
Create instances for a store.SequencerWriterStore and a predicate to know whether we can recreate a sequencer writer on failures encountered potentially during storage. Implements AutoCloseable so implementations can use lifecycle.FlagCloseable to short circuit retry attempts.
- type UnparsedBlockContents = EthereumBlockContents[BaseEventResponse]
Unparsed in this context meaning the raw web3j codegen java instances (which to be fair have already been extracted from the raw transaction event log).
- sealed trait Write extends AnyRef
A write we want to make to the db
- sealed trait WriteNotification extends AnyRef
Who gets notified that an event has been written
- trait WriterStartupError extends AnyRef
Errors that can occur while starting the SequencerWriter. In the HA Sequencer Writer it is possible, due to racy assignment of writer locks, that the startup may fail. This is by design and starting the writer can simply be retried. However, other errors such as discovering incorrect configuration or an unexpected commit mode will not be recoverable and the writer should propagate the failure.
Value Members
- object BatchWritten extends Serializable
- object BlockProcessingError
- object CommitMode
- object CommunitySequencerConfig
- object DatabaseSequencerConfig
- object EthereumBlockContents extends Serializable
- object EthereumBlockProcessingFlow
Creates a flowable that will take blocks of unparsed Ethereum events, fetch batches, parse out the events, then finally update the state manager appropriately
- object EthereumErrors extends EthereumErrorGroup
- object EthereumLedgerConnection
- object EthereumLedgerEvent
- object EthereumSequencer
- object FabricBlockEvent
- object FabricSequencer
- object FabricSequencerUtils
- object FetchLatestEventsFlow
Flow that upon read signals will attempt to fetch all events until head is reached. Events are paged in within a sub-source to prevent all events being held in memory at once.
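The paging idea can be sketched without Akka by using a lazy iterator, so only one page is held in memory at a time. This is an illustration under assumptions: `PagedFetchSketch`, `fetchUntilHead` and the `fetchPage(offset, limit)` callback are hypothetical, and the real flow uses an Akka sub-source rather than an `Iterator`.

```scala
object PagedFetchSketch {
  /** Lazily reads pages via `fetchPage(fromOffset, limit)` until a page comes back short of `pageSize`. */
  def fetchUntilHead[A](start: Long, pageSize: Int)(fetchPage: (Long, Int) => Seq[A]): Iterator[A] = {
    require(pageSize > 0, "pageSize must be positive")
    Iterator
      .unfold[Seq[A], Option[Long]](Option(start)) {
        case None => None // head was reached on the previous page
        case Some(offset) =>
          val page = fetchPage(offset, pageSize)
          // A short page means the head was reached, so stop after emitting it.
          val nextState = if (page.size < pageSize) None else Some(offset + page.size)
          Some((page, nextState))
      }
      .flatten
  }
}
```

Each read signal would trigger one such run; pages are only fetched as the consumer pulls elements from the iterator.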
- object LedgerBlockListener
- object NotifyEventSignallerFlow
- object PruningError
- case object ReadSignal extends ReadSignal with Product with Serializable
- object SequenceWritesFlow
- object SequencedWrite
- object Sequencer
- object SequencerFactory
- object SequencerHighAvailabilityConfig extends Serializable
- object SequencerMemberStatus extends Serializable
- object SequencerPruningStatus extends Serializable
- object SequencerReader
- object SequencerSnapshot extends Serializable
- object SequencerWriter
- object SequencerWriterConfig
Expose config as different named versions using different default values to allow easy switching for the different setups we can run in (high-throughput, low-latency). However, as each value is only a default, it can also be easily overridden if required.
- object SequencerWriterSource
Akka stream for writing as a Sequencer
- object SequencerWriterStoreFactory
- object UpdateWatermarkFlow
- object Write
- object WriteNotification
- object WritePayloadsFlow
Extract the payloads of events and write them in batches to the payloads table. As order does not matter at this point, batches may be written concurrently, up to the concurrency specified by SequencerWriterConfig.payloadWriteMaxConcurrency. Pass on the events with the payloads dropped and replaced by their payload ids.
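A rough sketch of this step using plain Futures instead of an Akka flow. All names here (`WritePayloadsSketch`, `Event`, `EventRef`, `writeAll`, `writeBatch`) are hypothetical, and concurrency is bounded in a simplified way: batches are written in windows of `maxConcurrency`, and each window must finish before the next starts.

```scala
import scala.concurrent.{ExecutionContext, Future}

object WritePayloadsSketch {
  final case class Event(payloadId: Long, payload: String) // hypothetical event carrying its payload
  final case class EventRef(payloadId: Long)               // same event with the payload dropped

  def writeAll(events: Seq[Event], batchSize: Int, maxConcurrency: Int)(
      writeBatch: Seq[Event] => Future[Unit]
  )(implicit ec: ExecutionContext): Future[Seq[EventRef]] = {
    // Split into batches, then into windows of at most `maxConcurrency` batches.
    val windows = events.grouped(batchSize).grouped(maxConcurrency).toList
    windows
      .foldLeft(Future.unit) { (previous, window) =>
        // Batches within a window are started together; windows run one after another.
        previous.flatMap(_ => Future.traverse(window)(writeBatch).map(_ => ()))
      }
      // Downstream only needs the ids, in the original event order.
      .map(_ => events.map(e => EventRef(e.payloadId)))
  }
}
```

The returned references keep the input order even though the batches themselves may complete in any order.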
- object WriterStartupError