Operators

call>: Calls another workflow

call>: operator calls another workflow.

This operator embeds another workflow as a subtask.

# workflow1.dig
+step1:
  call>: another_workflow.dig
+step2:
  call>: common/shared_workflow.dig
# another_workflow.dig
+another:
  sh>: ../scripts/my_script.sh
call>: FILE

Path to a workflow definition file. The file name must end with .dig. If the called workflow is in a subdirectory, the workflow uses that subdirectory as its working directory. For example, if a task has call>: common/called_workflow.dig, the called workflow should refer to the queries/data.sql file as ../queries/data.sql.

Example: another_workflow.dig
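
For example, a layout like the following (a sketch; the file names are placeholders) shows how a workflow called in a subdirectory refers back to a file at the project root:

# main.dig (project root)
+load:
  call>: common/called_workflow.dig

# common/called_workflow.dig
# the working directory here is common/, so queries/data.sql at the
# project root is referenced as ../queries/data.sql
+run_query:
  td>: ../queries/data.sql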

require>: Depends on another workflow

require>: operator runs another workflow. Unlike the call> operator, the workflow is skipped if it has already completed successfully for the session time.

This operator submits a new session to digdag.

# workflow1.dig
+step1:
  require>: another_workflow
# another_workflow.dig
+step2:
  sh>: tasks/step2.sh
require>: NAME

Name of a workflow.

Example: another_workflow

py>: Python scripts

py>: operator runs a Python script using the python command.

See the Python API documents for details, including how variables are mapped to keyword arguments.

+step1:
  py>: my_step1_method
+step2:
  py>: tasks.MyWorkflow.step2
py>: [PACKAGE.CLASS.]METHOD

Name of a method to run.

  • py>: tasks.MyWorkflow.my_task

rb>: Ruby scripts

rb>: operator runs a Ruby script using the ruby command.

See the Ruby API documents for details, including best practices for configuring the workflow using _export: require:.

_export:
  rb:
    require: tasks/my_workflow

+step1:
  rb>: my_step1_method
+step2:
  rb>: Task::MyWorkflow.step2
rb>: [MODULE::CLASS.]METHOD

Name of a method to run.

  • rb>: Task::MyWorkflow.my_task
require: FILE

Name of a file to require.

  • require: task/my_workflow

sh>: Shell scripts

sh>: operator runs a shell script.

Run a shell command (/bin/sh)

+step1:
  sh>: echo "hello world"

Run a shell script

+step1:
  sh>: tasks/step1.sh
+step2:
  sh>: tasks/step2.sh
sh>: COMMAND [ARGS...]

Name of the command to run.

  • sh>: tasks/workflow.sh --task1

The shell defaults to /bin/sh. If an alternate shell such as zsh is desired, use the shell option in the _export section.

_export:
  sh:
    shell: [/usr/bin/zsh]

loop>: Repeat tasks

loop>: operator runs subtasks multiple times.

This operator exports the ${i} variable to the subtasks. Its value starts at 0. For example, if the count is 3, the tasks run with i=0, i=1, and i=2.

(This operator is EXPERIMENTAL. Parameters may change in a future release)

+repeat:
  loop>: 7
  _do:
    +step1:
      sh>: echo ${new Date((session_unixtime + i * 60 * 60 * 24) * 1000).toLocaleDateString()} is ${i} days later than ${session_date}
    +step2:
      sh>: echo ${
            new Date((session_unixtime + i * 60 * 60) * 1000).toLocaleDateString()
            + " "
            + new Date((session_unixtime + i * 60 * 60) * 1000).toLocaleTimeString()
        } is ${i} hours later than ${session_local_time}
loop>: COUNT

Number of times to run the tasks.

  • loop>: 7
_parallel: BOOLEAN

Runs the repeating tasks in parallel.

  • _parallel: true
_do: TASKS
Tasks to run.
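
A minimal sketch that combines loop> with _parallel (the task names and echoed text are only illustrative):

+repeat_in_parallel:
  loop>: 3
  _parallel: true
  _do:
    +step:
      sh>: echo this is iteration ${i}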

for_each>: Repeat tasks

for_each>: operator runs subtasks multiple times using sets of variables.

(This operator is EXPERIMENTAL. Parameters may change in a future release)

+repeat:
  for_each>:
    fruit: [apple, orange]
    verb: [eat, throw]
  _do:
    sh>: echo ${verb} ${fruit}
    # this will generate 4 tasks:
    #  +for-fruit=apple&verb=eat:
    #    sh>: echo eat apple
    #  +for-fruit=apple&verb=throw:
    #    sh>: echo throw apple
    #  +for-fruit=orange&verb=eat:
    #    sh>: echo eat orange
    #  +for-fruit=orange&verb=throw:
    #    sh>: echo throw orange
for_each>: VARIABLES

Variables used for the loop in key: [value, value, ...] syntax. Variables can be an object or JSON string.

  • for_each>: {i: [1, 2, 3]}
  • or for_each>: {i: '[1, 2, 3]'}
_parallel: BOOLEAN

Runs the repeating tasks in parallel.

  • _parallel: true
_do: TASKS
Tasks to run.

if>: Conditional execution

if>: operator runs subtasks if true is given.

(This operator is EXPERIMENTAL. Parameters may change in a future release)

+run_if_param_is_true:
  if>: ${param}
  _do:
    sh>: echo ${param} == true
if>: BOOLEAN
true or false.
_do: TASKS
Tasks to run if true is given.

fail>: Make the workflow fail

fail>: operator always fails and makes the workflow fail.

(This operator is EXPERIMENTAL. Parameters may change in a future release)

This operator is useful in combination with the if> operator to validate the results of a previous task with the _check directive, so that the workflow fails when the validation doesn't pass.

+fail_if_too_few:
  if>: ${count < 10}
  _do:
    fail>: count is less than 10!
fail>: STRING
Message that the _error task can refer to using the ${error.message} syntax.
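
For example, extending the example above with a top-level _error task (a sketch; the task name and echoed text are placeholders) shows how the message can be read:

_error:
  +show_reason:
    echo>: the workflow failed: ${error.message}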

td>: Treasure Data queries

td>: operator runs a Hive or Presto query on Treasure Data.

_export:
  td:
    database: www_access

+step1:
  td>: queries/step1.sql
+step2:
  td>: queries/step2.sql
  create_table: mytable_${session_date_compact}
+step3:
  td>: queries/step2.sql
  insert_into: mytable

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when running Treasure Data queries.

Parameters

td>: FILE.sql

Path to a query template file. This file can contain ${...} syntax to embed variables.

  • td>: queries/step1.sql
create_table: NAME

Name of a table to create from the results. This option deletes the table if it already exists.

This option adds DROP TABLE IF EXISTS; CREATE TABLE AS (Presto) or INSERT OVERWRITE (Hive) commands before the SELECT statement. If the query includes a -- DIGDAG_INSERT_LINE line, the commands are inserted there.

  • create_table: my_table
insert_into: NAME

Name of a table to append results into. The table is created if it does not already exist.

This option adds an INSERT INTO (Presto) or INSERT INTO TABLE (Hive) command at the beginning of the SELECT statement. If the query includes a -- DIGDAG_INSERT_LINE line, the command is inserted at that line.

  • insert_into: my_table
download_file: NAME

Saves query result as a local CSV file.

  • download_file: output.csv
store_last_results: BOOLEAN

Stores the first row of the query results in the ${td.last_results} variable (default: false). td.last_results is a map of column names to values. To access a single value, you can use the ${td.last_results.my_count} syntax.

  • store_last_results: true
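
For example (a sketch; queries/count.sql and the my_count column are placeholders), a following task can read the stored value:

+count_rows:
  td>: queries/count.sql
  store_last_results: true

+show_count:
  echo>: my_count is ${td.last_results.my_count}
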
preview: BOOLEAN

Tries to show some of the query results so you can confirm the output of the query.

  • preview: true
result_url: NAME

Output the query results to the URL:

  • result_url: tableau://username:password@my.tableauserver.com/?mode=replace
database: NAME

Name of a database.

  • database: my_db
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).
engine: presto

Query engine (presto or hive).

  • engine: hive
  • engine: presto
priority: 0
Set priority, from -2 (VERY LOW) to 2 (VERY HIGH). Default: 0 (NORMAL).

Output parameters

td.last_job_id

The id of the job this task executed.

  • 52036074
td.last_results

The first row of the query results as a map. This is available only when store_last_results: true is set.

  • {"path":"/index.html","count":1}

td_run>: Treasure Data saved queries

td_run>: operator runs a query saved on Treasure Data.

_export:
  td:
    database: www_access

+step1:
  td_run>: myquery1
+step2:
  td_run>: myquery2
  session_time: 2016-01-01T01:01:01+0000

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when running Treasure Data queries.

Parameters

td_run>: NAME

Name of a saved query.

  • td_run>: my_query
download_file: NAME

Saves query result as a local CSV file.

  • download_file: output.csv
store_last_results: BOOLEAN

Stores the first row of the query results in the ${td.last_results} variable (default: false). td.last_results is a map of column names to values. To access a single value, you can use the ${td.last_results.my_count} syntax.

  • store_last_results: true
preview: BOOLEAN

Tries to show some of the query results so you can confirm the output of the query.

  • preview: true
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).

Output parameters

td.last_job_id

The id of the job this task executed.

  • 52036074
td.last_results

The first row of the query results as a map. This is available only when store_last_results: true is set.

  • {"path":"/index.html","count":1}

td_for_each>: Repeat using Treasure Data queries

td_for_each>: operator runs subtasks for each result row of a Hive or Presto query on Treasure Data.

Subtasks set in the _do section can reference results using the ${td.each.COLUMN_NAME} syntax, where COLUMN_NAME is a column name.

For example, if you run the query select email, name from users and it returns 3 rows, this operator runs the subtasks 3 times with the ${td.each.email} and ${td.each.name} parameters.

_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+for_each_users:
  td_for_each>: queries/users.sql
  _do:
    +show:
      echo>: found a user ${td.each.name} email ${td.each.email}

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when running Treasure Data queries.

Parameters

td_for_each>: FILE.sql

Path to a query template file. This file can contain ${...} syntax to embed variables.

  • td_for_each>: queries/users.sql
database: NAME

Name of a database.

  • database: my_db
apikey: APIKEY

API key. This must be set as a secret parameter.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).
engine: presto

Query engine (presto or hive).

  • engine: hive
  • engine: presto
priority: 0
Set priority, from -2 (VERY LOW) to 2 (VERY HIGH). Default: 0 (NORMAL).

Output parameters

td.last_job_id

The id of the job this task executed.

  • 52036074

td_wait_table>: Waits for data arriving at Treasure Data table

td_wait_table>: operator checks a table periodically until it has a certain number of records in a configured range. This is useful to delay execution of the following tasks until some records have been imported into a table.

_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+wait:
  td_wait_table>: target_table

+step1:
  td>: queries/use_records.sql

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when running Treasure Data queries.

Parameters

td_wait_table>: NAME

Name of a table.

  • td_wait_table>: target_table
rows: N

Number of rows to wait for (default: 0).

  • rows: 10
database: NAME

Name of a database.

  • database: my_db
apikey: APIKEY

API key. This must be set as a secret parameter.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).
engine: presto

Query engine (presto or hive).

  • engine: hive
  • engine: presto
priority: 0
Set priority, from -2 (VERY LOW) to 2 (VERY HIGH). Default: 0 (NORMAL).

td_wait>: Waits for data arriving at Treasure Data table

td_wait>: operator runs a query periodically until it returns true. This operator can use a more complex query than the td_wait_table>: operator.

_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+wait:
  td_wait>: queries/check_recent_record.sql

+step1:
  td>: queries/use_records.sql

Example queries:

select 1 from target_table where TD_TIME_RANGE(time, '${session_time}') limit 1

select count(*) > 1000 from target_table where TD_TIME_RANGE(time, '${last_session_time}')

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when running Treasure Data queries.

Parameters

td_wait>: FILE.sql

Path to a query template file. This file can contain ${...} syntax to embed variables.

  • td_wait>: queries/check_recent_record.sql
database: NAME

Name of a database.

  • database: my_db
apikey: APIKEY

API key. This must be set as a secret parameter.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).
engine: presto

Query engine (presto or hive).

  • engine: hive
  • engine: presto
priority: 0
Set priority, from -2 (VERY LOW) to 2 (VERY HIGH). Default: 0 (NORMAL).

Output parameters

td.last_job_id

The id of the job this task executed.

  • 52036074

td_load>: Treasure Data bulk loading

td_load>: operator loads data from storage, databases, or services into Treasure Data.

+step1:
  td_load>: config/guessed.yml
  database: prod
  table: raw

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when submitting Treasure Data bulk load jobs.

Parameters

td_load>: FILE.yml

Path to a YAML template file. This configuration needs to be guessed using the td command.

  • td_load>: imports/load.yml
database: NAME

Name of the database to load data into.

  • database: my_database
table: NAME

Name of the table to load data into.

  • table: my_table
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).

Output parameters

td.last_job_id

The id of the job this task executed.

  • 52036074

td_ddl>: Treasure Data operations

td_ddl> operator runs an operational task on Treasure Data.

_export:
  td:
    database: www_access

+step1:
  td_ddl>:
  create_tables: ["my_table_${session_date_compact}"]
+step2:
  td_ddl>:
  drop_tables: ["my_table_${session_date_compact}"]
+step3:
  td_ddl>:
  empty_tables: ["my_table_${session_date_compact}"]
+step4:
  td_ddl>:
  rename_tables: [{from: "my_table_${session_date_compact}", to: "my_table"}]

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when performing Treasure Data operations.

Parameters

create_tables: [ARRAY OF NAMES]

Create new tables if they do not exist.

  • create_tables: [my_table1, my_table2]
empty_tables: [ARRAY OF NAMES]

Create new tables (dropping them first if they already exist).

  • empty_tables: [my_table1, my_table2]
drop_tables: [ARRAY OF NAMES]

Drop tables if they exist.

  • drop_tables: [my_table1, my_table2]
rename_tables: [ARRAY OF {to:, from:}]

Rename a table to another name (overwriting the destination table if it already exists).

  • rename_tables: [{from: my_table1, to: my_table2}]
create_databases: [ARRAY OF NAMES]

Create new databases if they do not exist.

  • create_databases: [my_database1, my_database2]
empty_databases: [ARRAY OF NAMES]

Create new databases (dropping them first if they already exist).

  • empty_databases: [my_database1, my_database2]
drop_databases: [ARRAY OF NAMES]

Drop databases if they exist.

  • drop_databases: [my_database1, my_database2]
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).

td_partial_delete>: Delete a range of a Treasure Data table

td_partial_delete>: operator deletes records from a Treasure Data table.

Please be aware that records imported using streaming import can’t be deleted for several hours using td_partial_delete. Records imported by INSERT INTO, Data Connector, and bulk imports can be deleted immediately.

The time range must be aligned to hours. Setting non-zero values for minutes or seconds will be rejected.

_export:
  td:
    apikey: YOUR/API_KEY

+step1:
  td_partial_delete>:
  database: mydb
  table: mytable
  from: 2016-01-01 00:00:00 +0800
  to:   2016-02-01 00:00:00 +0800

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when running Treasure Data queries.

Parameters

database: NAME

Name of the database.

  • database: my_database
table: NAME

Name of the table to delete records from.

  • table: my_table
from: yyyy-MM-dd HH:mm:ss[ Z]

Delete records from this time (inclusive). Actual time range is [from, to). Value should be a UNIX timestamp integer (seconds) or string in yyyy-MM-dd HH:mm:ss[ Z] format.

  • from: 2016-01-01 00:00:00 +0800
to: yyyy-MM-dd HH:mm:ss[ Z]

Delete records to this time (exclusive). Actual time range is [from, to). Value should be a UNIX timestamp integer (seconds) or string in yyyy-MM-dd HH:mm:ss[ Z] format.

  • to: 2016-02-01 00:00:00 +0800
apikey: APIKEY

API key. This must be set as a secret parameter.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).

td_table_export>: Treasure Data table export to S3

td_table_export>: operator exports data from a Treasure Data table to Amazon S3.

+step1:
  td_table_export>:
  database: mydb
  table: mytable
  file_format: jsonl.gz
  from: 2016-01-01 00:00:00 +0800
  to:   2016-02-01 00:00:00 +0800
  s3_bucket: my_backup_bucket
  s3_path_prefix: mydb/mytable

Secrets

td.apikey: API_KEY
The Treasure Data API key to use when running Treasure Data table exports.
aws.s3.access_key_id: ACCESS_KEY_ID

The AWS Access Key ID to use when writing to S3.

  • aws.s3.access_key_id: ABCDEFGHJKLMNOPQRSTU
aws.s3.secret_access_key: SECRET_ACCESS_KEY

The AWS Secret Access Key to use when writing to S3.

  • aws.s3.secret_access_key: QUtJ/QUpJWTQ3UkhZTERNUExTUEEQUtJQUpJWTQ3

Parameters

database: NAME

Name of the database.

  • database: my_database
table: NAME

Name of the table to export.

  • table: my_table
file_format: TYPE

Output file format. Available formats are tsv.gz, jsonl.gz, json.gz, json-line.gz.

  • file_format: jsonl.gz
from: yyyy-MM-dd HH:mm:ss[ Z]

Export records from this time (inclusive). Actual time range is [from, to). Value should be a UNIX timestamp integer (seconds) or string in yyyy-MM-dd HH:mm:ss[ Z] format.

  • from: 2016-01-01 00:00:00 +0800
to: yyyy-MM-dd HH:mm:ss[ Z]

Export records to this time (exclusive). Actual time range is [from, to). Value should be a UNIX timestamp integer (seconds) or string in yyyy-MM-dd HH:mm:ss[ Z] format.

  • to: 2016-02-01 00:00:00 +0800
s3_bucket: NAME

S3 bucket name to export records to.

  • s3_bucket: my_backup_bucket
s3_path_prefix: NAME

S3 file name prefix.

  • s3_path_prefix: mydb/mytable
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enables SSL (https) access to the endpoint (default: true).

Output parameters

td.last_job_id

The id of the job this task executed.

  • 52036074

pg>: PostgreSQL operations

pg> operator runs queries and/or DDLs on PostgreSQL.

_export:
  pg:
    host: 192.0.2.1
    port: 5430
    database: production_db
    user: app_user
    ssl: true
    schema: myschema
    # strict_transaction: false

+replace_deduplicated_master_table:
  pg>: queries/dedup_master_table.sql
  create_table: dedup_master

+prepare_summary_table:
  pg>: queries/create_summary_table_ddl.sql

+insert_to_summary_table:
  pg>: queries/join_log_with_master.sql
  insert_into: summary_table

Secrets

pg.password: NAME
Optional user password to use when connecting to the postgres database.

Parameters

pg>: FILE.sql

Path of the query template file. This file can contain ${...} syntax to embed variables.

  • pg>: queries/complex_queries.sql
create_table: NAME

Table name to create from the results. This option deletes the table if it already exists.

This option adds DROP TABLE IF EXISTS; CREATE TABLE AS before the statements written in the query template file. Alternatively, a CREATE TABLE statement can be written in the query template file itself instead of using this option.

  • create_table: dest_table
insert_into: NAME

Table name to append results into.

This option adds INSERT INTO before the statements written in the query template file. Alternatively, an INSERT INTO statement can be written in the query template file itself instead of using this option.

  • insert_into: dest_table
download_file: NAME

Local CSV file name to be downloaded. The file contains the results of the query.

  • download_file: output.csv
database: NAME

Database name.

  • database: my_db
host: NAME

Hostname or IP address of the database.

  • host: db.foobar.com
port: NUMBER

Port number to connect to the database. Default: 5432.

  • port: 2345
user: NAME

User to connect to the database.

  • user: app_user
ssl: BOOLEAN

Enable SSL to connect to the database. Default: false.

  • ssl: true
schema: NAME

Default schema name. Default: public.

  • schema: my_schema
strict_transaction: BOOLEAN

Whether this operator uses a strict transaction to prevent generating unexpected duplicate records. Default: true. This operator creates and uses a status table in the database to make the operation idempotent. If creating a table isn't allowed, set this option to false.

  • strict_transaction: false
status_table_schema: NAME

Schema name of status table. Default: same as the value of schema option.

  • status_table_schema: writable_schema
status_table: NAME

Table name of status table. Default: __digdag_status.

  • status_table: customized_status_table
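
For example, if the connecting user is not allowed to create tables in the default schema, a sketch like the following (the schema and table names are placeholders) keeps the status table in a writable schema, or disables it entirely:

_export:
  pg:
    host: 192.0.2.1
    database: production_db
    user: app_user
    # create the status table in a schema where app_user has CREATE privileges
    status_table_schema: writable_schema
    status_table: my_digdag_status
    # or, if no writable schema is available, give up idempotency tracking:
    # strict_transaction: false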

redshift>: Redshift operations

redshift> operator runs queries and/or DDLs on Redshift.

_export:
  redshift:
    host: my-redshift.1234abcd.us-east-1.redshift.amazonaws.com
    # port: 5439
    database: production_db
    user: app_user
    ssl: true
    schema: myschema
    # strict_transaction: false

+replace_deduplicated_master_table:
  redshift>: queries/dedup_master_table.sql
  create_table: dedup_master

+prepare_summary_table:
  redshift>: queries/create_summary_table_ddl.sql

+insert_to_summary_table:
  redshift>: queries/join_log_with_master.sql
  insert_into: summary_table

Secrets

aws.redshift.password: NAME
Optional user password to use when connecting to the Redshift database.

Parameters

redshift>: FILE.sql

Path of the query template file. This file can contain ${...} syntax to embed variables.

  • redshift>: queries/complex_queries.sql
create_table: NAME

Table name to create from the results. This option deletes the table if it already exists.

This option adds DROP TABLE IF EXISTS; CREATE TABLE AS before the statements written in the query template file. Alternatively, a CREATE TABLE statement can be written in the query template file itself instead of using this option.

  • create_table: dest_table
insert_into: NAME

Table name to append results into.

This option adds INSERT INTO before the statements written in the query template file. Alternatively, an INSERT INTO statement can be written in the query template file itself instead of using this option.

  • insert_into: dest_table
download_file: NAME

Local CSV file name to be downloaded. The file contains the results of the query.

  • download_file: output.csv
database: NAME

Database name.

  • database: my_db
host: NAME

Hostname or IP address of the database.

  • host: db.foobar.com
port: NUMBER

Port number to connect to the database. Default: 5439.

  • port: 2345
user: NAME

User to connect to the database.

  • user: app_user
ssl: BOOLEAN

Enable SSL to connect to the database. Default: false.

  • ssl: true
schema: NAME

Default schema name. Default: public.

  • schema: my_schema
strict_transaction: BOOLEAN

Whether this operator uses a strict transaction to prevent generating unexpected duplicate records. Default: true. This operator creates and uses a status table in the database to make the operation idempotent. If creating a table isn't allowed, set this option to false.

  • strict_transaction: false
status_table_schema: NAME

Schema name of status table. Default: same as the value of schema option.

  • status_table_schema: writable_schema
status_table: NAME

Table name prefix of status table. Default: __digdag_status.

  • status_table: customized_status_table

redshift_load>: Redshift load operations

redshift_load> operator runs a COPY statement to load data from external storage into Redshift.

_export:
  redshift:
    host: my-redshift.1234abcd.us-east-1.redshift.amazonaws.com
    # port: 5439
    database: production_db
    user: app_user
    ssl: true
    # strict_transaction: false

+load_from_dynamodb_simple:
    redshift_load>:
    schema: myschema
    table: transactions
    from: dynamodb://transaction-table
    readratio: 123

+load_from_s3_with_many_options:
    redshift_load>:
    schema: myschema
    table: access_logs
    from: s3://my-app-bucket/access_logs/today
    column_list: host, path, referer, code, agent, size, method
    manifest: true
    encrypted: true
    region: us-east-1
    csv: "'"
    delimiter: "$"
    # json: s3://my-app-bucket/access_logs/jsonpathfile
    # avro: auto
    # fixedwidth: host:15,code:3,method:15
    gzip: true
    # bzip2: true
    # lzop: true
    acceptanydate: true
    acceptinvchars: "&"
    blanksasnull: true
    dateformat: yyyy-MM-dd
    emptyasnull: true
    encoding: UTF8
    escape: false
    explicit_ids: true
    fillrecord: true
    ignoreblanklines: true
    ignoreheader: 2
    null_as: nULl
    removequotes: false
    roundec: true
    timeformat: YYYY-MM-DD HH:MI:SS
    trimblanks: true
    truncatecolumns: true
    comprows: 12
    compupdate: ON
    maxerror: 34
    # noload: true
    statupdate: false
    role_session_name: federated_user
    session_duration: 1800
    # temp_credentials: false

Secrets

aws.redshift.password: NAME
Optional user password to use when connecting to the Redshift database.
aws.redshift_load.access_key_id, aws.redshift.access_key_id, aws.access_key_id
The AWS Access Key ID to use when accessing the data source. This value is used to get temporary security credentials by default. See the temp_credentials option for details.
aws.redshift_load.secret_access_key, aws.redshift.secret_access_key, aws.secret_access_key
The AWS Secret Access Key to use when accessing the data source. This value is used to get temporary security credentials by default. See the temp_credentials option for details.
aws.redshift_load.role_arn, aws.redshift.role_arn, aws.role_arn
Optional Amazon Resource Name (ARN) of a role used to copy data into Redshift. The role must allow the AssumeRole action to use this option. Requires temp_credentials to be true. If this option isn't specified, this operator tries to use a federated user.

Parameters

database: NAME

Database name.

  • database: my_db
host: NAME

Hostname or IP address of the database.

  • host: db.foobar.com
port: NUMBER

Port number to connect to the database. Default: 5439.

  • port: 2345
user: NAME

User to connect to the database.

  • user: app_user
ssl: BOOLEAN

Enable SSL to connect to the database. Default: false.

  • ssl: true
schema: NAME

Default schema name. Default: public.

  • schema: my_schema
strict_transaction: BOOLEAN

Whether this operator uses a strict transaction to prevent generating unexpected duplicate records. Default: true. This operator creates and uses a status table in the database to make the operation idempotent. If creating a table isn't allowed, set this option to false.

  • strict_transaction: false
status_table_schema: NAME

Schema name of status table. Default: same as the value of schema option.

  • status_table_schema: writable_schema
status_table: NAME

Table name prefix of status table. Default: __digdag_status.

  • status_table: customized_status_table
table: NAME

Name of the table in the Redshift database to load data into.

  • table: access_logs
from: URI

Parameter mapped to the FROM parameter of Redshift's COPY statement.

  • from: s3://my-app-bucket/access_logs/today
column_list: CSV

Parameter mapped to the COLUMN_LIST parameter of Redshift's COPY statement.

  • column_list: host, path, referer, code, agent, size, method
manifest: BOOLEAN

Parameter mapped to the MANIFEST parameter of Redshift's COPY statement.

  • manifest: true
encrypted: BOOLEAN

Parameter mapped to the ENCRYPTED parameter of Redshift's COPY statement.

  • encrypted: true
readratio: NUMBER

Parameter mapped to the READRATIO parameter of Redshift's COPY statement.

  • readratio: 150
region: NAME

Parameter mapped to the REGION parameter of Redshift's COPY statement.

  • region: us-east-1
csv: CHARACTER

Parameter mapped to the CSV parameter of Redshift's COPY statement. If you want to use the default quote character of the CSV parameter, set an empty string like csv: ''

  • csv: "'"
delimiter: CHARACTER

Parameter mapped to the DELIMITER parameter of Redshift's COPY statement.

  • delimiter: "$"
json: URI

Parameter mapped to the JSON parameter of Redshift's COPY statement.

  • json: auto
  • json: s3://my-app-bucket/access_logs/jsonpathfile
avro: URI

Parameter mapped to the AVRO parameter of Redshift's COPY statement.

  • avro: auto
  • avro: s3://my-app-bucket/access_logs/jsonpathfile
fixedwidth: CSV

Parameter mapped to the FIXEDWIDTH parameter of Redshift's COPY statement.

  • fixedwidth: host:15,code:3,method:15
gzip: BOOLEAN

Parameter mapped to the GZIP parameter of Redshift's COPY statement.

  • gzip: true
bzip2: BOOLEAN

Parameter mapped to the BZIP2 parameter of Redshift's COPY statement.

  • bzip2: true
lzop: BOOLEAN

Parameter mapped to the LZOP parameter of Redshift's COPY statement.

  • lzop: true
acceptanydate: BOOLEAN

Parameter mapped to the ACCEPTANYDATE parameter of Redshift's COPY statement.

  • acceptanydate: true
acceptinvchars: CHARACTER

Parameter mapped to the ACCEPTINVCHARS parameter of Redshift's COPY statement.

  • acceptinvchars: "&"
blanksasnull: BOOLEAN

Parameter mapped to the BLANKSASNULL parameter of Redshift's COPY statement.

  • blanksasnull: true
dateformat: STRING

Parameter mapped to the DATEFORMAT parameter of Redshift's COPY statement.

  • dateformat: yyyy-MM-dd
emptyasnull: BOOLEAN

Parameter mapped to the EMPTYASNULL parameter of Redshift's COPY statement.

  • emptyasnull: true
encoding: TYPE

Parameter mapped to the ENCODING parameter of Redshift's COPY statement.

  • encoding: UTF8
escape: BOOLEAN

Parameter mapped to the ESCAPE parameter of Redshift's COPY statement.

  • escape: false
explicit_ids: BOOLEAN

Parameter mapped to the EXPLICIT_IDS parameter of Redshift's COPY statement.

  • explicit_ids: true
fillrecord: BOOLEAN

Parameter mapped to the FILLRECORD parameter of Redshift's COPY statement.

  • fillrecord: true
ignoreblanklines: BOOLEAN

Parameter mapped to the IGNOREBLANKLINES parameter of Redshift's COPY statement.

  • ignoreblanklines: true
ignoreheader: NUMBER

Parameter mapped to the IGNOREHEADER parameter of Redshift's COPY statement.

  • ignoreheader: 2
null_as: STRING

Parameter mapped to the NULL AS parameter of Redshift's COPY statement.

  • null_as: nULl
removequotes: BOOLEAN

Parameter mapped to the REMOVEQUOTES parameter of Redshift's COPY statement.

  • removequotes: false
roundec: BOOLEAN

Parameter mapped to the ROUNDEC parameter of Redshift's COPY statement.

  • roundec: true
timeformat: STRING

Parameter mapped to the TIMEFORMAT parameter of Redshift's COPY statement.

  • timeformat: YYYY-MM-DD HH:MI:SS
trimblanks: BOOLEAN

Parameter mapped to the TRIMBLANKS parameter of Redshift's COPY statement.

  • trimblanks: true
truncatecolumns: BOOLEAN

Parameter mapped to the TRUNCATECOLUMNS parameter of Redshift's COPY statement.

  • truncatecolumns: true
comprows: NUMBER

Parameter mapped to the COMPROWS parameter of Redshift's COPY statement.

  • comprows: 12
compupdate: TYPE

Parameter mapped to the COMPUPDATE parameter of Redshift's COPY statement.

  • compupdate: ON
maxerror: NUMBER

Parameter mapped to the MAXERROR parameter of Redshift's COPY statement.

  • maxerror: 34
noload: BOOLEAN

Parameter mapped to the NOLOAD parameter of Redshift's COPY statement.

  • noload: true
statupdate: TYPE

Parameter mapped to the STATUPDATE parameter of Redshift's COPY statement.

  • statupdate: off
temp_credentials: BOOLEAN

Whether this operator uses temporary security credentials. Default: true. This operator tries to use temporary security credentials as follows:

  • If role_arn is specified, it calls the AssumeRole action
  • If not, it calls the GetFederationToken action

See details about AssumeRole and GetFederationToken in the documentation of AWS Security Token Service.

So either the AssumeRole or the GetFederationToken action is called to obtain temporary security credentials by default for secure operation. If this option is disabled, this operator uses the credentials set in the secrets as-is instead of temporary security credentials.

  • temp_credentials: false
session_duration: INTEGER

Session duration of the temporary security credentials. Default: 3 hours. This option isn't used when temp_credentials is disabled.

  • session_duration: 1800

redshift_unload>: Redshift unload operations

redshift_unload> operator runs an UNLOAD statement to export data from Redshift to external storage.

_export:
  redshift:
    host: my-redshift.1234abcd.us-east-1.redshift.amazonaws.com
    # port: 5439
    database: production_db
    user: app_user
    ssl: true
    schema: myschema
    # strict_transaction: false

+load_from_s3_with_many_options:
    redshift_unload>:
    query: select * from access_logs
    to: s3://my-app-bucket/access_logs/today
    manifest: true
    encrypted: true
    delimiter: "$"
    # fixedwidth: host:15,code:3,method:15
    gzip: true
    # bzip2: true
    null_as: nULl
    escape: false
    addquotes: true
    parallel: true

Secrets

aws.redshift.password: NAME
Optional user password to use when connecting to the Redshift database.
aws.redshift_unload.access_key_id, aws.redshift.access_key_id, aws.access_key_id
The AWS Access Key ID to use when accessing the data destination. This value is used to get temporary security credentials by default. See the temp_credentials option for details.
aws.redshift_unload.secret_access_key, aws.redshift.secret_access_key, aws.secret_access_key
The AWS Secret Access Key to use when accessing the data destination. This value is used to get temporary security credentials by default. See the temp_credentials option for details.
aws.redshift_load.role_arn, aws.redshift.role_arn, aws.role_arn
Optional Amazon Resource Name (ARN) of a role used when unloading data from Redshift. The role must allow the AssumeRole action to use this option. Requires temp_credentials to be true. If this option isn't specified, this operator tries to use a federated user.

Parameters

database: NAME

Database name.

  • database: my_db
host: NAME

Hostname or IP address of the database.

  • host: db.foobar.com
port: NUMBER

Port number to connect to the database. Default: 5439.

  • port: 2345
user: NAME

User to connect to the database.

  • user: app_user
ssl: BOOLEAN

Enable SSL to connect to the database. Default: false.

  • ssl: true
schema: NAME

Default schema name. Default: public.

  • schema: my_schema
strict_transaction: BOOLEAN

Whether this operator uses a strict transaction to prevent generating unexpected duplicate records. Default: true. This operator creates and uses a status table in the database to make the operation idempotent. If creating a table isn't allowed, set this option to false.

  • strict_transaction: false
status_table_schema: NAME

Schema name of status table. Default: same as the value of schema option.

  • status_table_schema: writable_schema
status_table: NAME

Table name prefix of status table. Default: __digdag_status.

  • status_table: customized_status_table
query: STRING

SELECT query. The results of the query are unloaded.

  • query: select * from access_logs
to: URI

Parameter mapped to the TO parameter of Redshift's UNLOAD statement.

  • to: s3://my-app-bucket/access_logs/today
manifest: BOOLEAN

Parameter mapped to the MANIFEST parameter of Redshift's UNLOAD statement.

  • manifest: true
encrypted: BOOLEAN

Parameter mapped to the ENCRYPTED parameter of Redshift's UNLOAD statement.

  • encrypted: true
allowoverwrite: BOOLEAN

Parameter mapped to the ALLOWOVERWRITE parameter of Redshift's UNLOAD statement.

  • allowoverwrite: true
delimiter: CHARACTER

Parameter mapped to the DELIMITER parameter of Redshift's UNLOAD statement.

  • delimiter: "$"
fixedwidth: CSV

Parameter mapped to the FIXEDWIDTH parameter of Redshift's UNLOAD statement.

  • fixedwidth: host:15,code:3,method:15
gzip: BOOLEAN

Parameter mapped to the GZIP parameter of Redshift's UNLOAD statement.

  • gzip: true
bzip2: BOOLEAN

Parameter mapped to the BZIP2 parameter of Redshift's UNLOAD statement.

  • bzip2: true
null_as: STRING

Parameter mapped to the NULL_AS parameter of Redshift's UNLOAD statement.

  • null_as: nuLL
escape: BOOLEAN

Parameter mapped to the ESCAPE parameter of Redshift's UNLOAD statement.

  • escape: true
addquotes: BOOLEAN

Parameter mapped to the ADDQUOTES parameter of Redshift's UNLOAD statement.

  • addquotes: true
parallel: TYPE

Parameter mapped to the PARALLEL parameter of Redshift's UNLOAD statement.

  • parallel: ON
temp_credentials: BOOLEAN

Whether this operator uses temporary security credentials. Default: true. This operator tries to use temporary security credentials as follows:

  • If role_arn is specified, it calls the AssumeRole action
  • If not, it calls the GetFederationToken action

See details about AssumeRole and GetFederationToken in the documentation of AWS Security Token Service.

So either the AssumeRole or the GetFederationToken action is called to obtain temporary security credentials by default for secure operation. If this option is disabled, this operator uses the credentials set in the secrets as-is instead of temporary security credentials.

  • temp_credentials: false
session_duration: INTEGER

Session duration of the temporary security credentials. Default: 3 hours. This option isn't used when temp_credentials is disabled.

  • session_duration: 1800

mail>: Sending email

mail>: operator sends an email.

To use Gmail SMTP server, you need to do either of:

  1. Generate a new app password at App passwords. This requires enabling 2-Step Verification first.
  2. Enable access for less secure apps at Less secure apps. This works even if 2-Step Verification is not enabled.
_export:
  mail:
    from: "[email protected]"

+step1:
  mail>: body.txt
  subject: workflow started
  to: [me@example.com]

+step2:
  mail>:
    data: this is email body embedded in a .dig file
  subject: workflow started
  to: [me@example.com]

+step3:
  sh>: this_task_might_fail.sh
  _error:
    mail>: body.txt
    subject: this workflow failed
    to: [me@example.com]

Secrets

mail.host: HOST

SMTP host name.

  • mail.host: smtp.gmail.com
mail.port: PORT

SMTP port number.

  • mail.port: 587
mail.username: NAME

SMTP login username.

  • mail.username: me
mail.password: PASSWORD

SMTP login password.

  • mail.password: MyPaSsWoRd
mail.tls: BOOLEAN

Enables TLS handshake.

  • mail.tls: true
mail.ssl: BOOLEAN

Enables legacy SSL encryption.

  • mail.ssl: false

Parameters

mail>: FILE

Path to a mail body template file. This file can contain ${...} syntax to embed variables. Alternatively, you can set {data: TEXT} to embed body text in the .dig file.

  • mail>: mail_body.txt
  • or mail>: {data: Hello, this is from Digdag}
subject: SUBJECT

Subject of the email.

  • subject: Mail From Digdag
to: [ADDR1, ADDR2, ...]

To addresses.

  • to: [analyst@example.com]
from: ADDR

From address.

  • from: admin@example.com
host: NAME

SMTP host name.

  • host: smtp.gmail.com
port: PORT

SMTP port number.

  • port: 587
username: NAME

SMTP login username.

  • username: me
tls: BOOLEAN

Enables TLS handshake.

  • tls: true
ssl: BOOLEAN

Enables legacy SSL encryption.

  • ssl: false
html: BOOLEAN

Uses HTML mail (default: false).

  • html: true
debug: BOOLEAN

Shows debug logs (default: false).

  • debug: false
attach_files: ARRAY

Attach files. Each element is an object of:

  • path: FILE: Path to a file to attach.
  • content_type: Content-Type of this file. Default is application/octet-stream.
  • filename: Name of this file. Default is base name of the path.

Example:

attach_files:
  - path: data.csv
  - path: output.dat
    filename: workflow_result_data.csv
  - path: images/image1.png
    content_type: image/png

embulk>: Embulk data transfer

embulk>: operator runs Embulk to transfer data across storage systems, including local files.

+load:
  embulk>: data/load.yml
embulk>: FILE.yml

Path to a configuration template file.

  • embulk>: embulk/mysql_to_csv.yml

s3_wait>: Wait for a file in Amazon S3

The s3_wait>: operator waits for a file to appear in Amazon S3.

+wait:
  s3_wait>: my-bucket/my-key

Secrets

aws.s3.access_key_id, aws.access_key_id
The AWS Access Key ID to use when accessing S3.
aws.s3.secret_access_key, aws.secret_access_key
The AWS Secret Access Key to use when accessing S3.
aws.s3.region, aws.region
An optional explicit AWS Region in which to access S3.
aws.s3.endpoint
An optional explicit API endpoint to use when accessing S3. This overrides the region secret.
aws.s3.sse_c_key
An optional Customer-Provided Server-Side Encryption (SSE-C) key to use when accessing S3. Must be Base64 encoded.
aws.s3.sse_c_key_algorithm
An optional Customer-Provided Server-Side Encryption (SSE-C) key algorithm to use when accessing S3.
aws.s3.sse_c_key_md5
An optional MD5 digest of the Customer-Provided Server-Side Encryption (SSE-C) key to use when accessing S3. Must be Base64 encoded.

For more information about SSE-C, see the AWS S3 documentation.

Parameters

s3_wait>: BUCKET/KEY

Path to the file in Amazon S3 to wait for.

  • s3_wait>: my-bucket/my-data.gz
  • s3_wait>: my-bucket/file/in/a/directory
region: REGION
An optional explicit AWS Region in which to access S3. This may also be specified using the aws.s3.region secret.
endpoint: ENDPOINT
An optional explicit API endpoint to use when accessing S3. This may also be specified using the aws.s3.endpoint secret. Note: This will override the region parameter.
bucket: BUCKET
The S3 bucket where the file is located. Can be used together with the key parameter instead of putting the path on the operator line.
key: KEY
The S3 key of the file. Can be used together with the bucket parameter instead of putting the path on the operator line.
version_id: VERSION_ID
An optional object version to check for.
path_style_access: true/false
An optional flag to control whether to use path-style or virtual hosted-style access when accessing S3. Note: Enabling path_style_access also requires specifying a region.
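
For example, the same wait can be written with explicit parameters instead of putting the path on the operator line (a sketch; the bucket, key, and region are placeholders, and the empty operator line follows the pattern used by other operators in this document):

+wait:
  s3_wait>:
  bucket: my-bucket
  key: data/2016-01-01/file.csv
  region: us-east-1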

Output Parameters

s3.last_object

Information about the detected file.

{
  "metadata": {
    "Accept-Ranges": "bytes",
    "Access-Control-Allow-Origin": "*",
    "Content-Length": 4711,
    "Content-Type": "application/octet-stream",
    "ETag": "5eb63bbbe01eeed093cb22bb8f5acdc3",
    "Last-Modified": 1474360744000,
    "Last-Ranges": "bytes"
  },
  "user_metadata": {
    "foo": "bar",
    "baz": "quux"
  }
}

Note

The s3_wait>: operator makes use of polling with exponential backoff. As such there might be some time interval between a file being created and the s3_wait>: operator detecting it.

bq>: Running Google BigQuery queries

The bq>: operator can be used to run a query on Google BigQuery.

_export:
  bq:
    dataset: my_dataset

+step1:
  bq>: queries/step1.sql
+step2:
  bq>: queries/step2.sql
  destination_table: result_table
+step3:
  bq>: queries/step3.sql
  destination_table: other_project:other_dataset.other_table

Note

The bq>: operator uses standard SQL by default, whereas the default in the BigQuery console is legacy SQL. To run legacy SQL queries, please set use_legacy_sql: true. For more information about standard SQL on BigQuery, see Migrating from legacy SQL.
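
For example, a task that runs a query written in legacy SQL might look like this (a sketch; the query file name is a placeholder):

+legacy_step:
  bq>: queries/legacy_step.sql
  use_legacy_sql: true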

Secrets

gcp.credential: CREDENTIAL

The Google Cloud Platform account credential private key to use, in JSON format.

For information on how to generate a service account key, see the Google Cloud Platform Documentation.

Upload the private key JSON file to the digdag server using the secrets client command:

digdag secrets --project my_project --set gcp.credential=@credential.json

Parameters

bq>: query.sql

Path to a query template file. This file can contain ${...} syntax to embed variables.

  • bq>: queries/step1.sql
dataset: NAME

Specifies the default dataset to use in the query and in the destination_table parameter.

  • dataset: my_dataset
  • dataset: other_project:other_dataset
destination_table: NAME

Specifies a table to store the query results in.

  • destination_table: my_result_table
  • destination_table: some_dataset.some_table
  • destination_table: some_project:some_dataset.some_table
create_disposition: CREATE_IF_NEEDED | CREATE_NEVER

Specifies whether the destination table should be automatically created when executing the query.

  • CREATE_IF_NEEDED: (default) The destination table is created if it does not already exist.
  • CREATE_NEVER: The destination table must already exist, otherwise the query will fail.

Examples:

  • create_disposition: CREATE_IF_NEEDED
  • create_disposition: CREATE_NEVER
write_disposition: WRITE_TRUNCATE | WRITE_APPEND | WRITE_EMPTY

Specifies whether to permit writing of data to an already existing destination table.

  • WRITE_TRUNCATE: If the destination table already exists, any data in it will be overwritten.
  • WRITE_APPEND: If the destination table already exists, any data in it will be appended to.
  • WRITE_EMPTY: (default) The query fails if the destination table already exists and is not empty.

Examples:

  • write_disposition: WRITE_TRUNCATE
  • write_disposition: WRITE_APPEND
  • write_disposition: WRITE_EMPTY
priority: INTERACTIVE | BATCH
Specifies the priority to use for this query. Default: INTERACTIVE.
use_query_cache: BOOLEAN
Whether to use BigQuery query result caching. Default: true.
allow_large_results: BOOLEAN
Whether to allow arbitrarily large result tables. Requires destination_table to be set and use_legacy_sql to be true.
flatten_results: BOOLEAN
Whether to flatten nested and repeated fields in the query results. Default: true. Requires use_legacy_sql to be true.
use_legacy_sql: BOOLEAN
Whether to use legacy BigQuery SQL. Default: false.
maximum_billing_tier: INTEGER
Limit the billing tier for this query. Default: The project default.
table_definitions: OBJECT
Describes external data sources that are accessed in the query. For more information see BigQuery documentation.
user_defined_function_resources: LIST
Describes user-defined function resources used in the query. For more information see BigQuery documentation.

Output parameters

bq.last_job_id
The id of the BigQuery job that executed this query.

bq_ddl>: Managing Google BigQuery Datasets and Tables

The bq_ddl>: operator can be used to create, delete and clear Google BigQuery Datasets and Tables.

_export:
  bq:
    dataset: my_dataset

+prepare:
  bq_ddl>:
    create_datasets:
      - my_dataset_${session_date_compact}
    empty_datasets:
      - my_dataset_${session_date_compact}
    delete_datasets:
      - my_dataset_${last_session_date_compact}
    create_tables:
      - my_table_${session_date_compact}
    empty_tables:
      - my_table_${session_date_compact}
    delete_tables:
      - my_table_${last_session_date_compact}

Secrets

gcp.credential: CREDENTIAL
See gcp.credential in the bq> operator section.

Parameters

create_datasets: LIST

Create new datasets.

For detailed information about dataset configuration parameters, see the Google BigQuery Datasets Documentation.

Examples:

create_datasets:
  - foo
  - other_project:bar
create_datasets:
  - foo_dataset_${session_date_compact}
  - id: bar_dataset_${session_date_compact}
    project: other_project
    friendly_name: Bar dataset ${session_date_compact}
    description: Bar dataset for ${session_date}
    default_table_expiration: 7d
    location: EU
    labels:
      foo: bar
      quux: 17
    access:
      - domain: example.com
        role: READER
      - userByEmail: someone@example.com
        role: WRITER
      - groupByEmail: admins@example.com
        role: OWNER
empty_datasets: LIST

Create new datasets, deleting them first if they already exist. Any tables in the datasets will also be deleted.

For detailed information about dataset configuration parameters, see the Google BigQuery Datasets Documentation.

Examples:

empty_datasets:
  - foo
  - other_project:bar
empty_datasets:
  - foo_dataset_${session_date_compact}
  - id: bar_dataset_${session_date_compact}
    project: other_project
    friendly_name: Bar dataset ${session_date_compact}
    description: Bar dataset for ${session_date}
    default_table_expiration: 7d
    location: EU
    labels:
      foo: bar
      quux: 17
    access:
      - domain: example.com
        role: READER
      - userByEmail: someone@example.com
        role: WRITER
      - groupByEmail: admins@example.com
        role: OWNER
delete_datasets: LIST

Delete datasets, if they exist.

Examples:

delete_datasets:
  - foo
  - other_project:bar
delete_datasets:
  - foo_dataset_${last_session_date_compact}
  - other_project:bar_dataset_${last_session_date_compact}
create_tables: LIST

Create new tables.

For detailed information about table configuration parameters, see the Google BigQuery Tables Documentation.

Examples:

create_tables:
  - foo
  - other_dataset.bar
  - other_project:yet_another_dataset.baz
create_tables:
  - foo_dataset_${session_date_compact}
  - id: bar_dataset_${session_date_compact}
    project: other_project
    dataset: other_dataset
    friendly_name: Bar dataset ${session_date_compact}
    description: Bar dataset for ${session_date}
    expiration_time: 2016-11-01T01:02:03Z
    schema:
      fields:
        - {name: foo, type: STRING}
        - {name: bar, type: INTEGER}
    labels:
      foo: bar
      quux: 17
    access:
      - domain: example.com
        role: READER
      - userByEmail: someone@example.com
        role: WRITER
      - groupByEmail: admins@example.com
        role: OWNER
empty_tables: LIST

Create new tables, deleting them first if they already exist.

For detailed information about table configuration parameters, see the Google BigQuery Tables Documentation.

Examples:

empty_tables:
  - foo
  - other_dataset.bar
  - other_project:yet_another_dataset.baz
empty_tables:
  - foo_table_${session_date_compact}
  - id: bar_table_${session_date_compact}
    project: other_project
    dataset: other_dataset
    friendly_name: Bar dataset ${session_date_compact}
    description: Bar dataset for ${session_date}
    expiration_time: 2016-11-01T01:02:03Z
    schema:
      fields:
        - {name: foo, type: STRING}
        - {name: bar, type: INTEGER}
    labels:
      foo: bar
      quux: 17
    access:
      - domain: example.com
        role: READER
      - userByEmail: someone@example.com
        role: WRITER
      - groupByEmail: admins@example.com
        role: OWNER
delete_tables: LIST

Delete tables, if they exist.

Examples:

delete_tables:
  - foo
  - other_dataset.bar
  - other_project:yet_another_dataset.baz
delete_tables:
  - foo_table_${last_session_date_compact}
  - bar_table_${last_session_date_compact}

bq_extract>: Exporting Data from Google BigQuery

The bq_extract>: operator can be used to export data from Google BigQuery tables.

_export:
  bq:
    dataset: my_dataset

+process:
  bq>: queries/analyze.sql
  destination_table: result

+export:
  bq_extract>: result
  destination: gs://my_bucket/result.csv.gz
  compression: GZIP

Secrets

gcp.credential: CREDENTIAL
See gcp.credential in the bq> operator section.

Parameters

bq_extract>: TABLE

A reference to the table that should be exported.

  • bq_extract>: my_table
  • bq_extract>: my_dataset.my_table
  • bq_extract>: my_project:my_dataset.my_table
destination: URI | LIST

A URI or list of URIs with the location of the destination export files. These must be Google Cloud Storage URIs.

Examples:

destination: gs://my_bucket/my_export.csv
destination:
  - gs://my_bucket/my_export_1.csv
  - gs://my_bucket/my_export_2.csv
print_header: BOOLEAN
Whether to print out a header row in the results. Default: true.
field_delimiter: CHARACTER

A delimiter to use between fields in the output. Default: ,.

  • field_delimiter: '\t'
destination_format: CSV | NEWLINE_DELIMITED_JSON | AVRO

The format of the destination export file. Default: CSV.

  • destination_format: CSV
  • destination_format: NEWLINE_DELIMITED_JSON
  • destination_format: AVRO
compression: GZIP | NONE

The compression to use for the export file. Default: NONE.

  • compression: NONE
  • compression: GZIP

Output parameters

bq.last_job_id
The id of the BigQuery job that performed this export.

bq_load>: Importing Data into Google BigQuery

The bq_load>: operator can be used to import data into Google BigQuery tables.

_export:
  bq:
    dataset: my_dataset

+ingest:
  bq_load>: gs://my_bucket/data.csv
  destination_table: my_data

+process:
  bq>: queries/process.sql
  destination_table: my_result

Secrets

gcp.credential: CREDENTIAL
See gcp.credential in the bq> operator section.

Parameters

bq_load>: URI | LIST

A URI or list of URIs identifying files in GCS to import.

Examples:

bq_load>: gs://my_bucket/data.csv
bq_load>:
  - gs://my_bucket/data1.csv.gz
  - gs://my_bucket/data2_*.csv.gz
dataset: NAME

The dataset that the destination table is located in or should be created in. Can also be specified directly in the table reference.

  • dataset: my_dataset
  • dataset: my_project:my_dataset
destination_table: NAME

The table to store the imported data in.

  • destination_table: my_result_table
  • destination_table: some_dataset.some_table
  • destination_table: some_project:some_dataset.some_table
project: NAME
The project that the table is located in or should be created in. Can also be specified directly in the table reference or the dataset parameter.
source_format: CSV | NEWLINE_DELIMITED_JSON | AVRO | DATASTORE_BACKUP

The format of the files to be imported. Default: CSV.

  • source_format: CSV
  • source_format: NEWLINE_DELIMITED_JSON
  • source_format: AVRO
  • source_format: DATASTORE_BACKUP
field_delimiter: CHARACTER

The separator used between fields in CSV files to be imported. Default: ,.

  • field_delimiter: '\t'
create_disposition: CREATE_IF_NEEDED | CREATE_NEVER

Specifies whether the destination table should be automatically created when performing the import.

  • CREATE_IF_NEEDED: (default) The destination table is created if it does not already exist.
  • CREATE_NEVER: The destination table must already exist, otherwise the import will fail.

Examples:

  • create_disposition: CREATE_IF_NEEDED
  • create_disposition: CREATE_NEVER
write_disposition: WRITE_TRUNCATE | WRITE_APPEND | WRITE_EMPTY

Specifies whether to permit importing data to an already existing destination table.

  • WRITE_TRUNCATE: If the destination table already exists, any data in it will be overwritten.
  • WRITE_APPEND: If the destination table already exists, any data in it will be appended to.
  • WRITE_EMPTY: (default) The import fails if the destination table already exists and is not empty.

Examples:

  • write_disposition: WRITE_TRUNCATE
  • write_disposition: WRITE_APPEND
  • write_disposition: WRITE_EMPTY
skip_leading_rows: INTEGER

The number of leading rows to skip in CSV files to import. Default: 0.

  • skip_leading_rows: 1
encoding: UTF-8 | ISO-8859-1

The character encoding of the data in the files to import. Default: UTF-8.

  • encoding: ISO-8859-1
quote: CHARACTER

The quote character of the data in the files to import. Default: '"'.

  • quote: ''
  • quote: "'"
max_bad_records: INTEGER

The maximum number of bad records to ignore before failing the import. Default: 0.

  • max_bad_records: 100
allow_quoted_newlines: BOOLEAN
Whether to allow quoted data sections that contain newline characters in a CSV file. Default: false.
allow_jagged_rows: BOOLEAN
Whether to accept rows that are missing trailing optional columns in CSV files. Default: false.
ignore_unknown_values: BOOLEAN
Whether to ignore extra values in data that are not represented in the table schema. Default: false.
projection_fields: LIST
A list of names of Cloud Datastore entity properties to load. Requires source_format: DATASTORE_BACKUP.
autodetect: BOOLEAN
Whether to automatically infer options and schema for CSV and JSON sources. Default: false.
schema_update_options: LIST

A list of destination table schema updates that may be automatically performed when performing the import.

schema_update_options:
  - ALLOW_FIELD_ADDITION
  - ALLOW_FIELD_RELAXATION
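
A sketch that combines several of the parameters above (the bucket, dataset, and table names are placeholders):

+ingest_csv:
  bq_load>: gs://my_bucket/data/*.csv.gz
  destination_table: my_dataset.my_data
  source_format: CSV
  skip_leading_rows: 1
  write_disposition: WRITE_APPEND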

Output parameters

bq.last_job_id
The id of the BigQuery job that performed this import.

gcs_wait>: Wait for a file in Google Cloud Storage

The gcs_wait>: operator can be used to wait for a file to appear in Google Cloud Storage.

+wait:
  gcs_wait>: my_bucket/some/file

+wait:
  gcs_wait>: gs://my_bucket/some/file

Secrets

gcp.credential: CREDENTIAL
See gcp.credential in the bq> operator section.

Parameters

gcs_wait>: URI | BUCKET/OBJECT

Google Cloud Storage URI or path of the file to wait for.

  • gcs_wait>: my-bucket/my-directory/my-data.gz
  • gcs_wait>: gs://my-bucket/my-directory/my-data.gz
bucket: NAME
The GCS bucket where the file is located. Can be used together with the object parameter instead of putting the path on the operator command line.
object: PATH
The GCS path of the file. Can be used together with the bucket parameter instead of putting the path on the operator command line.
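
A sketch of the bucket/object form (the bucket name and object path are placeholders; leaving the operator line without a value is assumed here):

+wait:
  gcs_wait>:
  bucket: my_bucket
  object: some/file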

Output parameters

gcs_wait.last_object

Information about the detected file.

{
    "metadata": {
        "bucket": "my_bucket",
        "contentType": "text/plain",
        "crc32c": "yV/Pdw==",
        "etag": "CKjJ6/H4988CEAE=",
        "generation": 1477466841081000,
        "id": "my_bucket/some/file",
        "kind": "storage#object",
        "md5Hash": "IT4zYwc3D23HpSGe3nZ85A==",
        "mediaLink": "https://www.googleapis.com/download/storage/v1/b/my_bucket/o/some%2Ffile?generation=1477466841081000&alt=media",
        "metageneration": 1,
        "name": "some/file",
        "selfLink": "https://www.googleapis.com/storage/v1/b/my_bucket/o/some%2Ffile",
        "size": 4711,
        "storageClass": "STANDARD",
        "timeCreated": {
            "value": 1477466841070,
            "dateOnly": false,
            "timeZoneShift": 0
        },
        "updated": {
            "value": 1477466841070,
            "dateOnly": false,
            "timeZoneShift": 0
        }
    }
}
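
A sketch of referencing the detected object in a following task; the nested field access mirrors the JSON shape above, and the echo command is illustrative only:

+wait:
  gcs_wait>: gs://my_bucket/some/file

+report:
  # read fields of the detected object from gcs_wait.last_object
  sh>: echo "detected ${gcs_wait.last_object.metadata.name} (${gcs_wait.last_object.metadata.size} bytes)"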

Note

The gcs_wait>: operator uses polling with exponential backoff, so there may be some delay between a file being created and the gcs_wait>: operator detecting it.

http>: Making HTTP requests

The http>: operator can be used to make HTTP requests.

+fetch:
  http>: https://api.example.com/foobars
  store_content: true

+process:
  for_each>:
    foobar: ${http.last_content}
  _do:
    bq>: query.sql

+notify:
  http>: https://api.example.com/data/sessions/${session_uuid}
  method: POST
  content:
    status: RUNNING
    time: ${session_time}

Secrets

http.authorization: STRING
A string that should be included in the HTTP request as the value of the Authorization header. This can be used to authenticate using, for example, OAuth bearer tokens.
http.user: STRING
A user that should be used to authenticate using Basic Authentication.
http.password: STRING
A password that should be used to authenticate using Basic Authentication.
http.uri: URI
The URI of the HTTP request. This can be used instead of putting the URI on the operator command line in case the URI contains sensitive information.

Parameters

http>: URI

The URI of the HTTP request.

  • http>: https://api.example.com/foobar
  • http>: https://api.example.com/data/sessions/${session_uuid}
method: STRING

The method of the HTTP request. Default: GET.

  • method: POST
  • method: DELETE
content: STRING | INTEGER | BOOLEAN | OBJECT | ARRAY

The content of the HTTP request. Default: No content.

Scalars (i.e. strings, integers, booleans, etc.) will by default be sent as plain text. Objects and arrays will by default be JSON serialized. The content_format parameter can be used to control the content serialization format.

content: 'hello world'

content: '${session_time}'

content:
  status: RUNNING
  time: ${session_time}
content_format: text | json | form

The serialization format of the content of the HTTP request. Default: Inferred from the content parameter value type. Objects and arrays use json by default. Other value types default to text.

  • text: Send raw content as Content-Type: text/plain. Note: This requires that the content parameter is _not_ an array or an object.
  • json: Serialize the content as JSON and send it as Content-Type: application/json. This format can handle any content parameter value type.
  • form: Encode content as an HTML form and send it as Content-Type: application/x-www-form-urlencoded. Note: This requires the content parameter value to be an object.
content: 'hello world @ ${session_time}'
content_format: text

content:
  status: RUNNING
  time: ${session_time}
content_format: json

content:
  status: RUNNING
  time: ${session_time}
content_format: form
content_type: STRING

Override the inferred Content-Type header.

content: |
  <?xml version="1.0" encoding="UTF-8"?>
  <notification>
    <status>RUNNING</status>
    <time>${session_time}</time>
  </notification>
content_format: text
content_type: application/xml
store_content: BOOLEAN
Whether to store the content of the response. Default: false.
headers: LIST OF KEY-VALUE PAIRS

Additional custom headers to send with the HTTP request.

headers:
  - Accept: application/json
  - X-Foo: bar
  - Baz: quux
retry: BOOLEAN

Whether to retry ephemeral errors. Default: true if the request method is GET, HEAD, OPTIONS or TRACE. Otherwise false.

Client 4xx errors (except for 408 Request Timeout and 429 Too Many Requests) will not be retried even if retry is set to true.

Note: Enabling retries might cause the target endpoint to receive multiple duplicate HTTP requests. Thus retries should only be enabled if duplicate requests are tolerable, e.g. when the outcome of the HTTP request is idempotent.
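
A sketch of explicitly enabling retries for a non-GET request that is assumed to be idempotent (the endpoint and payload are hypothetical):

+update:
  http>: https://api.example.com/foobars/42
  method: PUT
  content:
    status: DONE
  retry: true   # safe only because this PUT is assumed idempotent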

emr>: Amazon Elastic Map Reduce

The emr>: operator can be used to run EMR jobs, create clusters and submit steps to existing clusters.

For detailed information about EMR, see the Amazon Elastic MapReduce Documentation.

+emr_job:
  emr>:
  cluster:
    name: my-cluster
    ec2:
      key: my-ec2-key
      master:
        type: m3.2xlarge
      core:
        type: m3.xlarge
        count: 10
    logs: s3://my-bucket/logs/
  staging: s3://my-bucket/staging/
  steps:
    - type: spark
      application: pi.py
    - type: spark-sql
      query: queries/query.sql
      result: s3://my-bucket/results/${session_uuid}/
    - type: script
      script: scripts/hello.sh
      args: [hello, world]

Secrets

aws.emr.access_key_id, aws.access_key_id
The AWS Access Key ID to use when submitting EMR jobs.
aws.emr.secret_access_key, aws.secret_access_key
The AWS Secret Access Key to use when submitting EMR jobs.
aws.emr.role_arn, aws.role_arn
The AWS Role to assume when submitting EMR jobs.

Parameters

cluster: STRING | OBJECT

Specifies either the ID of an existing cluster to submit steps to or the configuration of a new cluster to create.

Using an existing cluster:

cluster: j-7KHU3VCWGNAFL

Creating a new minimal ephemeral cluster with just one node:

cluster:
  ec2:
    key: my-ec2-key
  logs: s3://my-bucket/logs/

Creating a customized cluster with several hosts:

cluster:
  name: my-cluster
  auto_terminate: false
  release: emr-5.2.0
  applications:
    - hadoop
    - spark
    - hue
    - zookeeper
  ec2:
    key: my-ec2-key
    subnet_id: subnet-83047402b
    master:
      type: m4.2xlarge
    core:
      type: m4.xlarge
      count: 10
      ebs:
        optimized: true
        devices:
          volume_specification:
            iops: 10000
            size_in_gb: 1000
            type: gp2
          volumes_per_instance: 6
    task:
      - type: c4.4xlarge
        count: 20
      - type: g2.2xlarge
        count: 6
  logs: s3://my-bucket/logs/
  bootstrap:
    - install_foo.sh
    - name: Install Bar
      path: install_bar.sh
      args: [baz, quux]
staging: S3_URI

An S3 folder to use for staging local files for execution on the EMR cluster. Note: the configured AWS credentials must have permission to put and get objects in this folder.

  • staging: s3://my-bucket/staging/
steps: LIST

A list of steps to submit to the EMR cluster.

steps:
  - type: flink
    application: flink/WordCount.jar

  - type: hive
    script: queries/hive-query.q
    vars:
      INPUT: s3://my-bucket/data/
      OUTPUT: s3://my-bucket/output/
    hiveconf:
      hive.support.sql11.reserved.keywords: false

  - type: spark
    application: spark/pi.scala

  - type: spark
    application: s3://my-bucket/spark/hello.py
    args: [foo, bar]

  - type: spark
    application: spark/hello.jar
    class: com.example.Hello
    jars:
      - libhello.jar
      - s3://td-spark/td-spark-assembly-0.1.jar
    conf:
      spark.locality.wait: 5s
      spark.memory.fraction: 0.5
    args: [foo, bar]

  - type: spark-sql
    query: spark/query.sql
    result: s3://my-bucket/results/${session_uuid}/

  - type: script
    script: s3://my-bucket/scripts/hello.sh
    args: [hello, world]

  - type: script
    script: scripts/hello.sh
    args: [world]

  - type: command
    command: echo
    args: [hello, world]
action_on_failure: TERMINATE_JOB_FLOW | TERMINATE_CLUSTER | CANCEL_AND_WAIT | CONTINUE
The action EMR should take in response to a job step failing.
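
For example, to let the cluster continue with the next step when one step fails:

  • action_on_failure: CONTINUE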

Output parameters

emr.last_cluster_id
The ID of the cluster created. If a pre-existing cluster was used, this parameter will not be set.
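
A sketch of reusing the created cluster from a later emr> task via this output parameter (auto_terminate: false keeps the cluster alive; the echo steps are placeholders):

+create_cluster:
  emr>:
  cluster:
    auto_terminate: false   # keep the cluster running after the steps finish
    ec2:
      key: my-ec2-key
    logs: s3://my-bucket/logs/
  steps:
    - type: command
      command: echo
      args: [cluster, ready]

+reuse_cluster:
  emr>:
  cluster: ${emr.last_cluster_id}   # ID of the cluster created by +create_cluster
  steps:
    - type: command
      command: echo
      args: [hello, again]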