Language API - Python

Programmable workflow in Python

workflow.dig:

+step1:
  py>: tasks.MyWorkflow.step1

+step2:
  py>: tasks.MyWorkflow.step2

tasks/__init__.py:

class MyWorkflow(object):
  def step1(self):
    print("step1")

  def step2(self):
    print("step2")

Defining variables

import digdag

class MyWorkflow(object):
  def step1(self):
    # Store a parameter so that later tasks can read it.
    digdag.env.store({'my_value': 1})

  def step2(self):
    # Read the parameter stored by step1.
    print("step2: %s" % digdag.env.params["my_value"])

Method argument mapping

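import digdag

class MyWorkflow(object):
  def step1(self, session_time):
    # session_time is filled in from the workflow parameter of the same name.
    digdag.env.store({'my_value': 1})

  def step2(self, my_value="default"):
    # my_value comes from the stored parameter; "default" applies only if it is not set.
    print("step2: %s" % my_value)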
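
The mapping above matches method arguments to workflow parameters by name. The following is a minimal sketch of that idea using only the standard library, so it can be run without Digdag. The helper call_with_params is hypothetical and written for illustration; it is not Digdag's own code, and the real runner may differ in details.

```python
import inspect

def call_with_params(method, params):
    """Call `method`, filling each argument from `params` by name.

    Hypothetical helper for illustration only; not part of Digdag.
    """
    kwargs = {}
    for name, p in inspect.signature(method).parameters.items():
        if name in params:
            # A parameter with the same name exists: pass it through.
            kwargs[name] = params[name]
        elif p.default is inspect.Parameter.empty:
            # No parameter and no default value: the call cannot be made.
            raise TypeError("parameter '%s' is required but not set" % name)
        # Otherwise the method's own default value is used.
    return method(**kwargs)

class MyWorkflow(object):
    def step2(self, my_value="default"):
        return "step2: %s" % my_value

task = MyWorkflow()
# Extra parameters that the method does not declare are ignored.
with_value = call_with_params(task.step2, {"my_value": 1, "unused": 2})
without_value = call_with_params(task.step2, {})
print(with_value)
print(without_value)
```

In this sketch an argument without a default value raises TypeError when no parameter of that name is set, which is why giving optional arguments a default keeps a task usable in both cases.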

Generating child tasks

Generating Python child tasks:

import digdag

class MyWorkflow(object):
  def step1(self):
    # Add step3 as a child task of this task, passing arg1=1 to it.
    digdag.env.add_subtask(MyWorkflow.step3, arg1=1)

  def step2(self, my_value="default"):
    print("step2: %s" % my_value)

  def step3(self, arg1):
    # Runs as the child task added by step1.
    print("step3: %s" % arg1)

Generating other child tasks:

import digdag

class MyWorkflow(object):
  def step1(self):
    # Add a non-Python child task; '_type' names the operator to run (here, mail).
    digdag.env.add_subtask({
      '_type': 'mail',
      'body': 'this is email body in string',
      'subject': 'workflow started',
      'to': ['me@example.com'],
    })

Note

Arguments must be serializable as JSON. If the arguments include a non-serializable object such as a function, add_subtask raises a TypeError.
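
One way to catch this early is to try encoding the arguments with the standard json module before passing them on. The sketch below assumes that json.dumps is a reasonable proxy for the serialization check; the helper check_subtask_args is hypothetical and not part of the digdag module.

```python
import json

def check_subtask_args(**kwargs):
    """Return True if the keyword arguments can be encoded as JSON.

    Hypothetical helper for illustration only; not part of Digdag.
    """
    try:
        json.dumps(kwargs)
        return True
    except TypeError:
        # json.dumps raises TypeError for objects it cannot encode,
        # such as functions.
        return False

# Numbers, strings, lists and dicts are fine.
ok = check_subtask_args(arg1=1, names=["a", "b"])
# A function object is not JSON-serializable.
bad = check_subtask_args(callback=print)
print(ok, bad)
```

Passing plain values (numbers, strings, lists, dicts) and looking up any function by name inside the child task avoids the error.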