emr>: Amazon Elastic Map Reduce

emr> operator can be used to run EMR jobs, create clusters and submit steps to existing clusters.

For detailed information about EMR, see the Amazon Elastic MapReduce Documentation.

+emr_job:
  emr>:
  cluster:
    name: my-cluster
    ec2:
      key: my-ec2-key
      master:
        type: m3.2xlarge
      core:
        type: m3.xlarge
        count: 10
    logs: s3://my-bucket/logs/
  staging: s3://my-bucket/staging/
  steps:
    - type: spark
      application: pi.py
    - type: spark-sql
      query: queries/query.sql
      result: s3://my-bucket/results/${session_uuid}/
    - type: script
      script: scripts/hello.sh
      args: [hello, world]

Secrets

When you set those parameters, use digdag secrets command.

  • aws.emr.access_key_id, aws.access_key_id

    The AWS Access Key ID to use when submitting EMR jobs.

  • aws.emr.secret_access_key, aws.secret_access_key

    The AWS Secret Access Key to use when submitting EMR jobs.

  • aws.emr.role_arn, aws.role_arn

    The AWS Role to assume when submitting EMR jobs.

  • aws.emr.region, aws.region

    The AWS region to use for EMR service.

  • aws.emr.endpoint

    The AWS EMR endpoint address to use.

  • aws.s3.region, aws.region

    The AWS region to use for S3 service to store staging files.

  • aws.s3.endpoint

    The AWS S3 endpoint address to use for staging files.

  • aws.kms.region, aws.region

    The AWS region to use for KMS service to encrypt variables passed to EMR jobs.

  • aws.kms.endpoint

    The AWS KMS endpoint address to use for EMR variable encryption.

Options

  • cluster: STRING | OBJECT

    Specifies either the ID of an existing cluster to submit steps to or the configuration of a new cluster to create.

    Using an existing cluster:

    cluster: j-7KHU3VCWGNAFL
    

    Creating a new minimal ephemeral cluster with just one node:

    cluster:
      ec2:
        key: my-ec2-key
      logs: s3://my-bucket/logs/
    

    Creating a customized cluster with several hosts:

    cluster:
      name: my-cluster
      auto_terminate: false
      release: emr-5.2.0
      applications:
        - hadoop
        - spark
        - hue
        - zookeeper
      ec2:
        key: my-ec2-key
        subnet_id: subnet-83047402b
        master:
          type: m4.2xlarge
        core:
          type: m4.xlarge
          count: 10
          ebs:
            optimized: true
            devices:
              volume_specifiation:
                iops: 10000
                size_in_gb: 1000
                type: gp2
              volumes_per_instance: 6
        task:
          - type: c4.4xlarge
            count: 20
          - type: g2.2xlarge
            count: 6
      logs: s3://my-bucket/logs/
      bootstrap:
        - install_foo.sh
        - name: Install Bar
          path: install_bar.sh
          args: [baz, quux]
    
  • staging: S3_URI

    A S3 folder to use for staging local files for execution on the EMR cluster. Note: the configured AWS credentials must have permission to put and get objects in this folder.

    Examples:

    staging: s3://my-bucket/staging/
    
  • emr.region

    The AWS region to use for EMR service.

  • emr.endpoint

    The AWS EMR endpoint address to use.

  • s3.region

    The AWS region to use for S3 service to store staging files.

  • s3.endpoint

    The AWS S3 endpoint address to use for staging files.

  • kms.region

    The AWS region to use for KMS service to encrypt variables passed to EMR jobs.

  • kms.endpoint

    The AWS KMS endpoint address to use for EMR variable encryption.

  • steps: LIST

    A list of steps to submit to the EMR cluster.

    steps: - type: flink application: flink/WordCount.jar

    - type: hive
      script: queries/hive-query.q
      vars:
        INPUT: s3://my-bucket/data/
        OUTPUT: s3://my-bucket/output/
      hiveconf:
        hive.support.sql11.reserved.keywords: false
    
    - type: spark
      application: spark/pi.scala
    
    - type: spark
      application: s3://my-bucket/spark/hello.py
      args: [foo, bar]
    
    - type: spark
      application: spark/hello.jar
      class: com.example.Hello
      jars:
        - libhello.jar
        - s3://td-spark/td-spark-assembly-0.1.jar
      conf:
        spark.locality.wait: 5s
        spark.memory.fraction: 0.5
      args: [foo, bar]
    
    - type: spark-sql
      query: spark/query.sql
      result: s3://my-bucket/results/${session_uuid}/
    
    - type: script
      script: s3://my-bucket/scripts/hello.sh
      args: [hello, world]
    
    - type: script
      script: scripts/hello.sh
      args: [world]
    
    - type: command
      command: echo
      args: [hello, world]
    
  • action_on_failure: TERMINATE_JOB_FLOW | TERMINATE_CLUSTER | CANCEL_AND_WAIT | CONTINUE

    The action EMR should take in response to a job step failing.

Output parameters

  • emr.last_cluster_id

    The ID of the cluster created. If a pre-existing cluster was used, this parameter will not be set.