Creating a New CloudQuery Source Plugin in Python
This guide will help you develop a new source plugin for CloudQuery in Python. CloudQuery's modular architecture means that a source plugin can be used to fetch data from any third-party API, and then be combined with a destination plugin to insert data into any supported destination. We will cover the basics of how to get started, and then dive into some more advanced topics. We will also cover how to release your plugin for use by the wider CloudQuery community.
This guide assumes that you are somewhat familiar with CloudQuery. If you are not, we recommend starting by reading the Quickstart guide and playing around with the CloudQuery CLI a bit first.
Though you by no means need to be an expert, you will also need some familiarity with Python. If you are new to Python, we recommend starting with the official Python tutorial.
Before we dive in, let's quickly cover some core concepts of CloudQuery plugins, so that they're familiar when we see our first example.
A sync is the process that gets kicked off when a user runs cloudquery sync. A sync is responsible for fetching data from a third-party API and inserting it into the destination (database, data lake, stream, etc.). When you write a source plugin for CloudQuery, you will only need to implement the part that interfaces with the third-party API. The rest of the sync process, such as delivering to the destination database, is handled by the CloudQuery SDK.
A table is the term CloudQuery uses for a collection of related data. In most databases it directly maps to an actual database table, but in some destinations it could be stored as a file, stream or other medium. A table is defined by a name, a list of columns, and a resolver function. The resolver function is responsible for fetching data from the third-party API and sending it to CloudQuery. We will look at examples of this soon!
Every table will typically have its own .py file inside the plugin.
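To make the three parts of a table concrete (name, columns, resolver function), here is a minimal sketch using only the standard library. It deliberately does not use the CloudQuery SDK: TableSketch, ColumnSketch, resolve_widgets and the table name cloudwidgets_widgets are hypothetical stand-ins chosen for illustration, and the real SDK classes will differ in their details.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterator, List, Optional


@dataclass
class ColumnSketch:
    """Hypothetical stand-in for a column: a name plus a type label."""
    name: str
    type_name: str


@dataclass
class TableSketch:
    """Hypothetical stand-in for a table: name, columns, and a resolver."""
    name: str
    columns: List[ColumnSketch]
    resolver: Callable[[Any, Optional[dict]], Iterator[dict]]


def resolve_widgets(client: Any, parent: Optional[dict]) -> Iterator[dict]:
    # A real resolver would call the third-party API through `client`.
    # Here we yield hard-coded rows so the sketch runs on its own.
    yield {"id": 1, "name": "sprocket"}
    yield {"id": 2, "name": "gear"}


widgets_table = TableSketch(
    name="cloudwidgets_widgets",
    columns=[ColumnSketch("id", "int64"), ColumnSketch("name", "string")],
    resolver=resolve_widgets,
)

# Calling the resolver produces the rows that would be sent on to CloudQuery.
rows = list(widgets_table.resolver(None, None))
```

In a real plugin, a definition along these lines would live in the table's own .py file, with the SDK's table and column types in place of the stand-ins.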
Resolvers are functions associated with a table that get called when it's time to populate data for that table. There are two types of resolvers:
Table resolvers are responsible for fetching data from the third-party API. In Python, a table resolver is a class that extends cloudquery.sdk.scheduler.TableResolver and implements the resolve method. For top-level tables, resolve will only be called once per multiplexer client. For dependent tables, the resolver will be called once for each parent row, and the parent resource will be passed in as well. (More on this, and multiplexers, shortly.)
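The calling pattern for top-level and dependent tables can be sketched as follows. This is a standalone illustration, not SDK code: TableResolverSketch stands in for the SDK base class, FakeApiClient is a hypothetical in-memory API, and the parent resource is modelled as a plain dict, which may not match how the SDK wraps parent rows.

```python
from typing import Any, Dict, Generator, List, Optional


class TableResolverSketch:
    """Stand-in for the SDK base class; only the shape of `resolve` matters."""

    def resolve(
        self, client: Any, parent_resource: Optional[Dict]
    ) -> Generator[Dict, None, None]:
        raise NotImplementedError


class FakeApiClient:
    """Hypothetical API client backed by in-memory data."""

    def list_repos(self) -> List[Dict]:
        return [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}]

    def list_issues(self, repo_id: int) -> List[Dict]:
        issues = {
            1: [{"id": 10, "title": "bug"}],
            2: [{"id": 20, "title": "docs"}, {"id": 21, "title": "ci"}],
        }
        return issues[repo_id]


class RepoResolver(TableResolverSketch):
    # Top-level table: there is no parent, so parent_resource is None.
    def resolve(self, client, parent_resource):
        for repo in client.list_repos():
            yield repo


class IssueResolver(TableResolverSketch):
    # Dependent table: called once per parent row, which is passed in.
    def resolve(self, client, parent_resource):
        for issue in client.list_issues(parent_resource["id"]):
            yield {**issue, "repo_id": parent_resource["id"]}


def run_sync(client: FakeApiClient):
    """Mimic the scheduler: resolve parents once, then children per parent."""
    repos, issues = [], []
    for repo in RepoResolver().resolve(client, None):
        repos.append(repo)
        issues.extend(IssueResolver().resolve(client, repo))
    return repos, issues


repos, issues = run_sync(FakeApiClient())
```

Note that the resolvers are generators: they yield rows one at a time rather than building a full list, so rows can be passed along as soon as they are fetched.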
Column resolvers are responsible for mapping data from the third-party API into the columns of the table. In most cases, you will not need to implement this, as the SDK will automatically map data from the object passed in by the table resolver to the columns of the table. But in some cases, you may need to implement a custom column resolver to fetch additional data or do custom transformations.
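The difference between automatic mapping and a custom column resolver can be illustrated with a small standalone sketch. The map_row helper and the full_name column below are hypothetical and only model the idea; the SDK's own mapping mechanism is not shown here.

```python
from typing import Any, Callable, Dict, Optional

# A column resolver takes the fetched item and returns the column's value.
ColumnResolver = Callable[[Dict[str, Any]], Any]


def map_row(
    item: Dict[str, Any], columns: Dict[str, Optional[ColumnResolver]]
) -> Dict[str, Any]:
    """Build a row: use the custom resolver if given, else copy by name."""
    row = {}
    for name, resolver in columns.items():
        if resolver is not None:
            row[name] = resolver(item)  # custom transformation
        else:
            row[name] = item.get(name)  # default: same-named field
    return row


columns = {
    "id": None,    # mapped automatically by name
    "name": None,  # mapped automatically by name
    # Custom resolver: derive a value that is not a field of the item.
    "full_name": lambda item: f'{item["owner"]}/{item["name"]}',
}

item = {"id": 7, "name": "cli", "owner": "cloudquery", "extra": "ignored"}
row = map_row(item, columns)
```

Fields of the item that have no matching column (such as extra above) are simply left out of the row.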
Multiplexers are a way to parallelize the fetching of data from the third-party API. Some top-level tables require multiple calls to fetch all their data. For example, a sync for the GitHub source plugin that fetches data for multiple organizations will need to make one call per organization to list all repositories. By multiplexing over organizations, these top-level queries can also be done in parallel. Each table defines the multiplexer that it should use. The CloudQuery plugin SDK will then call the table resolver once for each client in the multiplexer. Many plugins will not need to use multiplexers, but they are useful for plugins that need to fetch data for multiple accounts, organizations, or other entities.
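As a rough model of multiplexing over organizations, the sketch below creates one client per organization and calls the resolver once per client, in parallel. It uses a thread pool from the standard library purely for illustration; OrgClient, multiplex_by_org, FAKE_REPOS and sync_table are hypothetical names, and the SDK's actual scheduling is not reproduced here.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Dict, Iterator, List


@dataclass(frozen=True)
class OrgClient:
    """Hypothetical client scoped to a single organization."""
    org: str


# In-memory stand-in for the third-party API.
FAKE_REPOS = {"org-a": ["a1", "a2"], "org-b": ["b1"]}


def multiplex_by_org(orgs: List[str]) -> List[OrgClient]:
    """The multiplexer: expand the configuration into one client per org."""
    return [OrgClient(org) for org in orgs]


def resolve_repos(client: OrgClient) -> Iterator[Dict[str, str]]:
    """Table resolver: lists repositories for the client's organization."""
    for name in FAKE_REPOS[client.org]:
        yield {"org": client.org, "name": name}


def sync_table(orgs: List[str]) -> List[Dict[str, str]]:
    clients = multiplex_by_org(orgs)
    with ThreadPoolExecutor(max_workers=4) as pool:
        # One resolver call per multiplexed client, run concurrently.
        # pool.map returns results in the same order as `clients`.
        batches = pool.map(lambda c: list(resolve_repos(c)), clients)
        return [row for batch in batches for row in batch]


rows = sync_table(["org-a", "org-b"])
```

Because each client carries its own scope, the resolver itself stays simple: it never needs to loop over organizations.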
Some APIs lend themselves to being synced incrementally: rather than fetch all past data on every sync, an incremental table will only fetch data that has changed since the last sync. This is done by storing some metadata in a state backend. The metadata is known as a cursor, and it marks where the last sync ended, so that the next sync can resume from the same point. Incremental syncs can be vastly more efficient than full syncs, especially for tables with large amounts of data. This is because only the data that's changed since the last sync needs to be retrieved, and in many cases this is a small subset of the overall dataset.
Incremental tables are great for efficiency, but they add complexity for both you and your users. As the plugin author, you should consider first whether the table needs to be incremental, then whether it can be made incremental. You can also read more about managing incremental tables.
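The cursor mechanism can be sketched in a few lines. This is an illustration under stated assumptions: InMemoryStateBackend is a hypothetical stand-in for a state backend, fetch_events simulates an API that supports filtering by update time, and the cursor is an ISO 8601 timestamp string, which compares correctly as text when all values share the same format.

```python
from typing import Dict, Iterator, List, Optional


class InMemoryStateBackend:
    """Hypothetical state backend: a key-value store for cursors."""

    def __init__(self) -> None:
        self._values: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._values.get(key)

    def set(self, key: str, value: str) -> None:
        self._values[key] = value


# In-memory stand-in for the third-party API's data.
EVENTS = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00Z"},
    {"id": 2, "updated_at": "2024-01-02T00:00:00Z"},
    {"id": 3, "updated_at": "2024-01-03T00:00:00Z"},
]


def fetch_events(since: Optional[str]) -> List[dict]:
    """Simulated API call: return only events updated after `since`."""
    return [e for e in EVENTS if since is None or e["updated_at"] > since]


def resolve_events(
    state: InMemoryStateBackend, cursor_key: str = "events"
) -> Iterator[dict]:
    cursor = state.get(cursor_key)  # where the last sync ended, if any
    for event in fetch_events(cursor):
        yield event
        if cursor is None or event["updated_at"] > cursor:
            cursor = event["updated_at"]
    if cursor is not None:
        state.set(cursor_key, cursor)  # persist for the next sync


state = InMemoryStateBackend()
first = list(resolve_events(state))   # full sync: no cursor stored yet
EVENTS.append({"id": 4, "updated_at": "2024-01-04T00:00:00Z"})
second = list(resolve_events(state))  # incremental: only the new event
```

In this sketch the cursor is saved only after all rows have been yielded, so a sync that fails midway would start again from the previous cursor rather than skip data.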
As the Python SDK is still new, we don't yet have a plugin scaffold generator. For now, we recommend starting by copying the code from an existing Python plugin. Two reference plugins are Typeform and Square. The Typeform plugin is relatively simple, and the Square plugin provides an example of generating tables from an OpenAPI spec.
Before running the plugin locally, you will need to install its dependencies, preferably in a virtual environment:
pip3 install -r requirements.txt
If you copied a reference plugin, this will include the CloudQuery plugin SDK. If you are starting from scratch, you will need to install the SDK separately:
pip3 install cloudquery-plugin-sdk
There are two options for running a plugin as a developer before it is released: as a gRPC server, or as a Docker container. We will briefly summarize both options here, or you can read about them in more detail in Running Locally.
Running the plugin as a gRPC server is especially useful for setting breakpoints in your code for debugging, as you can run it in server mode from your IDE and attach a debugger to it. To do so, run the following command in the root of the plugin directory:
python main.py serve
(Note: If you see errors about missing dependencies, make sure you are using the correct Python virtual environment and/or have installed them via pip install -r requirements.txt.)
This will start a gRPC server on port 7777. You can then create a config file that sets the registry and path properties to point to this server. For example:
kind: source
spec:
  name: "my-plugin"
  registry: "grpc"
  path: "localhost:7777"
  version: "v1.0.0"
  tables: ["*"]
  destinations:
    - "sqlite"
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.4.16"
  spec:
    connection_string: ./db.sql
With the above configuration, we can now run cloudquery sync as normal:
cloudquery sync config.yaml
Note that when running a source plugin as a gRPC server, errors from the source plugin will be printed to the console running the gRPC server, not to the CloudQuery log as usual.
You can also build a Docker container for the plugin, and then either run it directly as a gRPC server or via the docker registry in a config file. See an example Dockerfile for a Python plugin here.
We need to first build the image:
docker build -t my-plugin:latest .
And then we can specify the docker registry in our config file:
kind: source
spec:
  name: "my-plugin"
  registry: "docker"
  path: "my-plugin:latest"
  tables: ["*"]
  destinations:
    - "sqlite"
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.4.16"
  spec:
    connection_string: ./db.sql
Releasing a Python plugin for use by the wider CloudQuery community involves publishing a Docker image to a registry of your choice. We recommend using Docker Hub, but you can also use GitHub Container Registry or any other registry that supports Docker images. The entrypoint should be python3 main.py. You can see an example Dockerfile here.
Once published, users can then import your plugin by specifying the image path in their config file together with the docker registry, e.g.:
kind: source
spec:
  name: cloudwidgets
  path: ghcr.io/myorg/cloudwidgets
  registry: docker
This will download and run the plugin as a Docker container when cloudquery sync is run.
A good way to learn how to create new plugins in Python is to look at the following examples:
- The Python Source Plugin Template is an easy-to-use template for building your own plugin.
- The Python XKCD Example is an example of using the Python Source Plugin Template to build a simple plugin.
- The Typeform Source Plugin is an example of a simple plugin that fetches data from a REST API.
- The Square Source Plugin is an example of a medium-complexity plugin that fetches data from a REST API using an OpenAPI spec.