Custom Views¶
The Custom Views library provides convenient features to continuously ingest data from a ledger into a database, into tables of your choice, optimized for your querying requirements.
Custom Views is a Java library for projecting ledger events into a SQL database. It is currently available in Labs early access, and only supports PostgreSQL right now.
Use the following Maven dependency to use the Custom Views library:
<dependency>
<groupId>com.daml</groupId>
<artifactId>custom-views_2.13</artifactId>
<version>2.5.0</version>
</dependency>
Please see the Custom Views github repository for an example Maven project of a simple application that projects data into a SQL table and provides access to the data through a REST interface.
Overview¶
A Projection is a resumable process that continuously reads ledger events and transforms these into rows in SQL tables. A projection ensures that rows are committed according to ledger transactions, ensuring that the isolation and atomicity of changes perceived by database users are consistent with committed transactions on the ledger.
At a high level, the following types are needed to run a projection process:
- A
BatchSourceconnects to the ledger and reads events from it. - A
Projectiondefines which events to process from theBatchSource, from whichOffsetto start processing, optionally up to an endOffset. - A
Projectfunction converts the events into database actions. AJdbcActiontype defines which SQL statements should be executed. - The
projectmethod onProjectortakes aBatchSource, aProjection, and aProjectfunction. Theprojectmethod starts the projection process. Database transactions are committed as they occur on the ledger.
A common workflow for setting up a projection process follows:
- Create a table in your SQL database.
- Create a
Projector. - Choose the type of event you want to project and create a
BatchSourcefor it. - Create a
Projection. If the projection already exists, it will continue where it left off. - Create a
Projectfunction that transforms an event into (0 to N) database actions. - Invoke
projecton theProjector, passing in theBatchSource, theProjection, and theProjectfunction. This starts the projection process, and returns aControlto control the process. - Cancel the projection by invoking
control.cancelon shutdown of your application.
The next sections explain the most important objects in the Custom Views library in more detail.
Projector¶
A Projector executes the projection process. The code snippet below shows how to create a JdbcProjector.
var config = new HikariConfig();
config.setJdbcUrl(url);
config.setUsername(user);
config.setPassword(password);
var ds = new HikariDataSource(config);
var system = ActorSystem.create("my-projection-app");
var projector = JdbcProjector.create(ds, system);
A Projector provides project methods to start a projection process.
A DataSource is used to create database connections when required. In this example a Hikari connection pool is used.
The project methods return a Control which can be used to:
- Cancel the projection.
- Find out if the projection has completed or failed.
- Wait for the projection process to close all its resources.
A projection only completes if an end Offset is set, otherwise it continuously runs and projects events as they occur on the ledger.
The project methods take a BatchSource, a Projection and a Project function, which are explained in the next sections.
BatchSource¶
A projection connects to the ledger and reads events using a BatchSource, which internally uses the Ledger API with gRPC.
The Ledger API provides the following types of events:
Event(CreatedEventorArchivedEvent)ExercisedEventTreeEvent
The projection library uses the Event, ExercisedEvent and TreeEvent classes from the Java Bindings
in the com.daml.ledger.javaapi.data package to represent these events.
The following BatchSources are available:
BatchSource.eventscreates aBatchSourcethat readsEvents from the ledger.BatchSource.exercisedEventscreates aBatchSourcethat readsExercisedEvents from the ledger.BatchSource.treeEventscreates aBatchSourcethat readsTreeEvents from the ledger.
The example below shows how to create a BatchSource that reads CreatedEvents and ArchivedEvents from the ledger at localhost, port 6865:
var grpcClientSettings = GrpcClientSettings.connectToServiceAt("localhost", 6865, system);
var source = BatchSource.events(grpcClientSettings);
Additionally BatchSource.create creates a BatchSource from code-generated Contract types from CreateEvents,
or creates a BatchSource from simple values, which is convenient for unit testing.
Batch¶
A BatchSource reads events into Batches. A Batch consists of 1 to many events, and optionally contains a marker that indicates that a transaction has been committed on the ledger.
Batches make it possible to process larger than memory transactions, while tracking transactions as they occur on the ledger, and making it possible for downstream
database transactions to only commit when these transaction markers have been detected.
Envelope¶
The events in Batches are wrapped in Envelopes. An Envelope provides additional fields providing more context about what occurred on the ledger.
It has the following fields:
event: The wrapped value.getEventandunwrap()both provide this value.offset: The offset of the event.table: The (main)ProjectionTablethat is projected to.workflowId(optional)ledgerEffectiveTime(optional)transactionId(optional)
Projection¶
The Projection keeps track of the projection process and decides which events will be projected from the BatchSource.
A Projection:
- has a ProjectionId that must uniquely identify the projection process.
- has an
Offsetwhich is used as a starting point to read from the ledger. - has a
ProjectionFilter. TheBatchSourceuses this filter to select events from the ledger. (If you are familiar with the gRPC service, theProjectionFiltertranslates to aTransactionFilter) - specifies an SQL table to project to with a
ProjectionTable. - optionally has a
Predicateto filter events that were read from the ledger. - optionally has an end
Offset, if set the projection ends when a transaction for theOffsethas been read from the ledger. - is stored in the
projectionSQL table.
A newly created projection by default has no offset, which means a projection starts from the beginning of the ledger. A projection updates when it successfully commits transactions into the SQL database according to transactions that were committed on the ledger. A projection resumes from its stored offset automatically, if it can be found by its ProjectionId.
The code below shows an example of how to create a Projection:
var projectionTable = new ProjectionTable("ious");
var eventsProjection =
Projection.<Event>create(
new ProjectionId("iou-projection-for-party"),
ProjectionFilter.parties(Set.of(partyId))
);
The eventsProjection Projection selects Events that occurred visible to the party partyId to the ious SQL table.
The Project function¶
The Project<E,A> function projects an event Envelope<E> into a List<A>.
For the project methods on JdbcProjector, A is a JdbcAction.
The code below shows an example of a Project function that handles CreatedEvents and ArchivedEvents.
Project<Event, JdbcAction> f =
envelope -> {
var event = envelope.getEvent();
if (event instanceof CreatedEvent) {
Iou.Contract iou = Iou.Contract.fromCreatedEvent((CreatedEvent) event);
var action =
ExecuteUpdate.create(
"insert into "
+ projectionTable.getName()
+ "(contract_id, event_id, amount, currency) "
+ "values (?, ?, ?, ?)"
)
.bind(1, event.getContractId(), Bind.String())
.bind(2, event.getEventId(), Bind.String())
.bind(3, iou.data.amount, Bind.BigDecimal())
.bind(4, iou.data.currency, Bind.String());
return List.of(action);
} else {
var action =
ExecuteUpdate.create(
"delete from " +
projectionTable.getName() +
" where contract_id = ?"
)
.bind(1, event.getContractId(), Bind.String());
return List.of(action);
}
};
The Project function f creates an insert action for every CreatedEvent and a delete action for every ArchivedEvent.
The JdbcActions are further explained in the next section.
The JdbcAction¶
A database action captures a SQL statement that is executed by a Projector.
The JdbcAction is an interface with one method, shown in the example below:
public int execute(java.sql.Connection con);
All actions extend JdbcAction. execute should return the number of rows affected by the action.
The ExecuteUpdate action creates an insert, delete, or update statement.
The example below shows how an insert statement can be created, and how arguments can be bound to the statement:
ExecuteUpdate.create(
"insert into "
+ projectionTable.getName()
+ "(contract_id, event_id, amount, currency) "
+ "values (?, ?, ?, ?)")
.bind(1, event.getContractId(), Bind.String())
.bind(2, event.getEventId(), Bind.String())
.bind(3, iou.data.amount, Bind.BigDecimal())
.bind(4, iou.data.currency, Bind.String());
It is also possible to use named parameters, which is shown in the example below:
ExecuteUpdate.create(
"insert into "
+ projectionTable.getName()
+ "(contract_id, event_id, amount, currency) "
+ "values (:cid, :eid, :amount, :currency)")
.bind("cid", event.getContractId(), Bind.String())
.bind("eid", event.getEventId(), Bind.String())
.bind("amount", iou.data.amount, Bind.BigDecimal())
.bind("currency", iou.data.currency, Bind.String());
Projecting rows in batches¶
The ExecuteUpdate action internally creates a new java.sql.PreparedStatement when it is executed.
Use UpdateMany if you want to reuse the java.sql.PreparedStatement and add statements in batches, which can make a considerable difference to performance.
The example below shows how you can use projectRows to project using UpdateMany.
In this case we are using a code generated Iou.Contract class to function as a Row, which we use to bind to a SQL statement
which is executed in batches.
var projectionTable = new ProjectionTable("ious");
var contracts = Projection.<Iou.Contract>create(
new ProjectionId("iou-contracts-for-party"),
ProjectionFilter.parties(Set.of(partyId)),
projectionTable
);
var batchSource = BatchSource.create(grpcClientSettings,
e -> {
return Iou.Contract.fromCreatedEvent(e);
});
Project<Iou.Contract, Iou.Contract> mkRow =
envelope -> {
return List.of(envelope.getEvent());
};
Binder<Iou.Contract> binder = Sql.<Iou.Contract>binder(
"insert into "
+ projectionTable.getName()
+ "(contract_id, event_id, amount, currency) "
+ "values (:contract_id, :event_id, :amount, :currency)")
.bind("contract_id", iou -> iou.id.contractId, Bind.String())
.bind("event_id", iou -> null, Bind.String())
.bind("amount", iou -> iou.data.amount, Bind.BigDecimal())
.bind("currency", iou -> iou.data.currency, Bind.String());
BatchRows<Iou.Contract, JdbcAction> batchRows =
UpdateMany.create(binder);
var control =
projector.projectRows(
batchSource,
contracts,
batchRows,
mkRow
);
The Project function just returns the Iou.Contract since we can use this directly for our insert statement.
Next we use a Binder to bind the Iou.Contract to the insert statement.
The UpdateMany.create creates a BatchRows function that transforms a List of rows, in this case Iou.Contracts, into a single JdbcAction.
projectRows starts the projection process, converting created Iou.Contracts into rows in the ious table.
Configuration¶
The Custom Views library uses the Lightbend config library for configuration.
The library is packaged with a reference.conf file which specifies default settings. The next sections describe the default confguration settings.
You can override the configuration by using an application.conf file, see using the Lightbend config library for more details.
Database migration with Flyway¶
Flyway is used for database migration. Resources to create and migrate the database objects that the library needs internally are provided, for instance for the projection table that is used to persist Projections.
The internal SQL scripts are provided in the jar at /db/migration/projection.
The reference.conf file configures this by default, shown below:
projection {
# The name of the projection table which keeps track of all projections by projection-id
projection-table-name = "projection"
# database migration configuration
flyway {
# location of flyway migration schemas for internal bookkeeping (the projection-table).
internal-locations = ["db/migration/projection"]
# Override locations to provide your own flyway scripts.
locations = []
# If set to true, database migration is executed automatically.
migrate-on-start = true
}
}
The projection table is created automatically when a projection process is started with the project, projectRows, or projectEvents method on Projector.
Provide additional flyway locations with the projection.flyway.locations configuration parameter and bundle your own resources as explained
in the Flyway documentation. This makes it possible to create and migrate
database tables and other database objects required for your projections automatically when a projection is (re-)started.
If you do not want to use Flyway database migration, set projection.flyway.migrate-on-start to false. In that case you have to create the projection table yourself as well.
Batcher configuration¶
A Batch consists of 1 to many events, and optionally contains a marker that indicates that a transaction has been committed on the ledger.
Both the batch-size and the batch-interval are configured in the reference.conf:
projection {
batch-size = 10000
batch-interval = 1 second
}
Dispatcher configuration for blocking operation¶
A default dedicated dispatcher for blocking operations (e.g. db operation) is configured in reference.conf:
projection {
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 16
}
throughput = 1
}
}
Ledger API Authorization¶
The client must provide an access token when authorization is required by the Ledger. For details of ledger authorization, please refer to Ledger Authorization documentation.
Provide access token to custom-view library¶
Applications can provide an access token when setting up the client. The example below shows how to set LedgerCallCredentials on the GrpcClientSettings.
var grpcClientSettings = GrpcClientSettings
.connectToServiceAt("localhost", 6865, system)
.withCallCredentials(new LedgerCallCredentials(accessToken));
var source = BatchSource.events(grpcClientSettings);
var control = projector.project(source, events, f);
Provide a newly retrieved access token when the existing one has expired¶
When an access token is expired, an application can retrieve a new access token with the stored refresh token. For details on the refresh token, please refer to Ledger auth-middleware documentation. With the new access token, an application can cancel the running projection and re-create a new one using the new token.
control.cancel().thenApply(done -> {
var sourceWithNewToken = BatchSource.events(
grpcClientSettings.withCallCredentials(new LedgerCallCredentials(newAccessToken))
);
return projector.project(sourceWithNewToken, events, f);
});