Creating a New CloudQuery Source Plugin in Go
This guide will help you develop a new source integration for CloudQuery in Go. CloudQuery's modular architecture means that a source integration can be used to fetch data from any third-party API, and then be combined with a destination integration to insert data into any supported destination. We will cover the basics of how to get started, and then dive into some more advanced topics. We will also cover how to release your integration for use by the wider CloudQuery community.
This guide assumes that you are somewhat familiar with CloudQuery. If you are not, we recommend starting by reading the Quickstart guide and playing around with the CloudQuery CLI a bit first.
Though you by no means need to be an expert, you will also need some familiarity with Go. The official Go Tutorial and A Tour of Go are great resources to learn the basics and prepare your environment.
Core Concepts
Before we dive in, let's quickly cover some core concepts of CloudQuery integrations, so that they're familiar when we see our first example.
Syncs
A sync is the process that gets kicked off when a user runs cloudquery sync. A sync is responsible for fetching data from a third-party API and inserting it into the destination (database, data lake, stream, etc.). When you write a source integration for CloudQuery, you will only need to implement the part that interfaces with the third-party API. The rest of the sync process, such as delivering to the destination database, is handled by the CloudQuery SDK.
Tables and Services
A table is the term CloudQuery uses for a collection of related data. In most databases it directly maps to an actual database table, but in some destinations it could be stored as a file, stream or other medium. Inside integration code, tables get grouped into collections called "services". Many REST APIs are logically grouped, and services are meant to map closely to these underlying API groupings. For example, an API might expose an endpoint called GET /v1/accounts/users. The service in this case would be called accounts, and the table users. The final table name will be <plugin_name>_<service_name>_<table_name>, e.g. myplugin_accounts_users.
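As a small illustration of the naming convention above, here is a sketch of how the final table name is composed. The tableName helper is hypothetical and written only for this guide; it is not part of the CloudQuery SDK, where you simply set the name on the table yourself.

```go
package main

import (
	"fmt"
	"strings"
)

// tableName joins the plugin, service, and table names with underscores.
// Hypothetical helper for illustration; not part of the CloudQuery SDK.
func tableName(plugin, service, table string) string {
	parts := []string{plugin}
	if service != "" { // integrations without services skip this segment
		parts = append(parts, service)
	}
	parts = append(parts, table)
	return strings.Join(parts, "_")
}

func main() {
	fmt.Println(tableName("myplugin", "accounts", "users"))
	fmt.Println(tableName("xkcd", "", "comics"))
}
```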
Services each get their own directory under the services directory of your integration. Inside a service directory, every table will typically have its own .go file. A table is defined as a function that returns a *schema.Table.
Not every integration will have enough tables to justify grouping them into services. For integrations with only a few tables, it's fine to put them directly in the resources directory. We will look at examples of this soon! For now, let's cover a few more important concepts.
Resolvers
Resolvers are functions associated with a table that get called when it's time to populate data for that table. There are two types of resolvers:
Table resolvers
Table resolvers are responsible for fetching data from the third-party API. A table resolver receives a res (results) channel as argument, to which it should send all results from the API. For top-level tables, this function will only be called once per multiplexer client. For dependent tables, the resolver will be called once for each parent row, and the parent resource will be passed in as well. (More on this, and multiplexers, shortly.)
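To make the calling pattern concrete, here is a minimal sketch using only the standard library. The Resolver and Table types and the resolve function are simplified stand-ins written for this guide, not the SDK's own types or scheduler. The sketch only shows that a top-level resolver runs once with a nil parent, while a dependent table's resolver runs once per parent row.

```go
package main

import "fmt"

// Resolver mimics the shape of a table resolver: it receives the parent row
// (nil for top-level tables) and sends results to res.
// Simplified stand-in, not the CloudQuery SDK type.
type Resolver func(parent any, res chan<- any) error

// Table is a stand-in holding a resolver and its dependent tables.
type Table struct {
	Name      string
	Resolver  Resolver
	Relations []*Table // dependent (child) tables
}

// resolve runs a table's resolver once, then runs each child table's
// resolver once for every row the parent produced.
func resolve(t *Table, parent any, out *[]string) error {
	res := make(chan any)
	errc := make(chan error, 1)
	go func() {
		defer close(res)
		errc <- t.Resolver(parent, res)
	}()
	var rows []any
	for row := range res { // drain until the resolver finishes
		rows = append(rows, row)
	}
	if err := <-errc; err != nil {
		return err
	}
	for _, row := range rows {
		*out = append(*out, fmt.Sprintf("%s: %v", t.Name, row))
		for _, child := range t.Relations {
			if err := resolve(child, row, out); err != nil {
				return err
			}
		}
	}
	return nil
}

// demo wires an "accounts" table to a dependent "users" table.
func demo() ([]string, error) {
	users := &Table{Name: "users", Resolver: func(parent any, res chan<- any) error {
		res <- fmt.Sprintf("%v/alice", parent)
		return nil
	}}
	accounts := &Table{Name: "accounts", Relations: []*Table{users},
		Resolver: func(_ any, res chan<- any) error {
			res <- "acme"
			res <- "globex"
			return nil
		}}
	var out []string
	err := resolve(accounts, nil, &out)
	return out, err
}

func main() {
	out, err := demo()
	if err != nil {
		panic(err)
	}
	for _, line := range out {
		fmt.Println(line)
	}
}
```

The users resolver is invoked twice here, once for each row sent by the accounts resolver.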
Column resolvers
Column resolvers are responsible for mapping data from the third-party API into the columns of the table. In most cases, you will not need to implement this, as the SDK will automatically map data from the struct passed in by the table resolver to the columns of the table. But in some cases, you may need to implement a custom column resolver to fetch additional data or do custom transformations.
Multiplexers
Multiplexers are a way to parallelize the fetching of data from the third-party API. Some top-level tables require multiple calls to fetch all their data. For example, a sync for the GitHub source integration that fetches data for multiple organizations will need to make one call per organization to list all repositories. By multiplexing over organizations, these top-level queries can also be done in parallel. Each table defines the multiplexer that it should use. The CloudQuery integration SDK will then call the table resolver once for each client in the multiplexer.
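The fan-out idea can be sketched with plain goroutines. Everything below (Client, orgMultiplex, fetchRepos, syncTable) is a hypothetical stand-in for illustration. The real SDK does its own scheduling, and this sketch only shows the principle of one resolver call per multiplexed client, running in parallel.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Client is a simplified stand-in for an integration client scoped to one
// organization.
type Client struct{ Org string }

// orgMultiplex returns one client per organization (hypothetical helper).
func orgMultiplex(orgs []string) []Client {
	clients := make([]Client, 0, len(orgs))
	for _, org := range orgs {
		clients = append(clients, Client{Org: org})
	}
	return clients
}

// fetchRepos plays the role of a table resolver: it sends the rows that
// belong to a single client.
func fetchRepos(c Client, res chan<- string) {
	res <- c.Org + "/repo-a"
	res <- c.Org + "/repo-b"
}

// syncTable calls the resolver once per multiplexed client, in parallel,
// and collects everything sent over the shared results channel.
func syncTable(clients []Client) []string {
	res := make(chan string)
	var wg sync.WaitGroup
	for _, c := range clients {
		wg.Add(1)
		go func(c Client) {
			defer wg.Done()
			fetchRepos(c, res)
		}(c)
	}
	go func() {
		wg.Wait()
		close(res) // all resolvers are done, so stop the collector loop
	}()
	var rows []string
	for row := range res {
		rows = append(rows, row)
	}
	sort.Strings(rows) // arrival order is nondeterministic, so sort for display
	return rows
}

func main() {
	for _, row := range syncTable(orgMultiplex([]string{"org1", "org2"})) {
		fmt.Println(row)
	}
}
```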
Incremental Tables
Some APIs lend themselves to being synced incrementally: rather than fetch all past data on every sync, an incremental table will only fetch data that has changed since the last sync. This is done by storing some metadata in a state backend. The metadata is known as a cursor, and it marks where the last sync ended, so that the next sync can resume from the same point. Incremental syncs can be vastly more efficient than full syncs, especially for tables with large amounts of data. This is because only the data that's changed since the last sync needs to be retrieved, and in many cases this is a small subset of the overall dataset.
Incremental tables are great for efficiency, but add complexity for both you and your users. As the integration author, you should consider first whether the table needs to be incremental, then whether it can be made incremental. You can also read more about managing incremental tables.
Here are the basic steps of adding support for incremental tables to your integration:
- Within the integration client, initialize the state client using state.NewConnectedClient or state.NewConnectedClientWithOptions (see example).
- You must call Close() on the returned client.
- When defining incremental tables, add IsIncremental: true to the table definition (see example).
- Add IncrementalKey: true to the column definition for the column that is used as the cursor (see example).
- In your table resolver, manipulate the cursor using the stateClient.GetKey() and stateClient.SetKey() methods.
- You must call stateClient.Flush() in the integration client after the sync runs successfully (see example). If you don't do this, the new cursor will not be persisted to the state backend.
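The cursor logic behind these steps can be sketched without the SDK. The memState type below is an in-memory stand-in whose method names mirror GetKey, SetKey and Flush from the list above. The signatures, the cursor key name, and the integer IDs are assumptions made for this sketch, not the SDK's API.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// memState is an in-memory stand-in for a state backend. In this toy model,
// GetKey only sees values that have been flushed.
type memState struct {
	pending   map[string]string // written by SetKey, not yet persisted
	persisted map[string]string // what the next sync will read
}

func newMemState() *memState {
	return &memState{pending: map[string]string{}, persisted: map[string]string{}}
}

func (s *memState) GetKey(_ context.Context, key string) (string, error) {
	return s.persisted[key], nil
}

func (s *memState) SetKey(_ context.Context, key, value string) error {
	s.pending[key] = value
	return nil
}

func (s *memState) Flush(_ context.Context) error {
	for k, v := range s.pending {
		s.persisted[k] = v
	}
	return nil
}

// fetchItems sends only IDs above the stored cursor, then advances the
// cursor to the highest ID it saw.
func fetchItems(ctx context.Context, st *memState, source []int, res chan<- int) error {
	const key = "items_last_id" // hypothetical cursor key
	raw, err := st.GetKey(ctx, key)
	if err != nil {
		return err
	}
	last := 0
	if raw != "" {
		if last, err = strconv.Atoi(raw); err != nil {
			return err
		}
	}
	maxSeen := last
	for _, id := range source {
		if id <= last {
			continue // already synced in a previous run
		}
		res <- id
		if id > maxSeen {
			maxSeen = id
		}
	}
	return st.SetKey(ctx, key, strconv.Itoa(maxSeen))
}

// runSync runs the resolver, collects its rows, and flushes on success.
func runSync(ctx context.Context, st *memState, source []int) ([]int, error) {
	res := make(chan int, len(source)) // buffered so the resolver never blocks
	if err := fetchItems(ctx, st, source, res); err != nil {
		return nil, err
	}
	close(res)
	var rows []int
	for id := range res {
		rows = append(rows, id)
	}
	return rows, st.Flush(ctx)
}

// demo runs two syncs against a growing data set.
func demo() (first, second []int) {
	ctx := context.Background()
	st := newMemState()
	first, err := runSync(ctx, st, []int{1, 2, 3})
	if err != nil {
		panic(err)
	}
	second, err = runSync(ctx, st, []int{1, 2, 3, 4, 5})
	if err != nil {
		panic(err)
	}
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first, second) // [1 2 3] [4 5]
}
```

The second sync only emits the two new items. If runSync skipped the Flush call, the second sync would read an empty cursor and fetch everything again.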
Creating Your First Integration for CloudQuery
In this section we will go through all the steps of building a simple source integration. We will start by creating a new integration from scratch, then we will add a table to it. To serve as a fun real-world example, we will create an integration that fetches comic data from the XKCD API.
Initializing Your Integration with the scaffold Tool
The easiest way to get started writing an integration is to use the scaffold tool. This tool will create a new integration directory with all the boilerplate code you need to get started. It will also create a services directory with an example table.
The scaffold tool is available as a binary for Linux, macOS and Windows. You can download the latest version from the releases page.
On macOS, you can install the tool using Homebrew:
brew install cloudquery/tap/scaffold
With the tool installed, you can create a new integration by running (replace <org> and <name> with values for your GitHub org and the name of your integration):
cq-scaffold source <org> <name>
This will create a new directory called cq-source-<name>. You should then cd into the directory and run go mod tidy to download the dependencies.
At the time of writing, the scaffold creates a directory structure that looks like this:
.
├── Makefile
├── README.md
├── client
│   ├── client.go
│   └── spec.go
├── go.mod
├── main.go
└── resources
    ├── plugin
    │   ├── client.go
    │   └── plugin.go
    └── services
        └── table.go
Creating a Table
The scaffold tool creates a single table in the resources directory. Let's take a look at the code in resources/table.go that was generated for a new XKCD source integration:
package resources

import (
	"context"
	"fmt"

	"github.com/cloudquery/plugin-sdk/schema"
)

func SampleTable() *schema.Table {
	return &schema.Table{
		Name:     "xkcd_sample_table",
		Resolver: fetchSampleTable,
		Columns: []schema.Column{
			{
				Name: "column",
				Type: schema.TypeString,
			},
		},
	}
}

func fetchSampleTable(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
	return fmt.Errorf("not implemented")
}
In this example, we have a table called xkcd_sample_table with a single column called column. The Resolver field contains the resolver function that will be called to populate the table with data. The fetchSampleTable function is a placeholder that returns an error. Our job as integration authors will be to add the correct columns for the table, and implement the resolver function.
Adding Columns to the Table
Adding columns to a table is easy, as long as you have a Go struct. The CloudQuery integration SDK will automatically map the fields of the struct to the columns of the table. In many cases an existing Go SDK will provide you with this struct. Then we can add a Transform property that calls transformers.TransformWithStruct(&<StructName>{}) with a pointer to the struct. This will automatically map the fields of the struct to the columns of the table. For our hypothetical XKCD integration, we don't have an SDK to work with, so we will create our own struct inside a new xkcd package. The struct will look like this:
type Comic struct {
	Month      string `json:"month"`
	Num        int    `json:"num"`
	Link       string `json:"link"`
	Year       string `json:"year"`
	News       string `json:"news"`
	SafeTitle  string `json:"safe_title"`
	Transcript string `json:"transcript"`
	Alt        string `json:"alt"`
	Img        string `json:"img"`
	Title      string `json:"title"`
	Day        string `json:"day"`
}
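To see how the json tags on this struct are used, here is a short sketch that decodes a JSON document into it with the standard library. The decodeComic helper and the payload are made up for illustration; the payload is not real API output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Comic repeats the struct from the guide so this sketch is self-contained.
type Comic struct {
	Month      string `json:"month"`
	Num        int    `json:"num"`
	Link       string `json:"link"`
	Year       string `json:"year"`
	News       string `json:"news"`
	SafeTitle  string `json:"safe_title"`
	Transcript string `json:"transcript"`
	Alt        string `json:"alt"`
	Img        string `json:"img"`
	Title      string `json:"title"`
	Day        string `json:"day"`
}

// decodeComic parses one JSON document in the shape the struct tags describe.
// Hypothetical helper for illustration.
func decodeComic(data []byte) (Comic, error) {
	var c Comic
	err := json.Unmarshal(data, &c)
	return c, err
}

func main() {
	// Illustrative payload written for this sketch, not real API output.
	payload := []byte(`{"num": 1, "title": "Example", "safe_title": "Example", "year": "2006", "month": "1", "day": "1"}`)
	comic, err := decodeComic(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(comic.Num, comic.SafeTitle, comic.Year)

	// Num is an int, so a quoted number is rejected by encoding/json.
	_, err = decodeComic([]byte(`{"num": "1"}`))
	fmt.Println(err != nil)
}
```

Note that the date fields are declared as strings while num is an integer, so the Go types must match the JSON types for decoding to succeed.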
We'll also rename the SampleTable function to Comics, update some properties and add the Transform property:
func Comics() *schema.Table {
	return &schema.Table{
		Name:      "xkcd_comics",
		Resolver:  fetchComics,
		Transform: transformers.TransformWithStruct(&xkcd.Comic{}),
	}
}
Writing a Table Resolver
With the columns defined, we can now write the resolver function. The resolver function is responsible for fetching the data from the API and returning it to CloudQuery. The resolver function takes a context.Context object, a schema.ClientMeta object, a *schema.Resource object, and a chan<- interface{} object. The context.Context object is used to cancel the resolver function if the user cancels the sync. The schema.ClientMeta object is a generic object that can be used to store any data that needs to be shared between resolvers. The *schema.Resource object is the parent resource of the table, if any, and is used to implement parent-child relationships. In our case this will be nil. The chan<- interface{} object is used to send the data back to CloudQuery.
We won't go into all the details of making API calls here, but let's look at what a resolver function for our XKCD integration might look like:
func fetchComics(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
	client := meta.(*Client)
	latest, err := client.XKCD.GetLatestComic(ctx)
	if err != nil {
		return err
	}
	res <- latest
	for i := 1; i < latest.Num; i++ {
		comic, err := client.XKCD.GetComic(ctx, i)
		if err != nil {
			return err
		}
		res <- comic
	}
	return nil
}
In the above code, we fetch comics from the XKCD API and send them to CloudQuery over the res channel. We first get the latest comic, then iterate through all the IDs from 1 up to, but not including, that number, since the latest comic has already been sent. You can send items to the channel one at a time, or as a slice of items. The sooner an item is dispatched over the channel, the sooner it will be written to the destination(s), so we prefer to send items as soon as they are available. And as long as the struct sent matches the one used for the table, the CloudQuery SDK will handle the rest.
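Since the context is how a cancelled sync reaches the resolver, a long-running loop may want to check it while sending. The sketch below shows the general Go pattern of selecting on ctx.Done() alongside the channel send. The sendAll and demo functions are hypothetical, and whether a given resolver needs this depends on how its API calls already handle the context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// sendAll sends the IDs 1..n to res, stopping early if ctx is cancelled.
func sendAll(ctx context.Context, n int, res chan<- any) error {
	for i := 1; i <= n; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err() // the sync was cancelled, so stop sending
		case res <- i:
		}
	}
	return nil
}

// demo reads two items, cancels the context, and reports how many items
// were received together with the error returned by sendAll.
func demo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	res := make(chan any) // unbuffered: each send waits for a receiver
	errc := make(chan error, 1)
	go func() { errc <- sendAll(ctx, 1000, res) }()

	received := 0
	for received < 2 {
		<-res
		received++
	}
	cancel() // simulate the user cancelling the sync
	return received, <-errc
}

func main() {
	n, err := demo()
	fmt.Println(n, errors.Is(err, context.Canceled))
}
```

Without the select, the send on res would block forever once nothing is reading from the channel.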
In the above example, we used a Client struct that we haven't talked about yet. The Client struct is used to store any data that needs to be shared between resolvers. For example, it may store the API key that we'll need to make API calls, or an SDK client that we'll use to make API calls. The Client struct is defined in the client directory, and is instantiated with a call to client.New in the plugin directory. In this case, we were using it to store an instance of the XKCD client. (We won't show the full XKCD client implementation here.)
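For readers who want a starting point, here is a minimal sketch of what such an XKCD client could look like. Only the method names GetLatestComic and GetComic come from the resolver above. The XKCDClient type, its fields, the comicURL helper, and the trimmed Comic struct are assumptions made for this sketch, and the real integration may be structured differently. The paths follow XKCD's public JSON interface (/info.0.json for the latest comic and /<num>/info.0.json for a specific one). A local test server stands in for the API so the example runs offline.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Comic is trimmed to two fields to keep the sketch short.
type Comic struct {
	Num   int    `json:"num"`
	Title string `json:"title"`
}

// XKCDClient is a hypothetical minimal client.
type XKCDClient struct {
	BaseURL string // for example "https://xkcd.com"
	HTTP    *http.Client
}

// comicURL builds the JSON endpoint; num <= 0 means "latest comic".
func comicURL(base string, num int) string {
	if num <= 0 {
		return base + "/info.0.json"
	}
	return fmt.Sprintf("%s/%d/info.0.json", base, num)
}

// get fetches and decodes one comic, honouring ctx for cancellation.
func (c *XKCDClient) get(ctx context.Context, num int) (Comic, error) {
	var comic Comic
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, comicURL(c.BaseURL, num), nil)
	if err != nil {
		return comic, err
	}
	resp, err := c.HTTP.Do(req)
	if err != nil {
		return comic, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return comic, fmt.Errorf("unexpected status %d for %s", resp.StatusCode, req.URL)
	}
	err = json.NewDecoder(resp.Body).Decode(&comic)
	return comic, err
}

func (c *XKCDClient) GetLatestComic(ctx context.Context) (Comic, error) {
	return c.get(ctx, 0)
}

func (c *XKCDClient) GetComic(ctx context.Context, num int) (Comic, error) {
	return c.get(ctx, num)
}

func main() {
	// A local stub server replaces the real API for this demonstration.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, `{"num": 3, "title": "stub"}`)
	}))
	defer srv.Close()

	client := &XKCDClient{BaseURL: srv.URL, HTTP: srv.Client()}
	latest, err := client.GetLatestComic(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Println(latest.Num, latest.Title)
}
```

The sketch returns Comic values rather than pointers, so the items sent over the res channel have the same type that a later type assertion on the resource item would expect. Because any non-200 response is returned as an error, a resolver that iterates over every ID would stop at the first missing comic unless it chooses to skip such errors.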
Testing the CloudQuery Integration
There are two options for running an integration as a developer before it is released: as a gRPC server, or as a standalone binary. We will briefly summarize both options here, or you can read about them in more detail in Running Locally.
Run the integration as a gRPC Server
This mode is especially useful for setting breakpoints in your code for debugging, as you can run it in server mode from your IDE and attach a debugger to it. To run the integration as a gRPC server, you can run the following command in the root of the integration directory:
go run main.go serve
(Note: If you see errors about missing dependencies, you can run go mod tidy to fix them.)
This will start a gRPC server on port 7777. You can then create a configuration file that sets the registry and path properties to point to this server. For example:
kind: source
spec:
  name: "xkcd"
  registry: "grpc"
  path: "localhost:7777"
  version: "v1.0.0"
  tables:
    ["*"]
  destinations:
    - "sqlite"
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.18"
  spec:
    connection_string: ./db.sql
With the above configuration, we can now run cloudquery sync as normal:
cloudquery sync config.yaml
Note that when running a source integration as a gRPC server, errors with the source integration will be printed to the console running the gRPC server, not to the CloudQuery log like usual.
Run the Integration as a Standalone Binary
To run the integration as a standalone binary, you can run the following command in the root of the integration directory:
go build
This will create a binary with the name of the integration directory (so, cq-source-<plugin-name>). We can then refer to this binary by setting the registry to local and path as the path to the binary. Example:
kind: source
spec:
  name: "xkcd"
  registry: "local"
  path: "/path/to/cq-source-xkcd"
  version: "v1.0.0"
  tables:
    ["*"]
  destinations:
    - "sqlite"
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.18"
  spec:
    connection_string: ./db.sql
With the above configuration, we can now run cloudquery sync as normal:
cloudquery sync config.yaml
This time errors will be logged to cloudquery.log, as usual. This mode is closest to how the integration will run when it is released, as the CLI is in charge of managing the integration process.
Writing a Column Resolver
Sometimes it is necessary, or useful, to add some additional information to a table. This doesn't happen often, however, and for the XKCD integration we will need to come up with a contrived example to show how this works. Let's imagine that, in addition to the Comic struct fields, we also want to add whether the comic is a "good" comic or not. We can do this by adding a new column to the table, and then writing a resolver function for that column. The column will be called is_good and will be a boolean. We'll add the column to the table definition like this:
func Comics() *schema.Table {
	return &schema.Table{
		Name:      "xkcd_comics",
		Resolver:  fetchComics,
		Transform: transformers.TransformWithStruct(&xkcd.Comic{}),
		Columns: []schema.Column{
			{
				Name:     "is_good",
				Type:     schema.TypeBool,
				Resolver: resolveComicIsGood,
			},
		},
	}
}
The Resolver property is the function that will be called to resolve the column value. We'll define that function next:
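func resolveComicIsGood(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, c schema.Column) error {
	// The item is whatever the table resolver sent over the res channel.
	comic := resource.Item.(xkcd.Comic)
	// Set returns an error, so propagate it instead of discarding it.
	return resource.Set(c.Name, strings.Contains(comic.Title, "XKCD"))
}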
As big fans of meta-jokes, we define only comics with "XKCD" in the title to be good.
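One thing to watch for in a column resolver like this is that a single-value type assertion panics if the item has a different type than expected, for instance a pointer when a value was asserted. A defensive variant can be sketched with a type switch. The isGood helper and the trimmed Comic struct below are hypothetical and only illustrate the Go pattern, independent of the SDK.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Comic is trimmed to the one field this sketch needs.
type Comic struct{ Title string }

// isGood accepts the item as either a Comic value or a *Comic pointer and
// returns an error instead of panicking on anything else.
func isGood(item any) (bool, error) {
	switch c := item.(type) {
	case Comic:
		return strings.Contains(c.Title, "XKCD"), nil
	case *Comic:
		if c == nil {
			return false, errors.New("nil comic")
		}
		return strings.Contains(c.Title, "XKCD"), nil
	default:
		return false, fmt.Errorf("unexpected item type %T", item)
	}
}

func main() {
	// The titles here are made up for the example.
	good, err := isGood(Comic{Title: "An XKCD About XKCD"})
	fmt.Println(good, err)

	good, err = isGood(&Comic{Title: "Something Else"})
	fmt.Println(good, err)

	_, err = isGood(42)
	fmt.Println(err != nil)
}
```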
Adding Multiplexing
For our simple XKCD integration, multiplexing is not necessary. But let's say we were writing an integration that can fetch from multiple accounts. In that case, we may define an AccountMultiplex multiplexer inside a new multiplexers.go file in the client directory:
func AccountMultiplex(meta schema.ClientMeta) []schema.ClientMeta {
	var l = make([]schema.ClientMeta, 0)
	client := meta.(*Client)
	for _, acc := range client.accounts {
		l = append(l, client.WithAccount(acc))
	}
	return l
}
This also requires a new WithAccount method on the Client struct that sets an Account property on the client:
func (c *Client) WithAccount(account string) *Client {
	newC := *c
	newC.logger = c.logger.With().Str("account", account).Logger()
	newC.Account = account
	return &newC
}
It is also important to update the ID() method on the client to include the account name. The ID is used in logging and error messages to identify the client, and also internally by the SDK. We can update the ID() method to include the account name like this:
func (c *Client) ID() string {
	return fmt.Sprintf("myplugin:%s", c.Account)
}
The exact format doesn't matter, as long as it is unique for every multiplexed value. Some integrations also include spec.Name in the ID, to help identify the integration in scenarios where multiple instances are run in parallel.
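The two properties that matter here, that WithAccount leaves the original client untouched and that each multiplexed client gets a distinct ID, can be checked with a small standalone sketch. The Client below is a cut-down stand-in without the logger, and multiplex and uniqueIDs are hypothetical helpers written for this example.

```go
package main

import "fmt"

// Client is a cut-down stand-in for the integration client (no logger).
type Client struct {
	Account  string
	accounts []string
}

// WithAccount returns a copy of the client scoped to one account.
func (c *Client) WithAccount(account string) *Client {
	newC := *c // shallow copy: the original client is left untouched
	newC.Account = account
	return &newC
}

// ID must be unique for every multiplexed client.
func (c *Client) ID() string {
	return fmt.Sprintf("myplugin:%s", c.Account)
}

// multiplex returns one derived client per configured account.
func multiplex(c *Client) []*Client {
	clients := make([]*Client, 0, len(c.accounts))
	for _, acc := range c.accounts {
		clients = append(clients, c.WithAccount(acc))
	}
	return clients
}

// uniqueIDs reports whether every client has a distinct ID.
func uniqueIDs(clients []*Client) bool {
	seen := make(map[string]bool, len(clients))
	for _, c := range clients {
		id := c.ID()
		if seen[id] {
			return false
		}
		seen[id] = true
	}
	return true
}

func main() {
	base := &Client{accounts: []string{"a", "b"}}
	clients := multiplex(base)
	for _, c := range clients {
		fmt.Println(c.ID())
	}
	fmt.Println(uniqueIDs(clients), base.Account == "")
}
```

If ID() did not include the account, both derived clients would report the same ID and the uniqueness check would fail.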
Now we can instruct the integration SDK to use this multiplexer, where appropriate, by setting the Multiplex property on the table to client.AccountMultiplex:
func MyTable() *schema.Table {
	return &schema.Table{
		Name:      "sample_table",
		Resolver:  fetchSampleTable,
		Multiplex: client.AccountMultiplex,
		// other properties ...
	}
}
Inside the fetchSampleTable resolver, we would then be able to get the current Account by accessing the Account property on the client:
func fetchSampleTable(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
	client := meta.(*Client)
	account := client.Account
	// ... use account to fetch data and send it to res
	return nil
}
The GitHub integration multiplexers can serve as a good example of how to implement and use multiplexing. In that case, some tables multiplex on organization, while others multiplex on organization and repository combined.
Publishing Your CloudQuery Integration
Visit the Publishing an integration to the Hub guide for instructions on how to publish your integration to the CloudQuery Hub.
Real-world Examples
A good way to learn how to create new integrations is to look at the following examples:
- The XKCD Source integration contains the full code from this tutorial.
- The Hacker News Source integration is a good example of an integration with incremental tables.
- The K8s Source integration is a good example of a more complex integration with many tables and mock tests.
This guide doesn't cover destination integrations yet, but you can also look at the following examples:
- The PostgreSQL Destination integration is a good example of an "unmanaged" destination that handles batching itself.
- The BigQuery Destination integration is a good example of a "managed" destination that writes to each table in separate batches.
Other source and destination integrations to reference can be found here.