Creating a New CloudQuery Source Integration in Python
This guide will help you develop a new source or destination integration for CloudQuery in Python. CloudQuery's modular architecture means that a source integration can be used to fetch data from any third-party API, and then be combined with a destination integration to insert data into any supported destination. We will cover the basics of how to get started, and then dive into some more advanced topics. We will also cover how to release your integration for use by the wider CloudQuery community.
This guide assumes that you are somewhat familiar with CloudQuery. If you are not, we recommend starting by reading the Quickstart guide and playing around with the CloudQuery CLI a bit first.
You by no means need to be an expert, but you will need some familiarity with Python. If you are new to Python, we recommend starting with the official Python tutorial.
Core Concepts
Before we dive in, let's quickly cover some core concepts of CloudQuery integrations, so that they're familiar when we see our first example.
Syncs
A sync is the process that gets kicked off when a user runs `cloudquery sync`. A sync is responsible for fetching data from a third-party API and inserting it into the destination (database, data lake, stream, etc.). When you write a source integration for CloudQuery, you only need to implement the part that interfaces with the third-party API. The rest of the sync process, such as delivering the data to the destination database, is handled by the CloudQuery SDK.
Tables and Services
A table is the term CloudQuery uses for a collection of related data. In most databases it directly maps to an actual database table, but in some destinations it could be stored as a file, stream or other medium. A table is defined by a name, a list of columns, and a resolver function. The resolver function is responsible for fetching data from the third-party API and sending it to CloudQuery. We will look at examples of this soon!
Every table will typically have its own `.py` file inside the integration's `tables` directory.
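To make this concrete, here is a minimal sketch of what one of these files might contain, loosely modeled on the reference integrations discussed later in this guide. The `widgets` table and its columns are hypothetical, and the exact `Table`/`Column` constructor arguments may differ between SDK versions:

```python
import pyarrow as pa

from cloudquery.sdk.schema import Column, Table


class Widgets(Table):
    def __init__(self) -> None:
        super().__init__(
            name="widgets",  # the table name as it will appear in the destination
            title="Widgets",
            columns=[
                # Column types are Apache Arrow types
                Column("id", pa.string(), primary_key=True),
                Column("name", pa.string()),
                Column("created_at", pa.timestamp("ms")),
            ],
        )
```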
Resolvers
Resolvers are functions associated with a table that get called when it's time to populate data for that table. There are two types of resolvers:
Table resolvers
Table resolvers are responsible for fetching data from the third-party API. In Python, a table resolver is a class that extends `cloudquery.sdk.scheduler.TableResolver` and implements the `resolve` method. For top-level tables, `resolve` will only be called once per multiplexer client. For dependent tables, the resolver will be called once for each parent row, and the parent resource will be passed in as well. (More on this, and multiplexers, shortly.)
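As a sketch (building on the hypothetical `Widgets` table above; `client.api_client` and its methods are placeholders for whatever API client your integration wraps), a top-level resolver and a dependent resolver might look like this:

```python
from typing import Any, Generator

from cloudquery.sdk.scheduler import TableResolver


class WidgetsResolver(TableResolver):
    def __init__(self) -> None:
        # child_resolvers attaches the dependent widget_parts table to its parent
        super().__init__(table=Widgets(), child_resolvers=[WidgetPartsResolver()])

    def resolve(self, client, parent_resource) -> Generator[Any, None, None]:
        # Top-level table: called once per multiplexer client
        for widget in client.api_client.list_widgets():
            yield widget


class WidgetPartsResolver(TableResolver):
    def __init__(self) -> None:
        # WidgetParts would be a second Table subclass, defined like Widgets
        super().__init__(table=WidgetParts())

    def resolve(self, client, parent_resource) -> Generator[Any, None, None]:
        # Dependent table: called once per parent row; parent_resource.item
        # holds the raw object yielded by the parent resolver
        for part in client.api_client.list_parts(parent_resource.item["id"]):
            yield part
```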
Column resolvers
Column resolvers are responsible for mapping data from the third-party API into the columns of the table. In most cases, you will not need to implement this, as the SDK will automatically map the objects yielded by the table resolver to the columns of the table. But in some cases, you may need to implement a custom column resolver to fetch additional data or perform custom transformations.
Multiplexers
Multiplexers are a way to parallelize the fetching of data from the third-party API. Some top-level tables require multiple calls to fetch all their data. For example, a sync for the GitHub source integration that fetches data for multiple organizations will need to make one call per organization to list all repositories. By multiplexing over organizations, these top-level queries can also be done in parallel. Each table defines the multiplexer that it should use. The CloudQuery integration SDK will then call the table resolver once for each client in the multiplexer. Many integrations will not need to use multiplexers, but they are useful for integrations that need to fetch data for multiple accounts, organizations, or other entities.
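In the Python SDK, multiplexing is expressed on the resolver itself. The sketch below assumes a `multiplex` hook on `TableResolver` that returns the list of clients to fan out over (verify the exact hook against your SDK version; `spec.orgs` and `with_org` are hypothetical helpers):

```python
from cloudquery.sdk.scheduler import TableResolver


class OrgReposResolver(TableResolver):
    def multiplex(self, client):
        # Fan out: resolve() is called once per returned client,
        # each scoped to a single organization
        return [client.with_org(org) for org in client.spec.orgs]

    def resolve(self, client, parent_resource):
        # client is now bound to a single organization
        for repo in client.api_client.list_repos(client.org):
            yield repo
```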
Incremental Tables
Some APIs lend themselves to being synced incrementally: rather than fetch all past data on every sync, an incremental table will only fetch data that has changed since the last sync. This is done by storing some metadata in a state backend. The metadata is known as a cursor, and it marks where the last sync ended, so that the next sync can resume from the same point. Incremental syncs can be vastly more efficient than full syncs, especially for tables with large amounts of data. This is because only the data that's changed since the last sync needs to be retrieved, and in many cases this is a small subset of the overall dataset.
Incremental tables are great for efficiency, but they add some complexity for both you and your users. As the integration author, you should consider first whether the table needs to be incremental, and then whether it can be made incremental. You can also read more about managing incremental tables.
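To make the cursor idea concrete, here is a rough sketch of an incremental resolver. The `state_client` accessor and its `get_key`/`set_key` methods are placeholders for however your integration talks to the state backend; see the documentation on managing incremental tables for the real mechanics:

```python
from cloudquery.sdk.scheduler import TableResolver


class EventsResolver(TableResolver):
    CURSOR_KEY = "events_cursor"

    def resolve(self, client, parent_resource):
        # Resume from where the previous sync left off, or from the epoch
        cursor = client.state_client.get_key(self.CURSOR_KEY) or "1970-01-01T00:00:00Z"
        latest = cursor
        for event in client.api_client.list_events(since=cursor):
            # ISO-8601 UTC timestamps compare correctly as strings
            latest = max(latest, event["created_at"])
            yield event
        # Persist the new cursor so the next sync only fetches newer events
        client.state_client.set_key(self.CURSOR_KEY, latest)
```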
Creating Your First Integration
As the Python SDK is still new, we don't yet have an integration scaffold generator. For now, we recommend starting by copying the code from an existing Python integration. Two reference integrations are Typeform and Square. The Typeform integration is relatively simple, while the Square integration provides an example of generating tables from an OpenAPI spec.
Before running the integration locally, you will need to install its dependencies, preferably in a virtual environment:
```bash
pip3 install -r requirements.txt
```
If you copied a reference integration, this will include the CloudQuery integration SDK. If you are starting from scratch, you will need to install the SDK separately:
```bash
pip3 install cloudquery-plugin-sdk
```
Testing the Integration
There are two options for running an integration as a developer before it is released: as a gRPC server, or as a Docker container. We will briefly summarize both options here, or you can read about them in more detail in Running Locally.
Run the Integration as a gRPC Server
This mode is especially useful for setting breakpoints in your code for debugging, as you can run the integration in server mode from your IDE and attach a debugger to it. To run the integration as a gRPC server, run the following command in the root of the integration directory:
```bash
python main.py serve
```
(Note: if you see errors about missing dependencies, make sure you are using the correct Python virtual environment and/or have installed them via `pip install -r requirements.txt`.)
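If you started from one of the reference integrations, main.py will already look roughly like the following sketch (`WidgetsPlugin` is a placeholder for your own `Plugin` subclass):

```python
import sys

from cloudquery.sdk import serve

from plugin import WidgetsPlugin  # hypothetical module and class name


def main():
    p = WidgetsPlugin()
    # PluginCommand parses argv, so `python main.py serve` starts the gRPC server
    serve.PluginCommand(p).run(sys.argv[1:])


if __name__ == "__main__":
    main()
```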
The serve command starts a gRPC server on port 7777. You can then create a configuration file that sets the `registry` and `path` properties to point to this server. For example:
```yaml
kind: source
spec:
  name: "my-integration"
  registry: "grpc"
  path: "localhost:7777"
  version: "v1.0.0"
  tables: ["*"]
  destinations:
    - "sqlite"
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.17"
  spec:
    connection_string: ./db.sql
```
With the above configuration, we can now run `cloudquery sync` as normal:

```bash
cloudquery sync config.yaml
```
Note that when running a source integration as a gRPC server, errors with the source integration will be printed to the console running the gRPC server, not to the CloudQuery log like usual.
Run the Integration as a Docker Container
You can also build a Docker container for the integration, and then either run it directly as a gRPC server or via the `docker` registry in a configuration file. See an example Dockerfile for a Python integration here.
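A minimal Dockerfile might look like the following sketch (adjust the base image and file layout to your integration):

```dockerfile
FROM python:3.11-slim
WORKDIR /app

# Install dependencies first so Docker can cache this layer
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

COPY . .

# CloudQuery expects the entrypoint to be `python3 main.py`
ENTRYPOINT ["python3", "main.py"]
```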
We need to first build the image:
```bash
docker build -t my-integration:latest .
```
And then we can specify the `docker` registry in our configuration file:
```yaml
kind: source
spec:
  name: "my-integration"
  registry: "docker"
  path: "my-integration:latest"
  tables: ["*"]
  destinations:
    - "sqlite"
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.17"
  spec:
    connection_string: ./db.sql
```
Releasing and Deploying Your Integration
Releasing a Python integration for use by the wider CloudQuery community involves publishing a Docker image to any registry of your choice. We recommend using Docker Hub, but you can also use GitHub Container Registry or any other registry that supports Docker images. The entrypoint should be `python3 main.py`. You can see an example Dockerfile here.
Once published, users can then import your integration by specifying the image path in their configuration file together with the `docker` registry, e.g.:
```yaml
kind: source
spec:
  name: cloudwidgets
  path: ghcr.io/myorg/cloudwidgets
  registry: docker
```
This will download and run the integration as a Docker container when `cloudquery sync` is run.
Real-world Examples
A good way to learn how to create a new integration in Python is to look at the following examples:
- The Python Source Integration Template is an easy-to-use template for building your own integration.
- The Python XKCD Example is an example of using the Python Source Integration Template to build a simple integration.
- The Typeform Source Integration is an example of a simple integration that fetches data from a REST API.
- The Square Source Integration is an example of a medium-complexity integration that fetches data from a REST API using an OpenAPI spec.