Architecture in version 4
This page explains the architecture of OpenRefine 4, which is currently being developed and differs from the previous versions in important ways. Projects and their history are represented differently, which makes it possible to work on tables which do not fit in RAM, to improve the user experience around long-running operations in various ways, and to improve the reproducibility of OpenRefine workflows more broadly.
Project history structure
The history of a project is a succession of different states of the project. Each state is a Grid, which comprises the following data:
- column headers, containing column metadata such as the column names;
- the project table itself;
- the overlay models stored alongside the table (such as a Wikibase schema).
The initial grid of the project is generated by the importer which created the project. Each following grid is obtained by applying a Change to the previous grid. A change is a function which takes the previous grid and produces the new grid.
Grids are immutable objects. This means that a change cannot mutate the grid in place: it must create a modified copy of the grid instead. We will see in the Runners section why this can be done efficiently and does not generally require copying much data. The immutability of grids provides useful guarantees about thread safety. For instance, it helps ensure that facets are evaluated at a precise point in project history, and do not give a mixed view of different grid states.
In the previous architecture, changes were responsible not only for applying themselves to a grid, but also for reverting themselves, meaning that they had to implement the reverse operation as well. Only requiring the forward application has important consequences:
- Implementing a change is easier, since we only need to implement the function in one direction. Implementing both directions and making sure they were indeed inverses of each other was non-trivial and gave rise to certain bugs, such as #2. Because we let extensions define their own changes, there was a significant risk that an extension would implement a change incorrectly, leaving the project in an inconsistent state after using the undo feature;
- Many operations in OpenRefine are destructive, meaning that reverting them requires storing the deleted or altered data in the change object itself, to be able to restore it upon reversion. This is no longer necessary, making it possible for changes to be much lighter;
- To undo a change, we need to have kept the earlier version of the grid accessible. We will see in the Runners section why this can be done efficiently, without keeping all grids in memory explicitly.
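The following is a minimal sketch of this model, not the OpenRefine API: the names HistorySketch, Grid, Change, History and upperCase are all hypothetical, and the grid is reduced to column names and string cells. It illustrates that a change only needs a forward direction, and that undoing amounts to going back to an earlier grid which was never modified. The example change copies every row naively; the lazy runners described later avoid that cost.

```java
import java.util.ArrayList;
import java.util.List;

class HistorySketch {

    /** Hypothetical, simplified grid: column names plus rows of cell values. */
    record Grid(List<String> columns, List<List<String>> rows) {
        Grid {
            // Unmodifiable copies: once built, a grid can no longer change.
            columns = List.copyOf(columns);
            rows = rows.stream().map(row -> List.copyOf(row)).toList();
        }
    }

    /** A change only goes forward: previous grid in, new grid out. */
    interface Change {
        Grid apply(Grid previous);
    }

    /** Keeps every state, so undoing is just moving back to an earlier grid. */
    static class History {
        private final List<Grid> states = new ArrayList<>();
        private int position = 0;

        History(Grid initial) {
            states.add(initial);
        }

        Grid current() {
            return states.get(position);
        }

        void apply(Change change) {
            // Applying a change after an undo discards the undone states.
            states.subList(position + 1, states.size()).clear();
            states.add(change.apply(current()));
            position++;
        }

        void undo() {
            if (position > 0) {
                position--;
            }
        }
    }

    /** Example change: upper-cases one column by building new rows. */
    static Change upperCase(int column) {
        return previous -> {
            List<List<String>> newRows = new ArrayList<>();
            for (List<String> row : previous.rows()) {
                List<String> copy = new ArrayList<>(row);
                copy.set(column, copy.get(column).toUpperCase());
                newRows.add(copy);
            }
            return new Grid(previous.columns(), newRows);
        };
    }

    public static void main(String[] args) {
        Grid initial = new Grid(List.of("name"), List.of(List.of("ada"), List.of("grace")));
        History history = new History(initial);
        history.apply(upperCase(0));
        System.out.println(history.current().rows());
        history.undo();
        System.out.println(history.current().rows());
    }
}
```

Because the change never touches the grid it receives, the history can hand the initial grid back as-is when undoing, without any reverse operation.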
Runners
The implementation of grids and of the transformations on them is pluggable. This means that the way a grid is concretely represented in memory can be adapted, depending on the resources available and the broader context of execution. This relies on the notion of Runner, which is essentially a factory class for grids which picks a particular implementation of the Grid interface when creating such grids.
The local runner is the default one. It is designed to be run when all of the data to transform is located on the same machine (where OpenRefine is running). Project data is read from disk in a lazy fashion, i.e. only when the corresponding grid values need to be displayed, aggregated or exported. Therefore it makes it possible to run OpenRefine on large datasets without the need for a large working memory (RAM). We explain the differences between lazy and eager evaluation in the dedicated section.
The Spark runner is designed to run on distributed datasets. Those datasets can be split into blocks which are stored on different machines. The execution of the workflow can be shared between the various machines, which form a Spark cluster. As in the local runner, changes are evaluated lazily.
The testing runner is a very simple runner, which loads all the data it works on in memory. It is not optimized for performance at all: it simply meets the specification of the runner interface in the simplest possible way. It is used in tests of operations, importers and other basic blocks of workflows. Its simplicity makes it fast enough on the small datasets generally used in such tests. This runner also performs additional checks during execution, so that incorrect behaviours can be detected more easily during testing. Unlike the local and Spark runners, the testing runner uses eager evaluation.
The runner can be configured on the command line with the -r option (/r on Windows), by providing the class name of the data model runner to use. In the refine.ini configuration file, the corresponding option is refine.runner.class.
The class names of the available runners are:
- org.openrefine.runners.local.LocalRunner for the local runner (default);
- org.openrefine.runners.spark.SparkRunner for the Spark-based runner;
- org.openrefine.runners.testing.TestingRunner for the testing runner.
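To illustrate what selecting a runner by class name can look like, here is a small sketch of the factory idea. It is only an assumption about the general mechanism, not OpenRefine's actual loading code: the names RunnerSketch, MiniGrid, MiniRunner, InMemoryRunner and load are hypothetical, and the real runner classes may well require constructor arguments which this sketch does not model.

```java
import java.util.List;

class RunnerSketch {

    /** Hypothetical, minimal grid interface: only what this sketch needs. */
    interface MiniGrid {
        long rowCount();
    }

    /** Hypothetical runner: a factory which decides how grids are represented. */
    interface MiniRunner {
        MiniGrid gridOf(List<String> rows);
    }

    /** Eager implementation, in the spirit of the testing runner: all rows are held in memory. */
    public static class InMemoryRunner implements MiniRunner {
        public InMemoryRunner() {
        }

        @Override
        public MiniGrid gridOf(List<String> rows) {
            List<String> copy = List.copyOf(rows);
            return () -> copy.size();
        }
    }

    /** Instantiates a runner from a class name, as a configuration option would provide it. */
    static MiniRunner load(String className) {
        try {
            Class<?> runnerClass = Class.forName(className);
            // Assumption of this sketch: runners expose a public no-argument constructor.
            return (MiniRunner) runnerClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate runner " + className, e);
        }
    }

    public static void main(String[] args) {
        MiniRunner runner = load(InMemoryRunner.class.getName());
        System.out.println(runner.gridOf(List.of("a", "b", "c")).rowCount());
    }
}
```

The code which asks the runner for a grid never names a concrete grid class, which is what makes the representation pluggable.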
A common test suite, gathering test cases that all runners should pass, is available in RunnerTestBase. This can be used by creating a test class which extends this base class.
Lazy and eager evaluation
Given that grids are immutable, changing even just a single cell in a grid requires making a copy of that grid. One strategy to achieve this, which is used by the testing runner, is to create a copy of all cells of the original grid, with the change on the desired cell, and return this as a new grid. But grids do not have to be backed by a list of lists containing the value of each cell.
Instead, the lazy runners use a different strategy. The returned grid is a proxy object, which contains a pointer to the original grid, and implements all operations on the new grid by forwarding them to the original grid, and adapting their results to match the ones expected of the new grid.
We call those runners lazy because the operations on their grids are lazily evaluated: the actual computation does not happen when creating the modified grid, but is delayed to the later use of the data in the grid (such as running an aggregation on it to compute facet statistics, or retrieving a range of rows).
This has a few important consequences:
- the proxy objects representing those transformed grids are very lightweight: they just hold the recipe to compute the new grid out of the old one, not the data itself. This means that we can generally afford to keep many such proxy objects loaded in memory. In a lot of cases, OpenRefine will indeed hold a Grid object for every single step of a project's history. As a result, it becomes possible to undo a transformation even though Change objects do not support reverting their effects: we simply roll back to the earlier Grid object.
- when the history grows longer, the project data is accessed via a long chain of proxy objects, meaning that the project workflow is re-executed many times. When that becomes too expensive, it is possible to turn on caching in one of the intermediate grids, meaning that we compute all the contents of that grid once and store that in memory or on disk. This strategy is detailed in the Memory management section.
- with lazy evaluation, some computations required by changes can be evaluated many times (for instance, each time facets are refreshed). This is undesirable for expensive or effectful computations (for instance, fetching data from a URL). To prevent this from happening, changes are allowed to store the results of such expensive computations in ChangeData objects. Such objects are evaluated only once, persisted on disk, and the new grid can be derived by combining the old grid with some ChangeData objects (generally by joining them).
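The difference between the two strategies, and the effect of caching, can be sketched with a toy single-column grid. This is an illustration under simplifying assumptions rather than the runners' implementation: LazySketch, CellGrid, StoredGrid, eagerMap, lazyMap and cache are hypothetical names. A counter records how many times the mapping function actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

class LazySketch {

    /** Hypothetical single-column grid. */
    interface CellGrid {
        int size();

        String get(int row);
    }

    /** A grid whose cells are stored explicitly. */
    static class StoredGrid implements CellGrid {
        private final List<String> cells;

        StoredGrid(List<String> cells) {
            this.cells = List.copyOf(cells);
        }

        public int size() {
            return cells.size();
        }

        public String get(int row) {
            return cells.get(row);
        }
    }

    /** Eager strategy: every cell is transformed and copied immediately. */
    static CellGrid eagerMap(CellGrid grid, UnaryOperator<String> f) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < grid.size(); i++) {
            out.add(f.apply(grid.get(i)));
        }
        return new StoredGrid(out);
    }

    /** Lazy strategy: a proxy which holds the parent grid and the recipe, not the data. */
    static CellGrid lazyMap(CellGrid grid, UnaryOperator<String> f) {
        return new CellGrid() {
            public int size() {
                return grid.size();
            }

            public String get(int row) {
                return f.apply(grid.get(row)); // recomputed on every access
            }
        };
    }

    /** Caching: compute the whole contents once and store them. */
    static CellGrid cache(CellGrid grid) {
        return eagerMap(grid, UnaryOperator.identity());
    }

    /** Returns the number of calls to the mapping function at three points in time. */
    static List<Integer> countEvaluations() {
        AtomicInteger calls = new AtomicInteger();
        UnaryOperator<String> trim = s -> {
            calls.incrementAndGet();
            return s.trim();
        };
        CellGrid original = new StoredGrid(List.of(" a ", " b "));
        CellGrid lazy = lazyMap(original, trim);
        int afterCreation = calls.get(); // nothing has been computed yet
        lazy.get(0);
        lazy.get(0);
        int afterTwoReads = calls.get(); // the same cell was computed twice
        CellGrid cached = cache(lazy); // computes each of the two rows once
        cached.get(0);
        cached.get(0);
        int afterCachedReads = calls.get(); // reads from the cache cost nothing more
        return List.of(afterCreation, afterTwoReads, afterCachedReads);
    }

    public static void main(String[] args) {
        System.out.println(countEvaluations());
    }
}
```

Creating the lazy grid costs nothing, each read through the proxy repeats the computation, and reads from the cached grid no longer do. That repetition is harmless for a cheap, pure function, and it is the reason expensive or effectful computations are stored separately.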
ChangeData objects
A ChangeData object is a data structure which holds change data which should be computed only once given the cost or side-effects of recomputing it multiple times. It is for instance used to store reconciliation data, data fetched from URLs, or the results of evaluating expressions which are not guaranteed to be pure.
ChangeData objects are able to index arbitrary data items associated with rows or records in a Grid. At most one such item corresponds to each row or record id. The typical lifecycle for a ChangeData object is as follows:
- The ChangeData object is derived from a Grid by applying a function row or record-wise to the grid, using Grid::mapRows or Grid::mapRecords. This is typically a lazy operation, meaning that the expensive computations involved are not done yet;
- Immediately after, it is saved to a file within the project's storage directory (see Project serialization). This saving can take time, as it triggers the expensive computations defined in the previous step;
- As soon as the saving has started (and while it is still in progress), the ChangeData can be read back from its serialization, using Runner.loadChangeData. Only the items computed and written at that stage are read back.
- The ChangeData can be joined back with the Grid it was derived from, inserting the change data into cells (for instance by creating a new column where fetched URLs are stored). This is done by means of methods such as Grid::join.
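The compute-once-then-join idea behind this lifecycle can be sketched in memory with plain Java collections. This deliberately does not use Grid::mapRows, Runner.loadChangeData or Grid::join, whose exact signatures are not given here, and it leaves out the saving to disk and the partial reading back. ChangeDataSketch, compute and join are hypothetical names, and the expensive function stands in for something like fetching a URL.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class ChangeDataSketch {

    /** Runs the expensive function once per row and indexes the results by row id. */
    static <T> Map<Long, T> compute(List<String> rows, Function<String, T> expensive) {
        Map<Long, T> items = new LinkedHashMap<>();
        for (int i = 0; i < rows.size(); i++) {
            T item = expensive.apply(rows.get(i));
            if (item != null) {
                items.put((long) i, item); // at most one item per row id
            }
        }
        return items;
    }

    /** Joins the stored items back with the rows, as a new column; rows without an item get an empty cell. */
    static List<List<String>> join(List<String> rows, Map<Long, String> changeData) {
        List<List<String>> joined = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            joined.add(List.of(rows.get(i), changeData.getOrDefault((long) i, "")));
        }
        return joined;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        List<String> rows = List.of("a", "", "c");
        Map<Long, String> data = compute(rows, value -> {
            calls.incrementAndGet();
            return value.isEmpty() ? null : "fetched:" + value;
        });
        // Joining several times (for instance at each facet refresh) does not recompute anything.
        join(rows, data);
        System.out.println(join(rows, data));
        System.out.println("expensive calls: " + calls.get());
    }
}
```

The new grid depends only on the old grid and the stored items, so it can stay lazy without ever triggering the expensive function again.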
Project serialization
A single grid is serialized in its own directory, with the following components:
- metadata.json, containing the column model and overlay models associated with the grid;
- grid/part-*.gz files, which are Gzip-compressed text files containing for each line the JSON serialization of a row in the project.
The fact that the grid's serialization is spread over multiple files makes it possible to process it in parallel (with one worker per partition). Rows are sorted and each partition contains a sequential chunk of rows, meaning that to access a given row by row number, it is possible to read the relevant partition file only. The number of partitions depends on the size of the project and the options of the runner.
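As an illustration of why sequential partitions allow reading a single file, the sketch below writes rows to Gzip-compressed partition files and fetches one row by number. Several details are assumptions made for the example: the five-digit file numbering, the way partition sizes are passed in as a list, and the placeholder JSON lines, which are not the real row serialization. PartitionFilesSketch and its methods are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class PartitionFilesSketch {

    /** Assumed naming scheme for partition files. */
    static Path partFile(Path gridDir, int partition) {
        return gridDir.resolve(String.format("part-%05d.gz", partition));
    }

    /** Writes each partition to its own Gzip-compressed file, one row per line. */
    static void write(Path gridDir, List<List<String>> partitions) throws IOException {
        Files.createDirectories(gridDir);
        for (int p = 0; p < partitions.size(); p++) {
            try (Writer writer = new OutputStreamWriter(
                    new GZIPOutputStream(Files.newOutputStream(partFile(gridDir, p))),
                    StandardCharsets.UTF_8)) {
                for (String jsonRow : partitions.get(p)) {
                    writer.write(jsonRow);
                    writer.write('\n');
                }
            }
        }
    }

    /** Reads one row by row number, opening only the partition file which contains it. */
    static String readRow(Path gridDir, List<Integer> partitionSizes, long rowNumber) throws IOException {
        long firstRow = 0; // row number of the first row of the current partition
        for (int p = 0; p < partitionSizes.size(); p++) {
            int size = partitionSizes.get(p);
            if (rowNumber >= firstRow && rowNumber < firstRow + size) {
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                        new GZIPInputStream(Files.newInputStream(partFile(gridDir, p))),
                        StandardCharsets.UTF_8))) {
                    return reader.lines().skip(rowNumber - firstRow).findFirst().orElseThrow();
                }
            }
            firstRow += size;
        }
        throw new IndexOutOfBoundsException("No such row: " + rowNumber);
    }

    /** Writes two partitions to a temporary directory and reads back row 3. */
    static String demo() {
        try {
            Path dir = Files.createTempDirectory("grid-sketch");
            List<List<String>> partitions = List.of(
                    List.of("{\"id\":0}", "{\"id\":1}"),
                    List.of("{\"id\":2}", "{\"id\":3}", "{\"id\":4}"));
            write(dir, partitions);
            return readRow(dir, List.of(2, 3), 3);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Within the selected file the rows still have to be decompressed sequentially up to the requested one, since Gzip streams do not support random access.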
A project is serialized in its own directory too, with the following components:
- history.json, containing the list of history entries, which includes the associated Changes and Operations;
- initial/, a directory where the first grid of the project (the one created by the importer) is stored;
- changes/, a directory where all the ChangeData objects are serialized. They are stored in subdirectories determined by the id of the history entry they are associated with;
- cache/, a directory where grids other than the initial one can be stored. This is useful when the project history grows longer, to avoid recomputing all operations from the initial grid at every HTTP request.
Memory management
TODO describe here the caching strategy (being reworked)
Local runner
The local runner is the default one, as it is designed to be efficient in OpenRefine's intended usage conditions: running locally on the machine where the data cleaning is being done. Its design is inspired by Spark. Spark itself could not be used in place of this runner because its support for distributed computations and redundancy adds significant overhead, which makes the tool less responsive when run locally. Also, OpenRefine relies on the order of rows in many contexts, and row order is not preserved by many Spark primitives, making its abstractions less useful for OpenRefine as a default implementation.
Options
The following configuration parameters can be used with this runner:
Configuration key | Default value | Description |
---|---|---|
refine.runner.defaultParallelism | 4 | number of partitions into which datasets should generally be split, unless they are very small or very big |
refine.runner.minSplitSize | 4096 | minimum size of a partition in bytes. Datasets which are smaller than this value will not be split at all. |
refine.runner.maxSplitSize | 16777216 | maximum size of a partition in bytes. Datasets which are larger than defaultParallelism * maxSplitSize will be split in more partitions than the default parallelism. |
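The interaction between these three parameters can be made concrete with a small calculation. The rule below is only one plausible reading of the descriptions in the table, not the local runner's actual algorithm: it aims for the default parallelism, but keeps each partition between the minimum and maximum split sizes. SplitSketch and its methods are hypothetical names.

```java
class SplitSketch {

    /** Integer division rounded up, for positive operands. */
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /** Assumed rule: aim for defaultParallelism partitions, within the split size bounds. */
    static long partitions(long sizeBytes, int defaultParallelism, long minSplitSize, long maxSplitSize) {
        if (sizeBytes < minSplitSize) {
            return 1; // too small to be split at all
        }
        long target = ceilDiv(sizeBytes, defaultParallelism);
        long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, target));
        return ceilDiv(sizeBytes, splitSize);
    }

    public static void main(String[] args) {
        long[] sizes = { 1000L, 8192L, 1048576L, 67108864L, 67108865L };
        for (long size : sizes) {
            // Uses the default values listed in the table above.
            System.out.println(size + " bytes -> " + partitions(size, 4, 4096L, 16777216L) + " partition(s)");
        }
    }
}
```

Under this reading and the default values, a dataset of 1 MiB is split into 4 partitions, one of 8192 bytes into only 2, and a dataset one byte larger than 4 times the maximum split size into 5.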
Partitioned Lazy Lists
The core data structure which underpins the lazy representation of Grids and ChangeData objects in the local runner is the Partitioned Lazy List (PLL). It is a lightweight version of Spark's Resilient Distributed Dataset (RDD). It is:
- a list, because it represents an ordered collection of objects;
- lazy, because by default it does not store its contents as explicit objects in memory. Instead, the elements are computed on demand, when they are accessed;
- partitioned, because it divides its contents into contiguous groups of elements called partitions. Each partition can be enumerated independently, making it possible to run processes in parallel on different parts of the list.
In contrast to RDDs, PLLs are:
- not distributed: all of the data must be locally accessible, and all the computations happen in the same JVM;
- not resilient: there is no support for redundancy.
The concurrency in PLLs is implemented with Java threads. When instantiated, the local runner starts a thread pool which is used on demand when computations are executed.
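To make these three properties concrete, here is a miniature partitioned lazy list. It is a sketch written for this page under simplifying assumptions, not the PLL class of the local runner: MiniPllSketch, Pll, of, map, count and collect are hypothetical names, and each partition is modelled as a supplier of a Java stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

class MiniPllSketch {

    /** Hypothetical miniature of a partitioned lazy list. */
    static class Pll<T> {
        // One supplier per partition: calling it enumerates that partition from scratch.
        private final List<Supplier<Stream<T>>> partitions;
        private final ExecutorService pool;

        Pll(List<Supplier<Stream<T>>> partitions, ExecutorService pool) {
            this.partitions = partitions;
            this.pool = pool;
        }

        static <T> Pll<T> of(List<List<T>> chunks, ExecutorService pool) {
            List<Supplier<Stream<T>>> parts = new ArrayList<>();
            for (List<T> chunk : chunks) {
                parts.add(chunk::stream);
            }
            return new Pll<>(parts, pool);
        }

        /** Lazy: only wraps the suppliers, no element is computed here. */
        <U> Pll<U> map(Function<T, U> f) {
            List<Supplier<Stream<U>>> mapped = new ArrayList<>();
            for (Supplier<Stream<T>> part : partitions) {
                mapped.add(() -> part.get().map(f));
            }
            return new Pll<>(mapped, pool);
        }

        /** Counts the elements with one task per partition, run on the thread pool. */
        long count() {
            List<Future<Long>> futures = new ArrayList<>();
            for (Supplier<Stream<T>> part : partitions) {
                Callable<Long> task = () -> part.get().count();
                futures.add(pool.submit(task));
            }
            long total = 0;
            try {
                for (Future<Long> future : futures) {
                    total += future.get();
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
            return total;
        }

        /** Enumerates the partitions one after the other, which preserves the order of the list. */
        List<T> collect() {
            List<T> out = new ArrayList<>();
            for (Supplier<Stream<T>> part : partitions) {
                part.get().forEach(out::add);
            }
            return out;
        }
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Pll<Integer> numbers = Pll.of(List.of(List.of(1, 2), List.of(3, 4, 5)), pool);
            Pll<Integer> doubled = numbers.map(x -> x * 2);
            return doubled.count() + ":" + doubled.collect();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Calling map returns immediately whatever the size of the list, while count and collect are the points where the elements are actually enumerated, partition by partition.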