Singerlake (Part 2)
Improving the Singerlake Spec
In a previous post we talked about a potential 'Singerlake' implementation. This was then discussed both in the Meltano Slack Channel, and during Meltanos' Office Hours - 2023-01-18. Several significant improvements came out of those discussions, with special thanks to Alex (@z3z1ma) who was already thinking about this topic at the time.
1. Recording Schema Changes
As Alex pointed out, our initial spec does not handle schema changes in a way that is easy to consume data from or after a specific schema change. The first iteration specifies that:
If the
SCHEMA
changes mid-sync, a new file will be started, beginning with the newSCHEMA
message.
This means that, to consume only the data from a specific change, a consumer would need to read every file in reverse order until the desired change is reached.
We can do better.
Using a combination of the FarmHash algorithms by Google, which offers a fingerprint function yielding an unsigned int64 from a corpus of text, and Base85 encoding we can create a 7 character hash of each successive schema.
This is short enough to be included in the .singer.gz
file prefix, making our final scheme:
- Include a hash of the schema in the path/prefix of each Singer file.
raw/<tap_id>/<stream_name>/<stream_schema_hash>/<stream_name>-<first_record_timestamp>-<last_record_timestamp>.singer.gz
2. Adding Metadata
Our initial spec relies on being able to list all the .singer.gz
files below a given path when reading from the Singerlake.
However list
can be a slow and expensive operation in many object storage systems.
Therefore, by adding a manifest (containing a list of files) at a predictable point on the Singerlake path, we can short-cut expensive list operations for very little additional complexity.
A proposed scheme is as follows:
{
"versions": {"<hash>": "v1", "<hash>": "v2", ...},
"files": [...]
}
Crucially, the metadata file need not be required, as it can readily be rebuilt by traversing the Singerlake.
- [Optional] Include a manifest file at the root of each Stream path.
raw/<tap_id>/<stream_name>/manifest.json
Similarly, to simplify stream discovery when reading from the Singerlake, a catalogue (adhering to the Singer Spec should be included at the root of each Tap path. This too can be optional, as it duplicates information stored in the paths and files themselves:
- [Optional] Include a catalogue at the root of each Tap path.
raw/<tap_id>/catalogue.json
Finally, some additional metadata at the root of the Singerlake would be of benefit to both maintainers and consumers of the lake. This would primarily list the available Tap directories (again to save list/scan operations), but could also be used to capture information not extractable from the Singer Spec itself. Some useful annotations might include:
- Tap/integration owner (person or team).
- A description of the Taps' relevance/business use.
- Link to relevant documentation (e.g. Meltano Hub plugin page).
- Link to job status/logs.
This file would form the basis of any auto-documentation of a Singerlake (e.g. using Sphinx). Once again, this file is not strictly needed for the operation of a Singerlake, and can therefore be optional. It is not clear yet what schema this might take, so this detail is left for a future iteration.
- [Optional] Include a metadata file at the root of the Singerlake.
raw/singerlake.json
3. Supporting Safe Parallelization of Writes
One of the key benefits of using an object storage service, like Amazon S3, is the scalability these systems offer.
However, by leveraging a system with the ability to support many parallel reads and writes, we risk the integrity of the Singerlake and the consistency of data during reads.
Therefore some provision for a locking mechanism, guaranteeing consistent writes to a Tap by multiple writers, would allow us to best leverage the scalability of the storage layer.
In particular, this services a common pattern of parallelization whereby a single Tap is run on multiple compute instances partitioned by Stream.
Multiple reads are inherently supported, so long as parallel readers does not refresh their read copy of manifest.json
during execution.
- Writers are expected to create a lock file at the root of the Stream-schema directory before writing to that Stream, and to delete (relinquish) it afterwards. If a lock file already exists for a specified stream, that writer is expected to wait until the lock file is deleted, or until a reader-configured maximum wait time.
raw/<tap_id>/<stream_name>/<stream_schema_hash>/singerlake.lock
The lockfile should contain a timestamp of when it was acquired, who it was acquired by and when it was last refreshed, in JSON format:
{
"writer-id": "<writer-uuid>"
"acquired": "<timestamp>",
"refreshed": "<timestamp>"
}
Writers with an active lock may update the lockfiles' refreshed
attribute during processing, to indicate that the lock is still in active use.
Writers waiting to acquire a lock must check the lockfile refreshed
attribute for updates before invalidating an active (assumed stale) lock.
Note: Lockfiles require a strongly consistent storage system (i.e. read-after-write consistency) to function. Whilst S3 is strongly consistent by default, it does not handle race conditions; if two
PUT
requests are made simultaneously, the most recently received will be persisted. Therefore each writer must read the lockfile after first write and verify thewriter-id
before proceeding. If a differentwriter-id
is read than was written, the writer must wait for the lock to be relinquished.
Next Steps
The above changes take us closer to workable specification for tap-singerlake
and target-singerlake
in the general case 🚀
However several other questions remain unanswered, to be tackled in a future post.
These include, but likely aren't limited to:
- Support for
BATCH
message types, and non-Singerlake files in general (e.g. photos, audio clips, documents). - Ongoing maintenance of a Singerlake, including 'vacuuming' operations (to combine many small files into fewer larger ones) and verification of metadata (including rebuilding via scan/list operations).
- Support for projections of data in the Singerlake, enabling the processing of data from
raw
tostaging
topublished
data formats.
In particular, this would likely include moving from the 'operational' projection applied uniformly to captured data (i.e. ordering and indexing streams according to when data was read/captured) to an ordering informed by the data itself (primary keys such as created-at
or sequential id).
- Support for target projection formats other than Singer files (e.g. csv, parquet, avro).