Singerlake (Part 1)

Edit: 2023-02-15

A 'Part 2' of this post is now available, integrating feedback from the Meltano community on this first iteration.

Designing a 'Singerlake' Spec

Managing data on behalf of an organization is a hard problem. As the total number of integrations increases, and the pace of change of each integration increases too, the process of collecting and curating data can quickly get out of hand. The Singer Spec and its broad ecosystem of Taps and Targets provide a ready answer to the first problem by drastically reducing the cost of integrations. In the process of developing tap-singer-jsonl and target-singer-jsonl, I have come to see more clearly how the Singer Spec can also help with the second problem: managing change.

The Singer Spec

Already familiar with the Spec? Skip to Singer-formatted Files.

Fundamentally the Singer Spec describes a protocol of messages on top of a common interchange format (JSON). This interchange format allows the separation of concerns between reading data from a source and writing it to a destination. So long as the Tap (reader) can translate source data into the interchange format, and the Target (writer) can interpret that format for a given destination, Taps and Targets can be paired in any combination. From the Singer website:

Singer describes how data extraction scripts—called “taps” —and data loading scripts—called “targets”— should communicate, allowing them to be used in any combination to move data from any source to any destination.

At its core, the Spec is made up of three message types: SCHEMA, RECORD and STATE. The SCHEMA message defines the structure of the data, the RECORD message contains the actual data being transferred, and the STATE message keeps track of the progress of an extraction.
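As a minimal sketch of these three message types, the following Python builds one of each and serializes them as JSON lines, roughly as a Tap would write them to stdout. The helper name `emit` and the sample values are illustrative assumptions, not part of the Spec.

```python
import json


def emit(message: dict) -> str:
    """Serialize one Singer message as a single JSON line (hypothetical helper)."""
    return json.dumps(message)


# SCHEMA: describes the structure of the records that follow in this stream.
schema = {
    "type": "SCHEMA",
    "stream": "users",
    "key_properties": ["id"],
    "schema": {"type": "object", "properties": {"id": {"type": "integer"}}},
}
# RECORD: carries one row of actual data.
record = {"type": "RECORD", "stream": "users", "record": {"id": 1}}
# STATE: bookmarks how far the extraction has progressed.
state = {"type": "STATE", "value": {"users": 1}}

lines = [emit(message) for message in (schema, record, state)]
print("\n".join(lines))
```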

During normal operation, data exists in the interchange format only briefly, just long enough to be relayed from Tap to Target through a Unix pipe:

tap-exchangeratesapi | target-csv

However, this need not be the case. Singer messages can be written to files for "playback" at a later date.

Singer-formatted Files

Writing Singer messages to file is already a common practice amongst Tap and Target developers, as a means of testing and debugging their work. Extracting "raw" messages can be as simple as:

tap-exchangeratesapi > messages.jsonl

A Tap might produce the following messages.jsonl file:

// messages.jsonl
{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}
{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Mike"}}
{"type": "SCHEMA", "stream": "locations", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "locations", "record": {"id": 1, "name": "Philadelphia"}}
{"type": "STATE", "value": {"users": 2, "locations": 1}}

This is already very useful; I now have the ability to replay these messages at any time in the future, even if the upstream data model (the SCHEMA) changes. It also opens the possibility of replaying to many different Targets:

cat messages.jsonl | target-csv
cat messages.jsonl | target-parquet

However, some limitations present themselves:

  1. Having multiple streams (users and locations) in the same file makes replaying just one stream inefficient; every line of the file must be read to recover a subset.
  2. By not recording the time each record was observed, I lose the ability to replay multiple Singer files in chronological order, and to filter replayed files by extract time.
  3. Storing STATE messages is redundant, as they pertain to the bookmark position of the upstream Tap rather than anything specific about the data extracted. These are not useful outside of the context of a recurring Tap>Target pipeline.
  4. Managing many small Singer-formatted files at scale is a challenge all of its own.
  5. JSONL (one JSON object per Line) is not a particularly efficient storage format.

What is a "Singerlake"?

As you may have spotted, the title "Singerlake" is derived from the term "data lake", a common paradigm in Data Engineering. According to Wikipedia:

A data lake is a system or repository of data stored in its natural/raw format, usually object blobs or files.

Put another way, a data lake is an organizational structure applied to raw files, designed to facilitate easy discovery and consumption of the data in those files. If you have experience with data warehousing, or even organizing a collection of photo or music files, what follows will feel familiar to you.

A Singerlake is an organized collection of .singer files.

A Singerlake Spec

Combining the flexibility of the Singer Spec with accumulated wisdom from the data lake pattern, we can capture and organize data in a near-raw format to be replayed into current and future destinations without loss or degradation.

1. Define a .singer file format 📂

Our messages.jsonl from earlier is a valid Singer-formatted JSONL file, but it will not serve our purposes well. To facilitate easy discovery and consumption of the data, we can:

  1. Write data to file by stream, omitting STATE messages entirely and beginning each file with a SCHEMA message.
  2. If the SCHEMA changes mid-sync, a new file will be started, beginning with the new SCHEMA message.
  3. Files can be separated into chunks, to avoid very large files, so long as each begins with a SCHEMA message.

Following this scheme, the above messages.jsonl becomes:

// users.singer
{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}
{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Mike"}}
// locations.singer
{"type": "SCHEMA", "stream": "locations", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "locations", "record": {"id": 1, "name": "Philadelphia"}}
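The splitting rules above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function name `split_by_stream` is hypothetical, every SCHEMA message is treated as a file boundary (even an unchanged one), and only SCHEMA, RECORD and STATE messages are expected in the input.

```python
import json
from collections import defaultdict


def split_by_stream(lines):
    """Group Singer messages into per-stream chunks, dropping STATE messages.

    Returns {stream_name: [chunk, ...]}, where each chunk is a list of
    messages beginning with a SCHEMA message (one chunk per output file).
    """
    chunks = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        if message["type"] == "STATE":
            continue  # STATE messages are omitted entirely
        stream = message["stream"]
        if message["type"] == "SCHEMA":
            # A SCHEMA message starts a new chunk, i.e. a new file.
            chunks[stream].append([message])
        elif message["type"] == "RECORD":
            if stream not in chunks:
                raise ValueError(f"RECORD before SCHEMA for stream {stream!r}")
            chunks[stream][-1].append(message)
    return dict(chunks)


raw_lines = [
    '{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"type": "object"}}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}',
    '{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Mike"}}',
    '{"type": "SCHEMA", "stream": "locations", "key_properties": ["id"], "schema": {"type": "object"}}',
    '{"type": "RECORD", "stream": "locations", "record": {"id": 1, "name": "Philadelphia"}}',
    '{"type": "STATE", "value": {"users": 2, "locations": 1}}',
]
result = split_by_stream(raw_lines)
for stream, stream_chunks in result.items():
    print(stream, [len(chunk) for chunk in stream_chunks])
```

Each chunk in the result would then be written to its own .singer file.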

2. Keep track of Time 🕰️

To discover and replay files in the order in which they were observed, we need to collect timestamps, both for each record and at the filename level. For the former, the Singer Spec defines an optional RECORD message property:

time_extracted Optional. The time this record was observed in the source. This should be an RFC3339 formatted date-time, like "2017-11-20T16:45:33.000Z".

For the latter, ISO 8601 allows a compact representation with no separators (except for the T and Z denoting 'time' and 'UTC'), which is well suited to filenames:

>>> import datetime
>>> datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
'20230117T180903Z'

Therefore, we can specify two new directives:

  1. RECORD messages must include a time_extracted. If none is included, it must be added with the 'received time' at the point of write.
  2. Each filename must include the first and last time_extracted of the RECORD messages the file contains.

Following this scheme, our .singer files can be named:

.
├── 20230117T180903Z-20230117T181107Z.singer
└── 20230117T181109Z-20230117T181208Z.singer
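A rough sketch of both directives in Python follows. The function names are hypothetical, and the sketch assumes time_extracted values are UTC timestamps in a form that `datetime.fromisoformat` can parse once a trailing "Z" is rewritten as "+00:00" (before Python 3.11 that means three or six fractional digits, or none).

```python
from datetime import datetime, timezone

FILENAME_FORMAT = "%Y%m%dT%H%M%SZ"


def ensure_time_extracted(record_message, now=None):
    """Return the RECORD message with time_extracted set.

    If the property is missing, a copy is returned with the received time
    added. `now` is assumed to be a timezone-aware datetime.
    """
    if record_message.get("time_extracted"):
        return record_message
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    stamped = dict(record_message)
    stamped["time_extracted"] = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    return stamped


def compact(timestamp):
    """Convert an RFC 3339 timestamp to the compact filename form."""
    # fromisoformat() only accepts a trailing "Z" from Python 3.11 onwards.
    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    return parsed.astimezone(timezone.utc).strftime(FILENAME_FORMAT)


def singer_filename(records):
    """Build '<first>-<last>.singer' from the records' time_extracted values."""
    # Fixed-width UTC strings sort lexicographically in chronological order.
    times = [compact(record["time_extracted"]) for record in records]
    return f"{min(times)}-{max(times)}.singer"


records = [
    {"type": "RECORD", "stream": "users", "record": {"id": 1},
     "time_extracted": "2023-01-17T18:09:03.000Z"},
    {"type": "RECORD", "stream": "users", "record": {"id": 2},
     "time_extracted": "2023-01-17T18:11:07.000Z"},
]
print(singer_filename(records))
```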

3. Manage many small Singer-formatted files 🗄️

Just as a database instance can be divided into a hierarchy of databases, schemas and tables, it is advantageous to arrange our .singer files to facilitate easy discovery and consumption of the data. A well-ordered hierarchy would include:

  1. The stream name in both the filename and as a containing folder/prefix.
  2. The upstream Tap identifier as parent to these stream folders/prefixes.
  3. A top raw root folder/prefix, in line with the common data lake practice of separating raw, staging and analytics data layers.

Following this scheme, our folder structure would be:

.
└── raw
    └── tap-example--meltanolabs
        ├── locations
        │   └── locations-20230117T181109Z-20230117T181208Z.singer
        └── users
            └── users-20230117T180903Z-20230117T181107Z.singer

Or, in template form:

raw/<tap_id>/<stream_name>/<stream_name>-<first_record_timestamp>-<last_record_timestamp>.singer

Note: the <tap_id> used in the example above follows the Meltano convention of <tap_name>--<variant_name>. This is to disambiguate multiple variants of the same Tap, which generally have similar but incompatible schemas.
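The template above could be rendered, and parsed back, along these lines. The function names and the regular expression are assumptions made for illustration; `PurePosixPath` is used so that the result has forward slashes on any platform, which suits object-store prefixes as well as folders.

```python
import re
from pathlib import PurePosixPath

# <stream_name>-<first_record_timestamp>-<last_record_timestamp>.singer
FILENAME_PATTERN = re.compile(
    r"^(?P<stream>.+)-(?P<first>\d{8}T\d{6}Z)-(?P<last>\d{8}T\d{6}Z)\.singer$"
)


def singerlake_path(tap_id, stream_name, first_ts, last_ts, layer="raw"):
    """Render <layer>/<tap_id>/<stream_name>/<stream_name>-<first>-<last>.singer."""
    filename = f"{stream_name}-{first_ts}-{last_ts}.singer"
    return str(PurePosixPath(layer) / tap_id / stream_name / filename)


def parse_filename(filename):
    """Recover the stream name and time range from a .singer filename."""
    match = FILENAME_PATTERN.match(filename)
    if match is None:
        raise ValueError(f"not a Singerlake filename: {filename!r}")
    return match.groupdict()


path = singerlake_path(
    "tap-example--meltanolabs", "users", "20230117T180903Z", "20230117T181107Z"
)
print(path)
print(parse_filename(PurePosixPath(path).name))
```

Parsing the time range back out of the filename is what would allow files to be selected by extract time without opening them.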

4. Improve the efficiency of the JSONL interchange format 🗜️

This one is straightforward and reasonably uncontentious:

  1. Use compression.

In the spirit of being prescriptive (this is a specification, after all), I'd be inclined to mandate the bzip2 algorithm, which is a good match for this use case. bzip2 is more computationally expensive than the more common gzip, but offers improved compression ("within 10% to 15% of the best available techniques"[1]) whilst being 3x faster to decompress than to compress. Given the verbosity of JSONL, and the common intention with data lakes to "write once, read occasionally, delete never", it seems worthwhile to default to bzip2 but allow gzip.

So our final scheme would be:

.
└── raw
    └── tap-example--meltanolabs
        ├── locations
        │   └── locations-20230117T181109Z-20230117T181208Z.singer.gz
        └── users
            └── users-20230117T180903Z-20230117T181107Z.singer.bz2

Conveniently, both are natively supported in Python and by the smart_open library I am using in tap-singer-jsonl and target-singer-jsonl.
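As a small illustration of the native support, the sketch below writes and reads compressed .singer files using only the standard library `bz2` and `gzip` modules (not smart_open). The helper names `open_singer` and `round_trip` are hypothetical, and choosing the codec from the file extension is an assumption of this sketch.

```python
import bz2
import gzip
import json
import os
import tempfile

# Map file extensions to the stdlib opener that handles them.
OPENERS = {".bz2": bz2.open, ".gz": gzip.open}


def open_singer(path, mode="rt"):
    """Open a .singer file in text mode, choosing the codec from its extension."""
    suffix = os.path.splitext(path)[1]
    opener = OPENERS.get(suffix, open)  # fall back to uncompressed files
    return opener(path, mode, encoding="utf-8")


def round_trip(messages, filename):
    """Write messages to a file in a temporary directory and read them back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, filename)
        with open_singer(path, "wt") as f:
            for message in messages:
                f.write(json.dumps(message) + "\n")
        with open_singer(path, "rt") as f:
            return [json.loads(line) for line in f]


messages = [
    {"type": "SCHEMA", "stream": "users", "key_properties": ["id"],
     "schema": {"type": "object", "properties": {"id": {"type": "integer"}}}},
    {"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"},
     "time_extracted": "2023-01-17T18:09:03.000Z"},
]
for name in ("users.singer.bz2", "users.singer.gz"):
    print(name, round_trip(messages, name) == messages)
```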

Next Steps

tap-singer-jsonl and target-singer-jsonl are a good first stab at a Singerlake toolchain. I intend to iterate on them in the above direction, as time permits.