Merge pull request #234 from dbt-labs/feature/job-chaining
Add job chaining support
b-per authored Feb 5, 2024
2 parents 2c6624e + 49daa6c commit aaef44f
Showing 13 changed files with 466 additions and 29 deletions.
13 changes: 11 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -2,10 +2,19 @@

All notable changes to this project will be documented in this file.

## [Unreleased](https://github.com/dbt-labs/terraform-provider-dbtcloud/compare/v0.2.19...HEAD)
## [Unreleased](https://github.com/dbt-labs/terraform-provider-dbtcloud/compare/v0.2.20...HEAD)

## [0.2.20](https://github.com/dbt-labs/terraform-provider-dbtcloud/compare/v0.2.19...v0.2.20)

## [0.2.18](https://github.com/dbt-labs/terraform-provider-dbtcloud/compare/v0.2.18...v0.2.19)
## Changes

- Add support for job chaining and `job_completion_trigger_condition` (the feature is in closed beta in dbt Cloud as of 5 February 2024)

## Documentation

- Improve docs for jobs

## [0.2.19](https://github.com/dbt-labs/terraform-provider-dbtcloud/compare/v0.2.18...v0.2.19)

## Changes

10 changes: 10 additions & 0 deletions docs/data-sources/job.md
@@ -27,8 +27,18 @@ description: |-
- `description` (String) Long description for the job
- `environment_id` (Number) ID of the environment the job is in
- `id` (String) The ID of this resource.
- `job_completion_trigger_condition` (Set of Object) Which other job should trigger this job when it finishes, and under which conditions. (see [below for nested schema](#nestedatt--job_completion_trigger_condition))
- `name` (String) Given name for the job
- `self_deferring` (Boolean) Whether this job defers on a previous run of itself (overrides value in deferring_job_id)
- `timeout_seconds` (Number) Number of seconds before the job times out
- `triggers` (Map of Boolean) Flags for which types of triggers to use, keys of github_webhook, git_provider_webhook, schedule, custom_branch_only
- `triggers_on_draft_pr` (Boolean) Whether the CI job should be automatically triggered on draft PRs

<a id="nestedatt--job_completion_trigger_condition"></a>
### Nested Schema for `job_completion_trigger_condition`

Read-Only:

- `job_id` (Number)
- `project_id` (Number)
- `statuses` (Set of String)
50 changes: 47 additions & 3 deletions docs/resources/job.md
@@ -13,6 +13,13 @@ description: |-
Those improvements include changes to deferral, which was historically set at the job level and is now set at the environment level.
Deferral can still be set to "self" by setting `self_deferring` to `true`, but with the new approach, deferral to other runs needs to be done with `deferring_environment_id` instead of `deferring_job_id`.


~> As of the beginning of February 2024, job chaining with `job_completion_trigger_condition` is in private beta and not available to all users.
<br/>
<br/>
This notice will be removed once the feature is generally available.


## Example Usage

```terraform
@@ -71,6 +78,33 @@ resource "dbtcloud_job" "ci_job" {
schedule_days = [0, 1, 2, 3, 4, 5, 6]
schedule_type = "days_of_week"
}
# a job that is set to be triggered after another job finishes
# this is sometimes referred to as 'job chaining'
resource "dbtcloud_job" "downstream_job" {
environment_id = dbtcloud_environment.project2_prod_environment.environment_id
execute_steps = [
"dbt build -s +my_model"
]
generate_docs = true
name = "Downstream job in project 2"
num_threads = 32
project_id = dbtcloud_project.dbt_project2.id
run_generate_sources = true
triggers = {
"custom_branch_only" : false,
"github_webhook" : false,
"git_provider_webhook" : false,
"schedule" : false
}
schedule_days = [0, 1, 2, 3, 4, 5, 6]
schedule_type = "days_of_week"
job_completion_trigger_condition {
job_id = dbtcloud_job.daily_job.id
project_id = dbtcloud_project.dbt_project.id
statuses = ["success"]
}
}
```

<!-- schema generated by tfplugindocs -->
@@ -82,7 +116,7 @@ resource "dbtcloud_job" "ci_job" {
- `execute_steps` (List of String) List of commands to execute for the job
- `name` (String) Job name
- `project_id` (Number) Project ID to create the job in
- `triggers` (Map of Boolean) Flags for which types of triggers to use, possible values are `github_webhook`, `git_provider_webhook`, `schedule` and `custom_branch_only`. <br>`custom_branch_only` is only relevant for CI jobs triggered automatically on PR creation to only trigger a job on a PR to the custom branch of the environment.
- `triggers` (Map of Boolean) Flags for which types of triggers to use, possible values are `github_webhook`, `git_provider_webhook`, `schedule` and `custom_branch_only`. <br>`custom_branch_only` is only relevant for CI jobs triggered automatically on PR creation to only trigger a job on a PR to the custom branch of the environment. To create a job in a 'deactivated' state, set all to `false`.

### Optional

@@ -91,9 +125,10 @@ resource "dbtcloud_job" "ci_job" {
- `deferring_job_id` (Number) Job identifier that this job defers to (legacy deferring approach)
- `description` (String) Description for the job
- `generate_docs` (Boolean) Flag for whether the job should generate documentation
- `is_active` (Boolean) Flag for whether the job is marked active or deleted
- `is_active` (Boolean) Flag for whether the job is marked active or deleted. To create/keep a job in a 'deactivated' state, check the `triggers` config.
- `job_completion_trigger_condition` (Block Set, Max: 1) Which other job should trigger this job when it finishes, and under which conditions (sometimes referred to as 'job chaining'). (see [below for nested schema](#nestedblock--job_completion_trigger_condition))
- `num_threads` (Number) Number of threads to use in the job
- `run_generate_sources` (Boolean) Flag for whether the job should run generate sources
- `run_generate_sources` (Boolean) Flag for whether the job should add a `dbt source freshness` step to the job. The difference between manually adding a `dbt source freshness` step in the job steps and using this flag is that with this flag, a failed freshness check still allows the following steps to run.
- `schedule_cron` (String) Custom cron expression for schedule
- `schedule_days` (List of Number) List of days of week as numbers (0 = Sunday, 6 = Saturday) to execute the job at if running on a schedule
- `schedule_hours` (List of Number) List of hours to execute the job at if running on a schedule
@@ -108,6 +143,15 @@ resource "dbtcloud_job" "ci_job" {

- `id` (String) The ID of this resource.

<a id="nestedblock--job_completion_trigger_condition"></a>
### Nested Schema for `job_completion_trigger_condition`

Required:

- `job_id` (Number) The ID of the job that would trigger this job after completion.
- `project_id` (Number) The ID of the project the trigger job runs in.
- `statuses` (Set of String) List of statuses to trigger the job on. Possible values are `success`, `error` and `canceled`.

## Import

Import is supported using the following syntax:
29 changes: 28 additions & 1 deletion examples/resources/dbtcloud_job/resource.tf
@@ -52,4 +52,31 @@ resource "dbtcloud_job" "ci_job" {
# this is not going to be used when schedule is set to false
schedule_days = [0, 1, 2, 3, 4, 5, 6]
schedule_type = "days_of_week"
}
}

# a job that is set to be triggered after another job finishes
# this is sometimes referred to as 'job chaining'
resource "dbtcloud_job" "downstream_job" {
environment_id = dbtcloud_environment.project2_prod_environment.environment_id
execute_steps = [
"dbt build -s +my_model"
]
generate_docs = true
name = "Downstream job in project 2"
num_threads = 32
project_id = dbtcloud_project.dbt_project2.id
run_generate_sources = true
triggers = {
"custom_branch_only" : false,
"github_webhook" : false,
"git_provider_webhook" : false,
"schedule" : false
}
schedule_days = [0, 1, 2, 3, 4, 5, 6]
schedule_type = "days_of_week"
job_completion_trigger_condition {
job_id = dbtcloud_job.daily_job.id
project_id = dbtcloud_project.dbt_project.id
statuses = ["success"]
}
}
1 change: 1 addition & 0 deletions go.mod
@@ -6,6 +6,7 @@ require (
github.com/hashicorp/terraform-plugin-docs v0.16.0
github.com/hashicorp/terraform-plugin-log v0.9.0
github.com/hashicorp/terraform-plugin-sdk/v2 v2.30.0
github.com/samber/lo v1.39.0
)

require (
2 changes: 2 additions & 0 deletions go.sum
@@ -168,6 +168,8 @@ github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBO
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/russross/blackfriday v1.6.0 h1:KqfZb0pUVN2lYqZUYRddxF4OR8ZMURnJIG5Y3VRLtww=
github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
55 changes: 54 additions & 1 deletion pkg/data_sources/job.go
@@ -6,8 +6,10 @@ import (
"strconv"

"github.com/dbt-labs/terraform-provider-dbtcloud/pkg/dbt_cloud"
"github.com/dbt-labs/terraform-provider-dbtcloud/pkg/utils"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/samber/lo"
)

var jobSchema = map[string]*schema.Schema{
@@ -71,6 +73,31 @@ var jobSchema = map[string]*schema.Schema{
Computed: true,
Description: "Whether the CI job should be automatically triggered on draft PRs",
},
"job_completion_trigger_condition": &schema.Schema{
Type: schema.TypeSet,
Computed: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"job_id": {
Type: schema.TypeInt,
Computed: true,
Description: "The ID of the job that would trigger this job after completion.",
},
"project_id": {
Type: schema.TypeInt,
Computed: true,
Description: "The ID of the project the trigger job runs in.",
},
"statuses": {
Type: schema.TypeSet,
Elem: &schema.Schema{Type: schema.TypeString},
Computed: true,
Description: "List of statuses to trigger the job on.",
},
},
},
Description: "Which other job should trigger this job when it finishes, and under which conditions.",
},
}

func DatasourceJob() *schema.Resource {
@@ -80,7 +107,11 @@ func DatasourceJob() *schema.Resource {
}
}

func datasourceJobRead(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
func datasourceJobRead(
ctx context.Context,
d *schema.ResourceData,
m interface{},
) diag.Diagnostics {
c := m.(*dbt_cloud.Client)

var diags diag.Diagnostics
@@ -132,6 +163,28 @@ func datasourceJobRead(ctx context.Context, d *schema.ResourceData, m interface{
return diag.FromErr(err)
}

if job.JobCompletionTrigger == nil {
if err := d.Set("job_completion_trigger_condition", nil); err != nil {
return diag.FromErr(err)
}
} else {
triggerCondition := job.JobCompletionTrigger.Condition
// we convert the statuses from ID to human-readable strings
statusesNames := lo.Map(triggerCondition.Statuses, func(status int, idx int) any {
return utils.JobCompletionTriggerConditionsMappingCodeHuman[status]
})
triggerConditionMap := map[string]any{
"job_id": triggerCondition.JobID,
"project_id": triggerCondition.ProjectID,
"statuses": statusesNames,
}
triggerConditionSet := utils.JobConditionMapToSet(triggerConditionMap)

if err := d.Set("job_completion_trigger_condition", triggerConditionSet); err != nil {
return diag.FromErr(err)
}
}

d.SetId(jobId)

return diags
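The conversion in `datasourceJobRead` relies on `utils.JobCompletionTriggerConditionsMappingCodeHuman`, which is not shown in this diff. Below is a minimal standalone sketch of the same code-to-name lookup, using a plain loop instead of `samber/lo`; the 10/20/30 codes are an assumption based on dbt Cloud's usual run status convention, not taken from this diff.

```go
package main

import "fmt"

// Hypothetical stand-in for utils.JobCompletionTriggerConditionsMappingCodeHuman.
// The 10 = success, 20 = error, 30 = canceled codes are assumed here.
var statusCodeToHuman = map[int]string{
	10: "success",
	20: "error",
	30: "canceled",
}

// humanizeStatuses converts API status codes to the human-readable strings
// exposed in the Terraform schema, skipping any unknown codes.
func humanizeStatuses(codes []int) []string {
	names := make([]string, 0, len(codes))
	for _, c := range codes {
		if name, ok := statusCodeToHuman[c]; ok {
			names = append(names, name)
		}
	}
	return names
}

func main() {
	fmt.Println(humanizeStatuses([]int{10, 30})) // prints [success canceled]
}
```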
5 changes: 5 additions & 0 deletions pkg/data_sources/job_acceptance_test.go
@@ -21,6 +21,11 @@ func TestDbtCloudJobDataSource(t *testing.T) {
resource.TestCheckResourceAttr("data.dbtcloud_job.test", "name", randomJobName),
resource.TestCheckResourceAttr("data.dbtcloud_job.test", "timeout_seconds", "180"),
resource.TestCheckResourceAttr("data.dbtcloud_job.test", "triggers_on_draft_pr", "false"),
resource.TestCheckResourceAttr(
"data.dbtcloud_job.test",
"job_completion_trigger_condition.#",
"0",
),
)

resource.ParallelTest(t, resource.TestCase{
62 changes: 44 additions & 18 deletions pkg/dbt_cloud/job.go
@@ -47,25 +47,36 @@ type JobExecution struct {
Timeout_Seconds int `json:"timeout_seconds"`
}

type JobCompletionTrigger struct {
Condition JobCompletionTriggerCondition `json:"condition"`
}

type JobCompletionTriggerCondition struct {
JobID int `json:"job_id"`
ProjectID int `json:"project_id"`
Statuses []int `json:"statuses"`
}

type Job struct {
ID *int `json:"id"`
Account_Id int `json:"account_id"`
Project_Id int `json:"project_id"`
Environment_Id int `json:"environment_id"`
Name string `json:"name"`
Description string `json:"description"`
Execute_Steps []string `json:"execute_steps"`
Dbt_Version *string `json:"dbt_version"`
Triggers JobTrigger `json:"triggers"`
Settings JobSettings `json:"settings"`
State int `json:"state"`
Generate_Docs bool `json:"generate_docs"`
Schedule JobSchedule `json:"schedule"`
Run_Generate_Sources bool `json:"run_generate_sources"`
Deferring_Job_Id *int `json:"deferring_job_definition_id"`
DeferringEnvironmentId *int `json:"deferring_environment_id"`
Execution JobExecution `json:"execution"`
TriggersOnDraftPR bool `json:"triggers_on_draft_pr"`
ID *int `json:"id"`
Account_Id int `json:"account_id"`
Project_Id int `json:"project_id"`
Environment_Id int `json:"environment_id"`
Name string `json:"name"`
Description string `json:"description"`
Execute_Steps []string `json:"execute_steps"`
Dbt_Version *string `json:"dbt_version"`
Triggers JobTrigger `json:"triggers"`
Settings JobSettings `json:"settings"`
State int `json:"state"`
Generate_Docs bool `json:"generate_docs"`
Schedule JobSchedule `json:"schedule"`
Run_Generate_Sources bool `json:"run_generate_sources"`
Deferring_Job_Id *int `json:"deferring_job_definition_id"`
DeferringEnvironmentId *int `json:"deferring_environment_id"`
Execution JobExecution `json:"execution"`
TriggersOnDraftPR bool `json:"triggers_on_draft_pr"`
JobCompletionTrigger *JobCompletionTrigger `json:"job_completion_trigger_condition"`
}

func (c *Client) GetJob(jobID string) (*Job, error) {
Expand Down Expand Up @@ -115,6 +126,7 @@ func (c *Client) CreateJob(
selfDeferring bool,
timeoutSeconds int,
triggersOnDraftPR bool,
jobCompletionTriggerCondition map[string]any,
) (*Job, error) {
state := STATE_ACTIVE
if !isActive {
@@ -176,6 +188,19 @@
Timeout_Seconds: timeoutSeconds,
}

jobCompletionTrigger := &JobCompletionTrigger{}
if len(jobCompletionTriggerCondition) == 0 {
jobCompletionTrigger = nil
} else {
jobCompletionTrigger = &JobCompletionTrigger{
Condition: JobCompletionTriggerCondition{
JobID: jobCompletionTriggerCondition["job_id"].(int),
ProjectID: jobCompletionTriggerCondition["project_id"].(int),
Statuses: jobCompletionTriggerCondition["statuses"].([]int),
},
}
}

newJob := Job{
Account_Id: c.AccountID,
Project_Id: projectId,
Expand All @@ -191,6 +216,7 @@ func (c *Client) CreateJob(
Run_Generate_Sources: runGenerateSources,
Execution: jobExecution,
TriggersOnDraftPR: triggersOnDraftPR,
JobCompletionTrigger: jobCompletionTrigger,
}
if dbtVersion != "" {
newJob.Dbt_Version = &dbtVersion
