Custom Views¶
The Custom Views library provides convenient features to continuously ingest data from a ledger into a database, into tables of your choice, optimized for your querying requirements.
Custom Views is a Java library for projecting ledger events into a SQL database. It is currently available in Labs early access, and only supports PostgreSQL right now.
Add the following Maven dependency to use the Custom Views library:
<dependency>
  <groupId>com.daml</groupId>
  <artifactId>custom-views_2.13</artifactId>
  <version>2.5.0</version>
</dependency>
See the Custom Views GitHub repository for an example Maven project of a simple application that projects data into a SQL table and provides access to the data through a REST interface.
Overview¶
A Projection is a resumable process that continuously reads ledger events and transforms these into rows in SQL tables. A projection commits rows according to ledger transactions, so that the isolation and atomicity of changes perceived by database users are consistent with committed transactions on the ledger.
At a high level, the following types are needed to run a projection process:
- A BatchSource connects to the ledger and reads events from it.
- A Projection defines which events to process from the BatchSource, from which Offset to start processing, optionally up to an end Offset.
- A Project function converts the events into database actions. A JdbcAction type defines which SQL statements should be executed.
- The project method on Projector takes a BatchSource, a Projection, and a Project function. The project method starts the projection process. Database transactions are committed as they occur on the ledger.
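To make the roles of these types concrete, the sketch below models a projection entirely in memory. It is a toy under stated assumptions: ToyProjection, ToyEvent, and ToyAction are hypothetical names invented for this illustration and are not part of the Custom Views library, and the "database" is just a list of contract ids. It only illustrates the contract described above: a project function turns each event into zero or more actions, and the actions belonging to one ledger transaction become visible together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy in-memory model of a projection. Not the Custom Views API. */
final class ToyProjection {
    /** Hypothetical ledger event: a contract was created (created == true) or archived. */
    record ToyEvent(String contractId, boolean created) {}

    /** Hypothetical database action, reduced to a mutation of an in-memory table. */
    interface ToyAction {
        void applyTo(List<String> table);
    }

    /** Applies the actions of each transaction as one unit, like one database commit per ledger transaction. */
    static List<String> run(List<List<ToyEvent>> transactions,
                            Function<ToyEvent, List<ToyAction>> project) {
        List<String> table = new ArrayList<>();
        for (List<ToyEvent> tx : transactions) {
            List<String> staged = new ArrayList<>(table); // work on a copy of the table
            for (ToyEvent e : tx) {
                for (ToyAction a : project.apply(e)) {
                    a.applyTo(staged);
                }
            }
            table = staged; // "commit": readers only ever see whole transactions
        }
        return table;
    }

    /** Project function: insert a row for a created contract, delete the row of an archived one. */
    static List<ToyAction> insertOrDelete(ToyEvent e) {
        if (e.created()) {
            return List.of(table -> table.add(e.contractId()));
        }
        return List.of(table -> table.remove(e.contractId()));
    }

    public static void main(String[] args) {
        var transactions = List.of(
            List.of(new ToyEvent("c1", true), new ToyEvent("c2", true)),
            List.of(new ToyEvent("c1", false)));
        System.out.println(run(transactions, ToyProjection::insertOrDelete)); // prints [c2]
    }
}
```

In the real library the same roles are played by BatchSource (where events come from), Project (the function from event to actions), and Projector (which executes the actions and commits).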
A common workflow for setting up a projection process follows:
- Create a table in your SQL database.
- Create a Projector.
- Choose the type of event you want to project and create a BatchSource for it.
- Create a Projection. If the projection already exists, it will continue where it left off.
- Create a Project function that transforms an event into (0 to N) database actions.
- Invoke project on the Projector, passing in the BatchSource, the Projection, and the Project function. This starts the projection process, and returns a Control to control the process.
- Cancel the projection by invoking control.cancel on shutdown of your application.
The next sections explain the most important objects in the Custom Views library in more detail.
Projector¶
A Projector executes the projection process. The code snippet below shows how to create a JdbcProjector.
var config = new HikariConfig();
config.setJdbcUrl(url);
config.setUsername(user);
config.setPassword(password);
var ds = new HikariDataSource(config);
var system = ActorSystem.create("my-projection-app");
var projector = JdbcProjector.create(ds, system);
A Projector provides project methods to start a projection process.
A DataSource is used to create database connections when required. In this example a Hikari connection pool is used.
The project methods return a Control which can be used to:
- Cancel the projection.
- Find out if the projection has completed or failed.
- Wait for the projection process to close all its resources.
A projection only completes if an end Offset is set; otherwise it runs continuously and projects events as they occur on the ledger.
The project methods take a BatchSource, a Projection, and a Project function, which are explained in the next sections.
BatchSource¶
A projection connects to the ledger and reads events using a BatchSource, which internally uses the Ledger API with gRPC.
The Ledger API provides the following types of events:
- Event (CreatedEvent or ArchivedEvent)
- ExercisedEvent
- TreeEvent
The projection library uses the Event, ExercisedEvent and TreeEvent classes from the Java Bindings in the com.daml.ledger.javaapi.data package to represent these events.
The following BatchSources are available:
- BatchSource.events creates a BatchSource that reads Events from the ledger.
- BatchSource.exercisedEvents creates a BatchSource that reads ExercisedEvents from the ledger.
- BatchSource.treeEvents creates a BatchSource that reads TreeEvents from the ledger.
The example below shows how to create a BatchSource that reads CreatedEvents and ArchivedEvents from the ledger at localhost, port 6865:
var grpcClientSettings = GrpcClientSettings.connectToServiceAt("localhost", 6865, system);
var source = BatchSource.events(grpcClientSettings);
Additionally, BatchSource.create creates a BatchSource of code-generated Contract types from CreatedEvents, or creates a BatchSource from simple values, which is convenient for unit testing.
Batch¶
A BatchSource reads events into Batches. A Batch consists of one or more events, and optionally contains a marker that indicates that a transaction has been committed on the ledger.
Batches make it possible to process larger-than-memory transactions while tracking transactions as they occur on the ledger, so that downstream database transactions commit only when these transaction markers have been detected.
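The relationship between batches and commits can be pictured with the small sketch below. It is an illustration only: ToyBatches and ToyBatch are hypothetical names, not library types, and the pending list stands in for rows that a real projection would already have written inside an open database transaction. The point it shows is that a transaction may span several batches, and a commit happens only when a batch carries the transaction marker.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of batches with transaction markers. Not the Custom Views API. */
final class ToyBatches {
    /** Hypothetical batch: some events, plus a flag saying a ledger transaction ends here. */
    record ToyBatch(List<String> events, boolean endsTransaction) {}

    /** Returns the groups of events that would be committed together. */
    static List<List<String>> commits(List<ToyBatch> batches) {
        List<List<String>> committed = new ArrayList<>();
        List<String> pending = new ArrayList<>(); // stands in for an open database transaction
        for (ToyBatch batch : batches) {
            pending.addAll(batch.events());
            if (batch.endsTransaction()) {
                committed.add(List.copyOf(pending)); // commit at the marker
                pending.clear();
            }
        }
        return committed; // events after the last marker remain uncommitted
    }

    public static void main(String[] args) {
        var batches = List.of(
            new ToyBatch(List.of("e1", "e2"), false), // first part of a large transaction
            new ToyBatch(List.of("e3"), true),        // rest of it, with the marker
            new ToyBatch(List.of("e4"), true));       // a small transaction in one batch
        System.out.println(commits(batches)); // prints [[e1, e2, e3], [e4]]
    }
}
```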
Envelope¶
The events in Batches are wrapped in Envelopes. An Envelope provides additional fields that give more context about what occurred on the ledger.
It has the following fields:
- event: The wrapped value. getEvent and unwrap() both provide this value.
- offset: The offset of the event.
- table: The (main) ProjectionTable that is projected to.
- workflowId (optional)
- ledgerEffectiveTime (optional)
- transactionId (optional)
Projection¶
The Projection keeps track of the projection process and decides which events will be projected from the BatchSource.
A Projection:
- has a ProjectionId that must uniquely identify the projection process.
- has an Offset which is used as a starting point to read from the ledger.
- has a ProjectionFilter. The BatchSource uses this filter to select events from the ledger. (If you are familiar with the gRPC service, the ProjectionFilter translates to a TransactionFilter.)
- specifies an SQL table to project to with a ProjectionTable.
- optionally has a Predicate to filter events that were read from the ledger.
- optionally has an end Offset. If set, the projection ends when a transaction for the Offset has been read from the ledger.
- is stored in the projection SQL table.
A newly created projection by default has no offset, which means a projection starts from the beginning of the ledger. A projection updates when it successfully commits transactions into the SQL database according to transactions that were committed on the ledger. A projection resumes from its stored offset automatically, if it can be found by its ProjectionId.
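The resume behaviour can be sketched with a toy offset store. Everything here is an assumption made for illustration: ToyOffsetStore is a hypothetical class, the in-memory map stands in for the projection SQL table, and offsets are simplified to list indexes, which is not how the ledger represents them. The sketch only shows the rule stated above: a projection with an unknown id starts from the beginning, while a known id continues from its stored offset.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of resuming a projection by id. Not the Custom Views API. */
final class ToyOffsetStore {
    // Stands in for the projection table: projection id -> last committed offset.
    private final Map<String, Integer> offsets = new HashMap<>();

    /** Projects everything after the stored offset, then stores the new offset. */
    List<String> project(String projectionId, List<String> ledger) {
        int start = offsets.getOrDefault(projectionId, 0); // no stored offset: start of the ledger
        List<String> projected = List.copyOf(ledger.subList(start, ledger.size()));
        offsets.put(projectionId, ledger.size());          // updated only after a successful "commit"
        return projected;
    }

    public static void main(String[] args) {
        var store = new ToyOffsetStore();
        System.out.println(store.project("iou-projection", List.of("tx1", "tx2")));        // prints [tx1, tx2]
        System.out.println(store.project("iou-projection", List.of("tx1", "tx2", "tx3"))); // prints [tx3]
        System.out.println(store.project("other-projection", List.of("tx1", "tx2", "tx3"))); // prints [tx1, tx2, tx3]
    }
}
```

This is also why the ProjectionId must be unique: two processes sharing an id would share, and overwrite, the same stored offset.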
The code below shows an example of how to create a Projection:
var projectionTable = new ProjectionTable("ious");
var eventsProjection =
  Projection.<Event>create(
    new ProjectionId("iou-projection-for-party"),
    ProjectionFilter.parties(Set.of(partyId)),
    projectionTable
  );
The eventsProjection Projection selects the Events visible to the party partyId and projects them to the ious SQL table.
The Project function¶
The Project<E,A> function projects an event Envelope<E> into a List<A>.
For the project methods on JdbcProjector, A is a JdbcAction.
The code below shows an example of a Project function that handles CreatedEvents and ArchivedEvents.
Project<Event, JdbcAction> f =
  envelope -> {
    var event = envelope.getEvent();
    if (event instanceof CreatedEvent) {
      // A contract was created: insert a row for it.
      Iou.Contract iou = Iou.Contract.fromCreatedEvent((CreatedEvent) event);
      var action =
        ExecuteUpdate.create(
          "insert into "
            + projectionTable.getName()
            + "(contract_id, event_id, amount, currency) "
            + "values (?, ?, ?, ?)"
        )
        .bind(1, event.getContractId(), Bind.String())
        .bind(2, event.getEventId(), Bind.String())
        .bind(3, iou.data.amount, Bind.BigDecimal())
        .bind(4, iou.data.currency, Bind.String());
      return List.of(action);
    } else {
      // The contract was archived: delete its row.
      var action =
        ExecuteUpdate.create(
          "delete from " +
          projectionTable.getName() +
          " where contract_id = ?"
        )
        .bind(1, event.getContractId(), Bind.String());
      return List.of(action);
    }
  };
The Project function f creates an insert action for every CreatedEvent and a delete action for every ArchivedEvent.
The JdbcActions are further explained in the next section.
The JdbcAction¶
A database action captures a SQL statement that is executed by a Projector.
The JdbcAction is an interface with one method, shown in the example below:
public int execute(java.sql.Connection con);
All actions extend JdbcAction. execute should return the number of rows affected by the action.
The ExecuteUpdate action creates an insert, delete, or update statement.
The example below shows how an insert statement can be created, and how arguments can be bound to the statement:
ExecuteUpdate.create(
  "insert into "
    + projectionTable.getName()
    + "(contract_id, event_id, amount, currency) "
    + "values (?, ?, ?, ?)")
  .bind(1, event.getContractId(), Bind.String())
  .bind(2, event.getEventId(), Bind.String())
  .bind(3, iou.data.amount, Bind.BigDecimal())
  .bind(4, iou.data.currency, Bind.String());
It is also possible to use named parameters, which is shown in the example below:
ExecuteUpdate.create(
  "insert into "
    + projectionTable.getName()
    + "(contract_id, event_id, amount, currency) "
    + "values (:cid, :eid, :amount, :currency)")
  .bind("cid", event.getContractId(), Bind.String())
  .bind("eid", event.getEventId(), Bind.String())
  .bind("amount", iou.data.amount, Bind.BigDecimal())
  .bind("currency", iou.data.currency, Bind.String());
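For readers wondering how named parameters relate to positional ones: a java.sql.PreparedStatement only understands positional ? placeholders, so a named-parameter layer has to map names to positions somehow. The sketch below shows one simple way to do that with a regular expression. It is not taken from the Custom Views source and makes no claim about how ExecuteUpdate works internally; NamedParams and Parsed are hypothetical names, and the parsing is deliberately naive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Naive named-parameter rewriting. An illustration, not the library's implementation. */
final class NamedParams {
    /** The rewritten SQL plus the parameter names in positional order. */
    record Parsed(String sql, List<String> names) {}

    private static final Pattern NAME = Pattern.compile(":([A-Za-z_][A-Za-z0-9_]*)");

    /**
     * Replaces every :name with ? and records the names in order of appearance.
     * Limitation: it does not skip string literals or PostgreSQL "::" casts.
     */
    static Parsed parse(String sql) {
        List<String> names = new ArrayList<>();
        StringBuilder rewritten = new StringBuilder();
        Matcher m = NAME.matcher(sql);
        while (m.find()) {
            names.add(m.group(1));
            m.appendReplacement(rewritten, "?");
        }
        m.appendTail(rewritten);
        return new Parsed(rewritten.toString(), List.copyOf(names));
    }

    public static void main(String[] args) {
        Parsed p = parse("insert into ious(contract_id, event_id) values (:cid, :eid)");
        System.out.println(p.sql());   // prints insert into ious(contract_id, event_id) values (?, ?)
        System.out.println(p.names()); // prints [cid, eid]
    }
}
```

With such a mapping, binding the value for "eid" amounts to setting the parameter at the position where eid appears in the names list, plus one because JDBC positions start at 1.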
Projecting rows in batches¶
The ExecuteUpdate action internally creates a new java.sql.PreparedStatement when it is executed.
Use UpdateMany if you want to reuse the java.sql.PreparedStatement and add statements in batches, which can make a considerable difference to performance.
The example below shows how you can use projectRows to project using UpdateMany.
In this case we use a code-generated Iou.Contract class as a Row, which we bind to a SQL statement that is executed in batches.
var projectionTable = new ProjectionTable("ious");
var contracts = Projection.<Iou.Contract>create(
  new ProjectionId("iou-contracts-for-party"),
  ProjectionFilter.parties(Set.of(partyId)),
  projectionTable
);
var batchSource = BatchSource.create(grpcClientSettings,
  e -> {
    return Iou.Contract.fromCreatedEvent(e);
  });
Project<Iou.Contract, Iou.Contract> mkRow =
  envelope -> {
    return List.of(envelope.getEvent());
  };
Binder<Iou.Contract> binder = Sql.<Iou.Contract>binder(
  "insert into "
    + projectionTable.getName()
    + "(contract_id, event_id, amount, currency) "
    + "values (:contract_id, :event_id, :amount, :currency)")
  .bind("contract_id", iou -> iou.id.contractId, Bind.String())
  .bind("event_id", iou -> null, Bind.String())
  .bind("amount", iou -> iou.data.amount, Bind.BigDecimal())
  .bind("currency", iou -> iou.data.currency, Bind.String());
BatchRows<Iou.Contract, JdbcAction> batchRows =
  UpdateMany.create(binder);
var control =
  projector.projectRows(
    batchSource,
    contracts,
    batchRows,
    mkRow
  );
The Project function just returns the Iou.Contract, since we can use this directly for our insert statement.
Next we use a Binder to bind the Iou.Contract to the insert statement.
UpdateMany.create creates a BatchRows function that transforms a List of rows, in this case Iou.Contracts, into a single JdbcAction.
projectRows starts the projection process, converting created Iou.Contracts into rows in the ious table.
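As background on why reusing a statement helps, the sketch below shows the same idea in plain JDBC: one PreparedStatement is prepared once, each row is added with addBatch, and all rows are sent with a single executeBatch. This is standard java.sql usage and not the implementation of UpdateMany; BatchInsert and IouRow are hypothetical names, and the ious table with contract_id, amount, and currency columns is assumed from the examples above.

```java
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/** Plain-JDBC batching sketch. An illustration of the technique, not UpdateMany itself. */
final class BatchInsert {
    /** Hypothetical row type, standing in for the code-generated Iou.Contract. */
    record IouRow(String contractId, BigDecimal amount, String currency) {}

    /** Inserts all rows with one prepared statement and one batch execution. */
    static int insertAll(Connection con, List<IouRow> rows) throws SQLException {
        String sql = "insert into ious(contract_id, amount, currency) values (?, ?, ?)";
        try (PreparedStatement ps = con.prepareStatement(sql)) { // prepared once
            for (IouRow row : rows) {
                ps.setString(1, row.contractId());
                ps.setBigDecimal(2, row.amount());
                ps.setString(3, row.currency());
                ps.addBatch(); // queue the row instead of executing it immediately
            }
            int total = 0;
            for (int count : ps.executeBatch()) {
                // Drivers may report SUCCESS_NO_INFO (a negative value) instead of a row count.
                total += Math.max(count, 0);
            }
            return total;
        }
    }
}
```

Calling insertAll(connection, rows) with a connection obtained from the DataSource would insert every row in the list; the caller remains responsible for committing or rolling back the surrounding transaction.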
Configuration¶
The Custom Views library uses the Lightbend config library for configuration.
The library is packaged with a reference.conf file which specifies default settings. The next sections describe the default configuration settings.
You can override the configuration by using an application.conf file; see using the Lightbend config library for more details.
Database migration with Flyway¶
Flyway is used for database migration. Resources to create and migrate the database objects that the library needs internally are provided, for instance for the projection table that is used to persist Projections.
The internal SQL scripts are provided in the jar at /db/migration/projection.
The reference.conf file configures this by default, shown below:
projection {
  # The name of the projection table which keeps track of all projections by projection-id
  projection-table-name = "projection"
  # database migration configuration
  flyway {
    # location of flyway migration schemas for internal bookkeeping (the projection-table).
    internal-locations = ["db/migration/projection"]
    # Override locations to provide your own flyway scripts.
    locations = []
    # If set to true, database migration is executed automatically.
    migrate-on-start = true
  }
}
The projection table is created automatically when a projection process is started with the project, projectRows, or projectEvents method on Projector.
Provide additional Flyway locations with the projection.flyway.locations configuration parameter and bundle your own resources as explained in the Flyway documentation. This makes it possible to create and migrate database tables and other database objects required for your projections automatically when a projection is (re-)started.
If you do not want to use Flyway database migration, set projection.flyway.migrate-on-start to false. In that case you have to create the projection table yourself as well.
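As a sketch of what such an override might look like, the application.conf fragment below adds a location for your own migration scripts while keeping automatic migration enabled. The keys come from the reference.conf shown above; the path db/migration/my-app is a made-up example, so substitute wherever your scripts are bundled.

```hocon
projection {
  flyway {
    # Hypothetical classpath location of your own Flyway scripts.
    locations = ["db/migration/my-app"]
    # Set this to false instead if you manage all tables, including the projection table, yourself.
    migrate-on-start = true
  }
}
```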
Batcher configuration¶
A Batch consists of one or more events, and optionally contains a marker that indicates that a transaction has been committed on the ledger.
Both the batch-size and the batch-interval are configured in the reference.conf:
projection {
  batch-size = 10000
  batch-interval = 1 second
}
Dispatcher configuration for blocking operation¶
A default dedicated dispatcher for blocking operations (for example, database operations) is configured in reference.conf:
projection {
  blocking-io-dispatcher {
    type = Dispatcher
    executor = "thread-pool-executor"
    thread-pool-executor {
      fixed-pool-size = 16
    }
    throughput = 1
  }
}
Ledger API Authorization¶
The client must provide an access token when the ledger requires authorization. For details of ledger authorization, please refer to the Ledger Authorization documentation.
Provide access token to custom-view library¶
Applications can provide an access token when setting up the client. The example below shows how to set LedgerCallCredentials on the GrpcClientSettings.
var grpcClientSettings = GrpcClientSettings
  .connectToServiceAt("localhost", 6865, system)
  .withCallCredentials(new LedgerCallCredentials(accessToken));
var source = BatchSource.events(grpcClientSettings);
var control = projector.project(source, events, f);
Provide a newly retrieved access token when the existing one has expired¶
When an access token has expired, an application can retrieve a new access token with the stored refresh token. For details on the refresh token, please refer to the Ledger auth-middleware documentation. With the new access token, the application can cancel the running projection and create a new one that uses the new token.
control.cancel().thenApply(done -> {
  var sourceWithNewToken = BatchSource.events(
    grpcClientSettings.withCallCredentials(new LedgerCallCredentials(newAccessToken))
  );
  return projector.project(sourceWithNewToken, events, f);
});