Orchestrating CloudQuery Syncs with Kestra

This tutorial shows how to run CloudQuery as a Kestra flow, using the HackerNews API as a source and a CSV file as a destination. Kestra is an open-source, event-driven orchestrator with a dedicated CloudQuery plugin.

You can use Kestra to:

  • schedule CloudQuery syncs in a simple, declarative way using a built-in code editor
  • run your syncs based on events from other systems, e.g. a new file in your S3 bucket, a new message in a queue, or a new webhook request
  • monitor your syncs from a friendly UI and get notified when they fail.

Prerequisites

Step 1: Install Kestra

Kestra needs a CloudQuery API key stored as a secret. Unless you use the Enterprise edition, the API key needs to be Base64-encoded and passed as an environment variable whose name starts with SECRET_.

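To encode the CloudQuery API key stored in the CLOUDQUERY_API_KEY environment variable, run the command below. The -n flag keeps echo from appending a newline, which would otherwise become part of the encoded secret.

export SECRET_CLOUDQUERY_API_KEY=$(echo -n "$CLOUDQUERY_API_KEY" | base64)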

Then start the Kestra container and pass in the new environment variable:

docker run --pull=always --rm -it -p 8080:8080 --user=root -e SECRET_CLOUDQUERY_API_KEY -v /var/run/docker.sock:/var/run/docker.sock -v /tmp:/tmp kestra/kestra:latest server local

To learn more about Kestra startup configuration and options, see the Kestra Getting started guide.

Once the container is running, open http://localhost:8080 in your browser.

Step 2: Create a Kestra flow

Inside the Kestra UI, go to the Flows page and click Create. You can now paste the following content (make sure the CLOUDQUERY_API_KEY secret from Step 1 is available to Kestra):

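# Kestra requires an id and a namespace for every flow; the values below are placeholders
id: cloudquery_hackernews
namespace: company.team
tasks: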
  - id: wdir
    type: io.kestra.plugin.core.flow.WorkingDirectory
    tasks:
      - id: config_files
        type: io.kestra.plugin.core.storage.LocalFiles
        inputs:
          config.yml: |
            kind: source
            spec:
              name: hackernews
              path: cloudquery/hackernews
              registry: cloudquery
              version: v3.7.23
              tables: ["*"]
              destinations:
                - file
              spec:
                item_concurrency: 10
                start_time: "{{ now() | dateAdd(-2, 'HOURS') | date("yyyy-MM-dd'T'HH:mm:ssX", timeZone="UTC") }}"
            ---
            kind: destination
            spec:
              name: "file"
              path: "cloudquery/file"
              registry: "cloudquery"
              version: "v5.4.22"
              write_mode: "append"
              spec:
                path: "/output/{{'{{TABLE}}/{{UUID}}.{{FORMAT}}'}}"
                format: "csv" # options: parquet, json, csv
      - id: run_sync
        type: io.kestra.plugin.cloudquery.CloudQueryCLI
        env:
          # Visit https://kestra.io/docs/concepts/secret to learn how to manage secrets in Kestra
          CLOUDQUERY_API_KEY: "{{ secret('CLOUDQUERY_API_KEY') }}"
        commands:
          - cloudquery sync config.yml --log-console --log-level=debug
        outputFiles:
          - "**/*.csv"

The flow will extract HackerNews Items published in the last two hours and load them into a CSV file. You may notice that the input to the config_files task is virtually identical to your CloudQuery configuration file.

Kestra flows are built using a declarative YAML syntax, much like CloudQuery configuration files.

Once you have configured the flow in the editor, click Save.

Kestra Flow Editor Screenshot

Step 3: Run the flow

To manually start a Kestra flow, click on the Execute button in the top right corner. Then, confirm by clicking on Execute.

You should now see the sync running in the Executions tab. You can navigate to the Logs tab to check the logs and validate everything runs as expected. Also, you can download the extracted CSV files from the Outputs tab:

Kestra Flow Execution Screenshot

Step 4: Schedule the flow

To run the flow periodically, add a schedule trigger. Back in the Flow editor, add the following section:

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 * * *" # every day at 9am
    timezone: US/Eastern

This cron expression runs the flow every day at 09:00 in the US/Eastern time zone. You can use crontab.guru to generate cron expressions and replace the one in the example above. Kestra also supports these special values for cron:

@yearly
@annually
@monthly
@weekly
@daily
@midnight
@hourly
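
As a minimal sketch, one of the special values listed above can stand in for a five-field expression. The trigger id is a placeholder, and the type uses the io.kestra.plugin.core naming that the tasks in Step 2 follow; older Kestra releases name the same trigger io.kestra.core.models.triggers.types.Schedule.

```yaml
triggers:
  - id: hourly_sync  # placeholder id
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@hourly"  # shorthand for "0 * * * *"
```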

With this in place, remember to click Save again. Your CloudQuery sync will now run on a regular schedule.
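
Schedules are not the only way to start a sync. The introduction mentions event-based runs such as a new webhook request; a minimal sketch of that, assuming Kestra's core Webhook trigger and a placeholder key, might look like this:

```yaml
triggers:
  - id: on_webhook  # placeholder id
    type: io.kestra.plugin.core.trigger.Webhook
    # Assumption: the key is a secret string of your choosing; replace this placeholder
    key: replace-with-a-random-key
```

With such a trigger in place, Kestra exposes a URL that includes the flow's namespace, its id, and this key. See the Webhook trigger documentation for the exact form of that URL in your Kestra version.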

Next steps

Caching

To speed up your syncs and save bandwidth, you can cache the downloaded plugins locally. Kestra does not provide a cache for container-based tasks, but you can use a shared volume to cache the plugins.

To do this, you need to configure Kestra to enable volume mounts first.
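
A sketch of that setting follows, assuming a recent Kestra version in which the Docker runner is configured under kestra.plugins.configurations. The key path is an assumption to verify against the configuration reference for your Kestra version; older releases keep this option in a different place.

```yaml
# Kestra server configuration (not part of the flow)
# Assumption: key path as used by recent Kestra releases
kestra:
  plugins:
    configurations:
      - type: io.kestra.plugin.scripts.runner.docker.Docker
        values:
          volume-enabled: true  # allow tasks to bind-mount host paths
```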

Then configure your CloudQuery task to use the shared volume:

- id: run_sync
  type: io.kestra.plugin.cloudquery.CloudQueryCLI
  env:
    CLOUDQUERY_API_KEY: "{{ secret('CLOUDQUERY_API_KEY') }}"
  # Mount the shared volume to the CloudQuery container
  docker:
    image: ghcr.io/cloudquery/cloudquery:latest
    volumes:
      - "/tmp/cq:/app/.cq"
  commands:
    # Use the shared volume for downloading plugins
    - cloudquery sync config.yml --log-console --log-level=debug --cq-dir=/app/.cq

Note: In the example above, the host directory /tmp/cq is mounted at /app/.cq inside the CloudQuery container. You can adjust the host path to your needs.

Production deployment

This tutorial was just a quick introduction to help you get started with a CloudQuery deployment on Kestra. You can now create additional Kestra tasks to perform transformations, send notifications and more. For more information, check out the CloudQuery docs and the Kestra docs.
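
As one hedged illustration of such an additional task, a flow-level errors block runs when a task in the flow fails. The sketch below only writes a log line using Kestra's core Log task; the task id and message are placeholders, and a real notification (for example to Slack or email) would use a different task type.

```yaml
errors:
  - id: log_failure  # placeholder id
    type: io.kestra.plugin.core.log.Log
    level: ERROR
    message: "CloudQuery sync failed in execution {{ execution.id }}"
```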

To productionize your Kestra deployment, you will likely need to deploy it to a cloud environment, for example on Kubernetes. For more information, see the Kestra Deployment with Kubernetes guide. If you have any questions, you can reach the Kestra team via the Community Slack.